\$\begingroup\$

Code

@Service
@Slf4j
public class SpendingServiceImpl implements SpendingService {
    private final SpendingGroupRepository spendingGroupRepository;
    private final SpendingRepository spendingRepository;
    private final IdempotencyRepository idempotencyRepository;
    private final TransactionTemplate transactionTemplate;

    public SpendingServiceImpl(SpendingGroupRepository spendingGroupRepository,
                               SpendingRepository spendingRepository,
                               IdempotencyRepository idempotencyRepository,
                               PlatformTransactionManager platformTransactionManager) {
        this.spendingGroupRepository = spendingGroupRepository;
        this.spendingRepository = spendingRepository;
        this.idempotencyRepository = idempotencyRepository;
        this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
        this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    }

    @Override
    public void addSpending(SpendingMessageDto spendingMessageDto) {
        SpendingGroup spendingGroup;
        try {
            spendingGroup = spendingGroupRepository
                    .findByExternalId(spendingMessageDto.getSpendingGroupId())
                    .orElseGet(() -> spendingGroupRepository.save(SpendingGroup.builder()
                            .externalId(spendingMessageDto.getSpendingGroupId())
                            .description("Banana").build()));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            spendingGroup = spendingGroupRepository.findByExternalId(spendingMessageDto.getSpendingGroupId()).orElseThrow();
        }
        if (spendingGroup == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        Idempotency idempotency;
        try {
            // Needs transaction for LOB (TEXT)
            idempotency = transactionTemplate.execute(transactionStatus -> idempotencyRepository
                    .findByIdempotencyKey(spendingMessageDto.getIdempotencyKey())
                    .orElseGet(() -> idempotencyRepository.save(Idempotency.builder()
                            .idempotencyKey(spendingMessageDto.getIdempotencyKey())
                            .status(IdempotencyStatus.CREATED)
                            .build())));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            idempotency = idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey()).orElseThrow();
        }
        if (idempotency == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        switch (idempotency.getStatus()) {
            case ERROR_NOT_RETRYABLE, FINISHED -> {
                log.info("Returning response: " + idempotency.getStatus() + " - " + idempotency.getResponse());
                return;
            }
        }

        // Now try to lock the row
        final SpendingGroup spendingGroup1 = spendingGroup;
        transactionTemplate.executeWithoutResult(transactionStatus -> {
            // PESSIMISTIC_WRITE, SHOULD block all others with SELECT FOR UPDATE
            Idempotency idm = idempotencyRepository.findByIdempotencyKeyPW(spendingMessageDto.getIdempotencyKey()).orElseThrow();
            switch (idm.getStatus()) {
                case ERROR_NOT_RETRYABLE, FINISHED -> {
                    log.info("Returning response: " + idm.getStatus() + " - " + idm.getResponse());
                    return;
                }
            }
            log.warn("-----------PROCESSING----------");
            Spending spending = Spending.builder()
                    .spendingGroupId(spendingGroup1.getId())
                    .amount(new BigDecimal(spendingMessageDto.getAmount()))
                    .build();
            spendingRepository.save(spending);
            idm.setStatus(IdempotencyStatus.FINISHED);
            idm.setCode(200);
            idm.setResponse("Ok");
        });
    }
}

What is the goal?

Many messages are delivered in parallel from a message queue. Each message contains a "payment" and follows the pattern amount=11.11;spending_group=adeu47;idempotency_key={uuid}.

A spending group is basically a collector for payments, and each payment must be inserted exactly once. However, the message broker retries delivery when a message isn't acknowledged in time.
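For readers who want to see the message format concretely, here is a minimal sketch of parsing one message body of the form shown above. The class name `SpendingMessageSketch` and its fields are hypothetical and are not the `SpendingMessageDto` used in the service; the sketch also assumes the idempotency key is always a valid UUID string.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper for illustration only; not part of the reviewed service.
class SpendingMessageSketch {
    final BigDecimal amount;
    final String spendingGroupId;
    final UUID idempotencyKey;

    SpendingMessageSketch(BigDecimal amount, String spendingGroupId, UUID idempotencyKey) {
        this.amount = amount;
        this.spendingGroupId = spendingGroupId;
        this.idempotencyKey = idempotencyKey;
    }

    // Parses "amount=11.11;spending_group=adeu47;idempotency_key=<uuid>".
    static SpendingMessageSketch parse(String raw) {
        Map<String, String> fields = new HashMap<>();
        for (String pair : raw.split(";")) {
            String[] keyValue = pair.split("=", 2);
            if (keyValue.length != 2) {
                throw new IllegalArgumentException("Malformed field: " + pair);
            }
            fields.put(keyValue[0].trim(), keyValue[1].trim());
        }
        return new SpendingMessageSketch(
                new BigDecimal(required(fields, "amount")),
                required(fields, "spending_group"),
                UUID.fromString(required(fields, "idempotency_key")));
    }

    // Fails fast when a field is missing, so a broken message is not half-processed.
    private static String required(Map<String, String> fields, String name) {
        String value = fields.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing field: " + name);
        }
        return value;
    }
}
```

Parsing the amount straight into a `BigDecimal` keeps the decimal value exact, which matches how the service constructs the amount from the DTO.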

Current Design

  1. For every message, check whether the spending group exists. If not, insert it.
    • This can fail if a parallel process also reads and tries to insert. The solution then is to read what the other process wrote. Is catching the unique key violation (DataIntegrityViolationException in the code) the right thing here?
  2. Do the same with the idempotency key: get or create, and catch the unique key violation.
  3. If the idempotency request is in a final state (basically, it was already processed), return the saved response.
  4. If the idempotency request is in status new or retryable, select it again, this time with a pessimistic write lock.
    • Is there any better way to do this? The double select seems weird, but I only need a lock when the state isn't finished.
  5. While holding the row lock, do the processing. Then persist the response and set the status to FINISHED.
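The control flow of these steps can be sketched without a database. The following is only an in-memory analogue under stated assumptions, not the Spring/JPA code: `computeIfAbsent` stands in for the "get or create" guarded by the unique key, and `synchronized` on the entry stands in for the pessimistic row lock. The class `IdempotentProcessorSketch` and all of its members are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for the idempotency table; for illustration only.
class IdempotentProcessorSketch {
    enum Status { CREATED, FINISHED }

    static final class Entry {
        // volatile so the unlocked check in step 3 sees writes made under the lock
        volatile Status status = Status.CREATED;
        volatile String response;
    }

    private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();
    final List<BigDecimal> spendings = new CopyOnWriteArrayList<>();

    String handle(String idempotencyKey, BigDecimal amount) {
        // Step 2: atomic get-or-create (plays the role of the unique key).
        Entry entry = entries.computeIfAbsent(idempotencyKey, key -> new Entry());

        // Step 3: unlocked check; a finished request replays the saved response.
        if (entry.status == Status.FINISHED) {
            return entry.response;
        }

        // Step 4: take the lock and check again, because another thread
        // may have finished the work between the first check and the lock.
        synchronized (entry) {
            if (entry.status == Status.FINISHED) {
                return entry.response;
            }
            // Step 5: process while holding the lock, then record the outcome.
            spendings.add(amount);
            entry.response = "Ok";
            entry.status = Status.FINISHED; // written last so readers see the response
            return entry.response;
        }
    }
}
```

In this analogue the second check under the lock is what prevents a double insert; the first, unlocked check only avoids taking the lock for requests that are already finished.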

Additional questions

  1. I tried the design in a concurrent test case and it seemed to work. Is there any obvious oversight?
pacmaninbw
asked Feb 3, 2021 at 14:07
\$\endgroup\$
