Code
@Service
@Slf4j
public class SpendingServiceImpl implements SpendingService {

    private final SpendingGroupRepository spendingGroupRepository;
    private final SpendingRepository spendingRepository;
    private final IdempotencyRepository idempotencyRepository;
    private final TransactionTemplate transactionTemplate;

    public SpendingServiceImpl(SpendingGroupRepository spendingGroupRepository,
                               SpendingRepository spendingRepository,
                               IdempotencyRepository idempotencyRepository,
                               PlatformTransactionManager platformTransactionManager) {
        this.spendingGroupRepository = spendingGroupRepository;
        this.spendingRepository = spendingRepository;
        this.idempotencyRepository = idempotencyRepository;
        this.transactionTemplate = new TransactionTemplate(platformTransactionManager);
        this.transactionTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    }

    @Override
    public void addSpending(SpendingMessageDto spendingMessageDto) {
        SpendingGroup spendingGroup;
        try {
            spendingGroup = spendingGroupRepository
                    .findByExternalId(spendingMessageDto.getSpendingGroupId())
                    .orElseGet(() -> spendingGroupRepository.save(SpendingGroup.builder()
                            .externalId(spendingMessageDto.getSpendingGroupId())
                            .description("Banana").build()));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            spendingGroup = spendingGroupRepository.findByExternalId(spendingMessageDto.getSpendingGroupId()).orElseThrow();
        }
        if (spendingGroup == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        Idempotency idempotency;
        try {
            // Needs transaction for LOB (TEXT)
            idempotency = transactionTemplate.execute(transactionStatus -> idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey())
                    .orElseGet(() -> idempotencyRepository.save(Idempotency.builder()
                            .idempotencyKey(spendingMessageDto.getIdempotencyKey())
                            .status(IdempotencyStatus.CREATED)
                            .build())));
        } catch (DataIntegrityViolationException e) {
            // Parallel Insert Violates Unique Key, get the entry
            idempotency = idempotencyRepository.findByIdempotencyKey(spendingMessageDto.getIdempotencyKey()).orElseThrow();
        }
        if (idempotency == null) {
            throw new RuntimeException(""); // TODO: Real exception
        }

        switch (idempotency.getStatus()) {
            case ERROR_NOT_RETRYABLE, FINISHED -> {
                log.info("Returning response: " + idempotency.getStatus() + " - " + idempotency.getResponse());
                return;
            }
        }

        // Now try to lock the row
        final SpendingGroup spendingGroup1 = spendingGroup;
        transactionTemplate.executeWithoutResult(transactionStatus -> {
            // PESSIMISTIC_WRITE, SHOULD block all others with SELECT FOR UPDATE
            Idempotency idm = idempotencyRepository.findByIdempotencyKeyPW(spendingMessageDto.getIdempotencyKey()).orElseThrow();
            switch (idm.getStatus()) {
                case ERROR_NOT_RETRYABLE, FINISHED -> {
                    log.info("Returning response: " + idm.getStatus() + " - " + idm.getResponse());
                    return;
                }
            }
            log.warn("-----------PROCESSING----------");
            Spending spending = Spending.builder()
                    .spendingGroupId(spendingGroup1.getId())
                    .amount(new BigDecimal(spendingMessageDto.getAmount()))
                    .build();
            spendingRepository.save(spending);
            idm.setStatus(IdempotencyStatus.FINISHED);
            idm.setCode(200);
            idm.setResponse("Ok");
        });
    }
}
What is the goal?
Many messages are delivered in parallel from a message queue. Each message contains a "payment" and follows the pattern amount=11.11;spending_group=adeu47;idempotency_key={uuid}
A spending group is basically a collector for payments, and each payment must be inserted exactly once. However, the message broker retries a message that isn't acknowledged in time, so the same payment can be delivered more than once.
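To make the message pattern concrete, here is a minimal parsing sketch. The class name SpendingMessage, its accessors, and the choice to validate the idempotency key as a UUID are assumptions for illustration; the real SpendingMessageDto may be populated differently.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical value type for one queue message (not the SpendingMessageDto from the question). */
final class SpendingMessage {
    private final BigDecimal amount;
    private final String spendingGroup;
    private final UUID idempotencyKey;

    private SpendingMessage(BigDecimal amount, String spendingGroup, UUID idempotencyKey) {
        this.amount = amount;
        this.spendingGroup = spendingGroup;
        this.idempotencyKey = idempotencyKey;
    }

    /** Parses "amount=...;spending_group=...;idempotency_key=..." and rejects malformed input. */
    static SpendingMessage parse(String raw) {
        Map<String, String> fields = new HashMap<>();
        for (String pair : raw.split(";")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Malformed field: " + pair);
            }
            fields.put(kv[0].trim(), kv[1].trim());
        }
        return new SpendingMessage(
                new BigDecimal(required(fields, "amount")),
                required(fields, "spending_group"),
                UUID.fromString(required(fields, "idempotency_key")));
    }

    private static String required(Map<String, String> fields, String name) {
        String value = fields.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing field: " + name);
        }
        return value;
    }

    BigDecimal amount() { return amount; }
    String spendingGroup() { return spendingGroup; }
    UUID idempotencyKey() { return idempotencyKey; }
}
```

Rejecting malformed messages up front would correspond to a non-retryable error, since a retry of the same payload cannot succeed.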
Current Design
- For every message, check whether the spending group exists. If not, insert it.
- This can fail if a parallel process also reads and then tries to insert. The fix is then to read what the other process wrote. Is catching the unique key violation (DataIntegrityViolationException) the right thing here?
- The same applies to the idempotency key: get or create, and catch the unique key violation.
- If the idempotency request is in a final state (basically was sent already), return the saved response.
- If the idempotency request is in status CREATED or retryable, select it again, this time with a pessimistic write lock.
- Is there a better way to do this? The double select seems weird, but I only need a lock when the state isn't final.
- While holding the row lock, do the processing, then persist the response and set the status to FINISHED.
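The control flow of the steps above (get or create, unlocked status check, lock, re-check, process) can be sketched in memory. This is only an analogy, not Spring or JPA code: computeIfAbsent stands in for the insert with unique-key handling, the object monitor stands in for SELECT ... FOR UPDATE, and every name (IdempotentProcessor, Entry, process) is hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/** In-memory stand-in for the idempotency table; all names are hypothetical. */
final class IdempotentProcessor {

    enum Status { CREATED, FINISHED }

    /** One "row". Its monitor plays the role of the pessimistic row lock. */
    private static final class Entry {
        // volatile so the unlocked fast-path read below sees a finished entry
        private volatile Status status = Status.CREATED;
        // written before status, read after status, so the volatile access publishes it
        private String response;
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();

    /** Runs work at most once per key that completes successfully; later calls replay the response. */
    String process(String idempotencyKey, Supplier<String> work) {
        // Atomic get-or-create: only one caller creates the entry for a given key.
        Entry entry = entries.computeIfAbsent(idempotencyKey, key -> new Entry());

        // First check, without the lock: final states return the stored response.
        if (entry.status == Status.FINISHED) {
            return entry.response;
        }

        // Second check, under the lock: another caller may have finished in between.
        synchronized (entry) {
            if (entry.status == Status.FINISHED) {
                return entry.response;
            }
            String response = work.get(); // if this throws, the status stays CREATED (retryable)
            entry.response = response;
            entry.status = Status.FINISHED;
            return response;
        }
    }
}
```

The unlocked first check is purely an optimization here: dropping it leaves the behavior unchanged, because the check under the lock is the one that guarantees exactly-once processing.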
Additional questions
- I tried the design in a concurrent test case and it seemed to work. Is there any obvious oversight?
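For reference, a concurrent test along these lines could use a small harness that releases all threads at the same moment, so the duplicate deliveries actually race. This is a sketch with hypothetical names (ConcurrentDeliveryHarness, deliverConcurrently), not the test that was actually run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper that simulates the broker delivering the same message in parallel. */
final class ConcurrentDeliveryHarness {

    /** Runs task on `threads` threads released together; fails if any run throws or hangs. */
    static void deliverConcurrently(int threads, Runnable task) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch ready = new CountDownLatch(threads);
        CountDownLatch start = new CountDownLatch(1);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    ready.countDown();   // signal that this worker is parked at the gate
                    start.await();       // wait until every worker is ready
                    task.run();
                    return null;
                }));
            }
            ready.await();
            start.countDown();           // open the gate for all workers at once
            for (Future<?> future : futures) {
                future.get(10, TimeUnit.SECONDS); // surfaces exceptions thrown by task
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while delivering", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Concurrent delivery failed", e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

A test would then call something like deliverConcurrently(16, () -> spendingService.addSpending(dto)) and assert that exactly one Spending row exists afterwards. Two cases from the design are worth covering separately: the same idempotency key delivered in parallel, and different idempotency keys that all target a spending group that does not exist yet.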