There are very few examples of how to implement the saga pattern with Kafka, so I tried to come up with my own implementation of the choreography saga pattern using Kafka. I guess I need to implement the outbox pattern as well.
public interface SagaPattern<M extends Message<?>> {
    void beforeStart();
    void beforeEnd();
}
public interface ChoreographySaga<M extends Message<E>, E> extends SagaPattern<M> {
    void on(E event);
    void compensate(E event);
}
abstract class AbstractChoreographySaga<E> implements ChoreographySaga<Message<E>, E> {
    @Override
    public void on(E event) {
        try {
            beforeStart();
            onTransaction(event);
            beforeEnd();
        }
        catch (Exception error) {
            compensate(event);
        }
    }

    abstract void onTransaction(E event);

    @Override
    public void beforeStart() {
        // do nothing yet
    }

    @Override
    public void beforeEnd() {
        // do nothing yet
    }
}
public class OrderPlacementSaga extends AbstractChoreographySaga<OrderCreatedEvent> {
    private final MessageBroker<Message<?>> messageBroker;
    private final OrderRepository orderRepository;

    public OrderPlacementSaga(MessageBroker<Message<?>> messageBroker, OrderRepository orderRepository) {
        this.messageBroker = messageBroker;
        this.orderRepository = orderRepository;
    }

    @Override
    @MessageConsumer(topics = KafkaTopic.ORDER, groupId = KafkaGroup.ORDER_GROUP)
    public void on(OrderCreatedEvent event) {
        super.on(event);
    }

    @Override
    void onTransaction(OrderCreatedEvent event) {
        // todo outbox pattern
        var order = Order.fromEvent(event);
        orderRepository.save(order);
        messageBroker.publish(
            KafkaMessage.asGenericMessage(
                KafkaTopic.ORDER,
                PaymentCreatedEvent.fromOrder(order)));
    }

    @Override
    public void compensate(OrderCreatedEvent event) {
        orderRepository.deleteById(event.getOrderId());
    }
}
1 Answer
This part of on() seems like trouble:
    catch (Exception error) {
        compensate(event);
    }
I'm sympathetic to the need "to keep going!", so the broad Exception umbrella makes sense. But ignoring error makes me nervous. We'd want to at least know what the top two or three error types are, yet we don't log it nor even offer it as a compensate() argument.
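A minimal sketch of what that could look like, assuming compensate() is allowed to take the cause as a second argument. LoggingChoreographySaga and FailingSaga are hypothetical names that are not part of the posted code, and java.util.logging merely stands in for whatever logger the project already uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical variant of the posted base class: the cause is logged
// and then handed to compensate() instead of being dropped.
abstract class LoggingChoreographySaga<E> {
    private static final Logger LOG =
            Logger.getLogger(LoggingChoreographySaga.class.getName());

    public void on(E event) {
        try {
            onTransaction(event);
        } catch (Exception error) {
            // Log first, so the failure is on record even if compensation also fails.
            LOG.log(Level.WARNING, "Saga step failed for " + event + ", compensating", error);
            compensate(event, error);
        }
    }

    abstract void onTransaction(E event) throws Exception;

    // Assumption: the signature is widened so implementations can branch on the cause.
    abstract void compensate(E event, Exception cause);
}

// Toy saga that always fails, used only to exercise the base class.
class FailingSaga extends LoggingChoreographySaga<String> {
    final List<String> compensations = new ArrayList<>();

    @Override
    void onTransaction(String event) {
        throw new IllegalStateException("payment service unavailable");
    }

    @Override
    void compensate(String event, Exception cause) {
        // Remember which event was compensated and why.
        compensations.add(event + ":" + cause.getClass().getSimpleName());
    }
}
```

With the cause available, a compensate() implementation could, for example, treat a timeout differently from a validation failure, and the log gives a rough tally of which error types dominate.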
- Thank you for your response. I do have some additional questions. Do you think it is a bad idea to implement a stateful saga? I mean one that would be able to store some state during the transaction. – Kazul Haram, commented Aug 22, 2023 at 3:26
- A bad idea? No, not at all. By all means, persist those logs! I suspect I do not fully realize what issues you are grappling with; it must be more complex than a one-sentence question. The idea behind saga is to record that historic events (components of a complex transaction) definitely happened, such that subsequent observers can easily verify that and act on that. Compensating or reconciling conflicts downstream can be "hard", but recording that an event happened? That's easy. – J_H, commented Aug 22, 2023 at 3:44
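To make the "recording that an event happened" point concrete, here is a rough sketch of a saga log. Everything in it is an assumption for illustration: SagaLogEntry and InMemorySagaLog are hypothetical names, and the in-memory list stands in for a durable store such as a database table.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// One immutable entry per step that definitely happened.
final class SagaLogEntry {
    final String sagaId;
    final String step;
    final Instant recordedAt;

    SagaLogEntry(String sagaId, String step, Instant recordedAt) {
        this.sagaId = sagaId;
        this.step = step;
        this.recordedAt = recordedAt;
    }
}

// Append-only log; a real implementation would persist entries durably.
final class InMemorySagaLog {
    private final List<SagaLogEntry> entries = new ArrayList<>();

    // Append a fact; existing entries are never updated or removed.
    void record(String sagaId, String step) {
        entries.add(new SagaLogEntry(sagaId, step, Instant.now()));
    }

    // Lets a later observer verify that a given step took place.
    boolean happened(String sagaId, String step) {
        for (SagaLogEntry entry : entries) {
            if (entry.sagaId.equals(sagaId) && entry.step.equals(step)) {
                return true;
            }
        }
        return false;
    }

    // Steps recorded for one saga, in the order they were appended.
    List<String> stepsFor(String sagaId) {
        List<String> steps = new ArrayList<>();
        for (SagaLogEntry entry : entries) {
            if (entry.sagaId.equals(sagaId)) {
                steps.add(entry.step);
            }
        }
        return steps;
    }
}
```

A saga could call record() after each successful step, and a compensation handler could consult happened() to decide which steps actually need undoing.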