Building upon my 2 previous questions:

I have the following structure:

  • A Spring Boot Backend that runs Axon without Issues, publishes Events to the Kafka Source, etc

  • A normal application that uses the AxonConnector interface to listen to Events and send Commands to the Backend

This is the implementation of the AxonConnector using Kafka:

@Log4j2
public class KafkaPostgresAxonConnector implements AxonConnector {
    private final String kafkaBootstrapServers;
    private final String postgresUrl;
    private final String postgresUser;
    private final String postgresPassword;
    private final String componentName;
    private final String groupId;
    private final Configurer configurer;
    private final JacksonSerializer serializer;
    private KafkaProducer<String, String> producer;
    private Configuration axonConfig;

    public KafkaPostgresAxonConnector(String kafkaBootstrapServers, String postgresUrl,
            String postgresUser, String postgresPassword, String componentName) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
        this.postgresUrl = postgresUrl;
        this.postgresUser = postgresUser;
        this.postgresPassword = postgresPassword;
        this.componentName = componentName;
        this.groupId = componentName + UUID.randomUUID();
        serializer = JacksonSerializer.builder()
                .defaultTyping()
                .lenientDeserialization()
                .build();
        configurer = DefaultConfigurer.defaultConfiguration()
                .configureSerializer(c -> serializer)
                .configureEventSerializer(c -> serializer)
                .configureMessageSerializer(c -> serializer);
    }

    @Override
    public void start() {
        DataSource dataSource = dataSource();
        initializePostgres(dataSource);
        initializeKafkaConsumer();
        initializeKafkaProducer();
        log.info("Starting AxonFramework with component: {}", componentName);
        axonConfig = configurer.buildConfiguration();
        axonConfig.start();
    }

    @Override
    public void registerEventHandler(Object eventHandler) {
        configurer.registerEventHandler(conf -> eventHandler);
    }

    @Override
    public <T> void registerAggregate(Class<T> aggregateType) {
        configurer.configureAggregate(aggregateType);
    }

    @Override
    public UUID sendCommand(Object command) {
        if (axonConfig == null) {
            throw new IllegalStateException("Axon Framework not initialized");
        }
        CommandGateway commandGateway = axonConfig.commandGateway();
        return commandGateway.sendAndWait(command);
    }

    @Override
    public <R> R sendQuery(Object query, ResponseType<R> responseType) {
        if (axonConfig == null) {
            log.warn("Cannot send query - Axon Framework not initialized");
            return null;
        }
        QueryGateway queryGateway = axonConfig.queryGateway();
        return queryGateway.query(query, responseType).join();
    }

    @Override
    public void close() {
        if (producer != null) {
            producer.close();
        }
        if (axonConfig != null) {
            axonConfig.shutdown();
        }
    }

    /**
     * KAFKA
     */
    private KafkaMessageConverter<String, byte[]> messageConverter() {
        return DefaultKafkaMessageConverter.builder()
                .serializer(serializer)
                .build();
    }

    /**
     * KAFKA PRODUCER
     */
    private void initializeKafkaProducer() {
        ProducerFactory<String, byte[]> producerFactory = producerFactory();
        KafkaMessageConverter<String, byte[]> messageConverter = messageConverter();
        KafkaPublisher<String, byte[]> kafkaPublisher = kafkaPublisher(
                producerFactory,
                messageConverter
        );
        KafkaEventPublisher<String, byte[]> kafkaEventPublisher = kafkaEventPublisher(kafkaPublisher);
        registerPublisherToEventProcessor(configurer.eventProcessing(), kafkaEventPublisher);
    }

    public void registerPublisherToEventProcessor(
            EventProcessingConfigurer eventProcessingConfigurer,
            KafkaEventPublisher<String, byte[]> kafkaEventPublisher
    ) {
        String processingGroup = KafkaEventPublisher.DEFAULT_PROCESSING_GROUP;
        eventProcessingConfigurer
                .registerEventHandler(configuration -> kafkaEventPublisher)
                .registerSubscribingEventProcessor(processingGroup)
                .usingSubscribingEventProcessors();
    }

    private KafkaPublisher<String, byte[]> kafkaPublisher(
            ProducerFactory<String, byte[]> producerFactory,
            KafkaMessageConverter<String, byte[]> kafkaMessageConverter
    ) {
        return KafkaPublisher.<String, byte[]>builder()
                .producerFactory(producerFactory)
                .messageConverter(kafkaMessageConverter)
                .serializer(serializer)
                .build();
    }

    public KafkaEventPublisher<String, byte[]> kafkaEventPublisher(
            KafkaPublisher<String, byte[]> kafkaPublisher
    ) {
        return KafkaEventPublisher.<String, byte[]>builder()
                .kafkaPublisher(kafkaPublisher)
                .build();
    }

    public ProducerFactory<String, byte[]> producerFactory() {
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        producerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                ByteArraySerializer.class);
        return DefaultProducerFactory.<String, byte[]>builder()
                .configuration(producerProps)
                .confirmationMode(ConfirmationMode.NONE)
                .build();
    }

    /**
     * KAFKA CONSUMER
     */
    private void initializeKafkaConsumer() {
        ConsumerFactory<String, byte[]> consumerFactory = consumerFactory();
        Fetcher<String, byte[], EventMessage<?>> fetcher = fetcher();
        KafkaMessageConverter<String, byte[]> messageConverter = messageConverter();
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = kafkaMessageSourceConfigurer(
                configurer);
        SubscribableKafkaMessageSource<String, byte[]> messageSource = subscribableKafkaMessageSource(
                consumerFactory,
                fetcher,
                messageConverter,
                kafkaMessageSourceConfigurer
        );
        configureSubscribableKafkaSource(configurer.eventProcessing(), messageSource);
    }

    private Fetcher<String, byte[], EventMessage<?>> fetcher() {
        return AsyncFetcher.<String, byte[], EventMessage<?>>builder()
                .pollTimeout(1000)
                .executorService(Executors.newSingleThreadExecutor())
                .build();
    }

    private ConsumerFactory<String, byte[]> consumerFactory() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return new DefaultConsumerFactory<>(consumerProps);
    }

    private KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer(Configurer configurer) {
        KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer = new KafkaMessageSourceConfigurer();
        configurer.registerModule(kafkaMessageSourceConfigurer);
        return kafkaMessageSourceConfigurer;
    }

    private SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource(
            ConsumerFactory<String, byte[]> consumerFactory,
            Fetcher<String, byte[], EventMessage<?>> fetcher,
            KafkaMessageConverter<String, byte[]> messageConverter,
            KafkaMessageSourceConfigurer kafkaMessageSourceConfigurer
    ) {
        SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource =
                SubscribableKafkaMessageSource.<String, byte[]>builder()
                        .autoStart()
                        .groupId(groupId)
                        .consumerFactory(consumerFactory)
                        .fetcher(fetcher)
                        .messageConverter(messageConverter)
                        .consumerCount(1)
                        .serializer(serializer)
                        .build();
        kafkaMessageSourceConfigurer.configureSubscribableSource(
                configuration -> subscribableKafkaMessageSource
        );
        return subscribableKafkaMessageSource;
    }

    private void configureSubscribableKafkaSource(EventProcessingConfigurer eventProcessingConfigurer,
            SubscribableKafkaMessageSource<String, byte[]> subscribableKafkaMessageSource) {
        eventProcessingConfigurer.configureDefaultSubscribableMessageSource(
                configuration -> subscribableKafkaMessageSource
        );
        eventProcessingConfigurer.usingSubscribingEventProcessors();
    }

    /**
     * POSTGRES
     */
    private void initializePostgres(DataSource dataSource) {
        ConnectionProvider connectionProvider = new DataSourceConnectionProvider(dataSource);
        EventSchema schema = EventSchema.builder()
                .eventTable("domain_event_entry")
                .snapshotTable("snapshot_event_entry")
                .build();
        // Initialize Postgres connection
        EventStorageEngine eventStorageEngine = JdbcEventStorageEngine.builder()
                .transactionManager(transactionManager())
                .connectionProvider(connectionProvider)
                .snapshotFilter(SnapshotFilter.allowAll())
                .eventSerializer(serializer)
                .snapshotSerializer(serializer)
                .schema(schema)
                .build();
        configurer.configureEmbeddedEventStore(c -> eventStorageEngine);
    }

    private DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(postgresUrl);
        config.setUsername(postgresUser);
        config.setPassword(postgresPassword);
        config.setDriverClassName("org.postgresql.Driver");
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        return new HikariDataSource(config);
    }

    private TransactionManager transactionManager() {
        return NoTransactionManager.INSTANCE;
    }
}

To manually test the AxonConnector, I created this app:

@Log4j2
public class ConnectorTest {
    public static void main(String[] args) {
        AxonConnector connector = AxonConnector.kafka(
                "localhost:29092,localhost:39092,localhost:49092",
                "jdbc:postgresql://localhost:5432/database",
                "postgres-user",
                "postgres-password",
                "Tester-1"
        );
        connector.start();
        connector.registerEventHandler(new Handler());
        UUID id =
                connector.sendCommand(new CreateModuleCommand("TestModule", 10, 1, 5, true, true));
        log.info(connector.sendQuery(new FindModuleByIdQuery(id), ResponseTypes.optionalInstanceOf(ModuleDto.class)));
        // Add Shutdown Hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Shutting down Axon Connector...");
            try {
                connector.close();
            } catch (Exception e) {
                log.error("Error while shutting down Axon Connector", e);
            }
        }));
    }

    @Log4j2
    public static class Handler {
        @EventHandler
        public void on(ModuleCreatedEvent event) {
            System.out.println("Module Created: " + event.getModule().getId());
        }
    }
}

The containers are running as follows:
[Image: Docker containers db-1, controller 1 to 3, broker 1 to 3, and kafka-ui running, with open ports db-1: 5432, broker1: 29092, broker2: 39092, broker3: 49092, kafka-ui: 8088]

Issue 1

But running the main function throws the following exception:

Exception in thread "main" org.axonframework.commandhandling.NoHandlerForCommandException: No handler was subscribed for command [de.cronixzero.test.messages.commands.modules.CreateModuleCommand].
 at org.axonframework.commandhandling.SimpleCommandBus.doDispatch(SimpleCommandBus.java:167)
 at org.axonframework.commandhandling.SimpleCommandBus.lambda$dispatch$1(SimpleCommandBus.java:131)
 at org.axonframework.tracing.Span.run(Span.java:101)
 at org.axonframework.commandhandling.SimpleCommandBus.dispatch(SimpleCommandBus.java:125)
 at org.axonframework.commandhandling.gateway.AbstractCommandGateway.send(AbstractCommandGateway.java:76)
 at org.axonframework.commandhandling.gateway.DefaultCommandGateway.send(DefaultCommandGateway.java:83)
 at org.axonframework.commandhandling.gateway.DefaultCommandGateway.sendAndWait(DefaultCommandGateway.java:100)
 at de.cronixzero.test.messages.connector.kafka.KafkaPostgresAxonConnector.sendCommand(KafkaPostgresAxonConnector.java:136)
 at de.cronixzero.test.messages.testing.ConnectorTest.main(ConnectorTest.java:30)

ConnectorTest.java:30:

UUID id = connector.sendCommand(new CreateModuleCommand("TestModule", 10, 1, 5, true, true));

KafkaPostgresAxonConnector.java:136:

CommandGateway commandGateway = axonConfig.commandGateway();
return commandGateway.sendAndWait(command);

What am I doing wrong here?

Issue 2

Running the Java debugger shows that the @EventHandler-annotated method is never reached, even though the application seems to be connected to the Kafka source and is even registered as a consumer.

Why is this?

Issue 3

Just now, I noticed that once I start the test application, the messageCount in the Kafka topic ramps up astronomically (the same messages seem to be resent constantly).

[Image: table showing multiple repeating Kafka messages, always with the same id and body, while the offset keeps ramping up to 200,000 messages in short succession]

They are absolutely identical in their content and easily ramp up to 200,000 repetitions in just a couple of minutes.

asked Apr 19, 2025 at 12:37
  • Could you share where you got this AxonConnector from? As a maintainer of the Axon Framework, I know for a fact that that isn't a class that exists. Without knowing the origin of the class you're using, it is hard to give any suggestion on the way forward. Commented Apr 28, 2025 at 9:46
  • @Steven Just a simple interface: toptal.com/developers/hastebin I did find out, though, that I wasn't using the DistributedCommandBus which caused Axon not to find the CommandHandlers. That'd explain issue 1. Commented May 1, 2025 at 7:53

1 Answer

For the distribution of command messages in Axon Framework, you would need to use a distributed version of the CommandBus. There are roughly three options you have for this:

1. Axon Server
2. The DistributedCommandBus with Spring Cloud Service Discovery
3. The DistributedCommandBus with JGroups

Using Axon Server is arguably the easiest way to set it up seamlessly. Furthermore, it would mean you also have a dedicated Event Store solution, distributed event messaging, and distributed query messaging, alongside all the other features Axon Server provides to simplify things. Running a single Axon Server instance is very straightforward. You could also follow the setup process on AxonIQ Console to simplify it further. Note that Axon Server is not necessarily a paid tool.

If you feel like doing more work, you can try out the Spring Cloud or JGroups extensions from Axon Framework, in combination with a DistributedCommandBus. For the former, you will need to select a preferred Spring Cloud Service Discovery solution, like Eureka, Consul, Kubernetes, or ZooKeeper. For JGroups you would, well, set up JGroups as the service discovery.
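
To illustrate why a local command bus in the client cannot find handlers that live in the backend, here is a toy sketch in plain Java. It does not use Axon Framework at all: `LocalBus`, `RoutingBus`, and the string command names are hypothetical stand-ins made up for this example, and a real DistributedCommandBus does considerably more (member discovery, serialization, routing keys). The only point is the difference between looking up handlers in one JVM and looking them up across known nodes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class CommandRoutingSketch {

    /** Toy local-only bus: it knows only the handlers subscribed in its own JVM. */
    static class LocalBus {
        private final Map<String, Function<Object, Object>> handlers = new LinkedHashMap<>();

        void subscribe(String commandName, Function<Object, Object> handler) {
            handlers.put(commandName, handler);
        }

        boolean canHandle(String commandName) {
            return handlers.containsKey(commandName);
        }

        Object dispatch(String commandName, Object payload) {
            Function<Object, Object> handler = handlers.get(commandName);
            if (handler == null) {
                throw new IllegalStateException(
                        "No handler was subscribed for command [" + commandName + "]");
            }
            return handler.apply(payload);
        }
    }

    /** Toy distributed bus: forwards a command to the first known node that can handle it. */
    static class RoutingBus {
        private final Map<String, LocalBus> nodes = new LinkedHashMap<>();

        void join(String nodeName, LocalBus node) {
            nodes.put(nodeName, node);
        }

        Object dispatch(String commandName, Object payload) {
            for (LocalBus node : nodes.values()) {
                if (node.canHandle(commandName)) {
                    return node.dispatch(commandName, payload);
                }
            }
            throw new IllegalStateException(
                    "No node can handle command [" + commandName + "]");
        }
    }

    public static void main(String[] args) {
        // The "backend" node owns the command handler.
        LocalBus backend = new LocalBus();
        backend.subscribe("CreateModuleCommand", payload -> "created " + payload);

        // The "client" node has no command handlers of its own.
        LocalBus client = new LocalBus();

        try {
            client.dispatch("CreateModuleCommand", "TestModule");
        } catch (IllegalStateException e) {
            System.out.println("Local dispatch failed: " + e.getMessage());
        }

        // Once both nodes are known to a routing layer, the command reaches the backend.
        RoutingBus routing = new RoutingBus();
        routing.join("client", client);
        routing.join("backend", backend);
        System.out.println("Routed dispatch: "
                + routing.dispatch("CreateModuleCommand", "TestModule"));
    }
}
```

In this sketch the client-only dispatch fails in the same way as the NoHandlerForCommandException in the question, while the routed dispatch succeeds because the routing layer knows about the backend node. In Axon terms, that membership knowledge is what Axon Server, Spring Cloud Service Discovery, or JGroups would supply.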

answered May 15, 2025 at 9:09