
I'm trying to configure a single consumer function to handle messages from two different Kafka topics using Spring Cloud Stream. The application only consumes from the first topic (consumeMessage-in-0) but completely ignores the second topic (consumeMessage-in-1). Both topics carry the same data structure, but I need to use different consumer groups for each topic due to business requirements.

application.yml

spring:
  cloud:
    function:
      definition: consumeMessage
    stream:
      binders:
        local-bkr:
          type: kafka
          environment:
            spring.cloud.stream:
              kafka.binder:
                brokers: localhost:9092
                autoCreateTopics: false
                consumerProperties:
                  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                  value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      bindings:
        consumeMessage-in-0:
          destination: topic-1
          group: group-1
          binder: local-bkr
          consumer:
            concurrency: 3
        consumeMessage-in-1:
          destination: topic-2
          group: group-2
          binder: local-bkr
          consumer:
            concurrency: 3

Consumer Function

@Configuration
public class MessageConsumer {

    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    @Bean
    Consumer<Message<String>> consumeMessage() {
        return stringMessage -> {
            var receivedTopic = stringMessage.getHeaders().get("kafka_receivedTopic");
            log.info("Received from topic {}: {}", receivedTopic, stringMessage);
        };
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.5</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>21</java.version>
        <spring-cloud.version>2025.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

Problem

  • The application starts successfully and consumes messages from topic-1 (the first binding), but it completely ignores messages from topic-2 (the second binding).

I can see in the logs that:

  • Messages from topic-1 are being processed
  • No messages from topic-2 are being processed (even though I can confirm messages are being published to that topic)
  • The consumer configuration for consumeMessage-in-1 is ignored at startup; only the first binding appears to be registered
  • No errors are thrown during startup or runtime

What I Know Works (But Want to Avoid)

I'm aware that creating separate consumer function definitions works:

spring:
  cloud:
    function:
      definition: consumeMessage1;consumeMessage2
@Bean
Consumer<Message<String>> consumeMessage1() { /* same logic */ }
@Bean
Consumer<Message<String>> consumeMessage2() { /* same logic */ }

However, this approach would require a code change every time a new topic needs to be added, which is not ideal for our use case where topics might be added dynamically through configuration only.
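To keep the duplication in that workaround minimal, the per-topic beans can delegate to a single shared handler, so adding a topic means one trivial wrapper bean plus configuration. Here is a sketch of the delegation pattern using plain `java.util.function.Consumer<String>` so it stands alone; in the real application each wrapper would be a `@Bean Consumer<Message<String>>` listed in `spring.cloud.function.definition`, and the names `consumeTopic1`/`consumeTopic2` are illustrative, not from the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SharedHandlerSketch {

    // Stands in for the real processing logic (logging, persistence, etc.).
    static final List<String> received = new ArrayList<>();

    // Single place where the message-handling logic lives.
    static Consumer<String> sharedHandler() {
        return payload -> received.add(payload);
    }

    // One thin wrapper per topic; in Spring Cloud Stream each of these
    // would be a @Bean and get its own <beanName>-in-0 binding.
    static Consumer<String> consumeTopic1() { return sharedHandler(); }
    static Consumer<String> consumeTopic2() { return sharedHandler(); }

    public static void main(String[] args) {
        consumeTopic1().accept("from topic-1");
        consumeTopic2().accept("from topic-2");
        System.out.println(received);
    }
}
```

This does not remove the need for a new bean per topic, but it keeps the change to a single one-line method, with all real logic in one place.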

Questions

  1. Why is Spring Cloud Stream only binding to the first input (consumeMessage-in-0) and ignoring the second (consumeMessage-in-1)?
  2. Is it possible to have multiple input bindings for a single Consumer function, or does Spring Cloud Stream require a separate consumer function for each topic?
  3. What's the correct configuration pattern for consuming from multiple topics with the same processing logic but different consumer groups?

Environment

  • Spring Boot 3.5.5
  • Spring Cloud 2025.0.0
  • Java 21
  • Kafka binder

Is this a known limitation of Spring Cloud Stream, or am I missing something in my configuration?

asked Oct 7 at 5:33

2 Answers


I have a small example that may help you. I have a stream of orders as input plus a users KTable; I enrich the orders with the users, and then I have two outputs.

@Configuration
public class BindingKafkaConfiguration {

    @Bean
    public BiFunction<KStream<String, Order>, KTable<String, User>, KStream<?, ?>[]> process() {
        return (KStream<String, Order> orders, KTable<String, User> users) -> {
            KStream<String, EnrichedOrder> enrichedOrders = orders
                    .leftJoin(users, (Order order, User user) -> {
                        EnrichedOrder enrichedOrder = new EnrichedOrder();
                        enrichedOrder.setOrderId(order.getOrderId());
                        enrichedOrder.setAmount(order.getAmount());
                        enrichedOrder.setStatus(order.getStatus());
                        if (user != null) {
                            enrichedOrder.setUserId(user.getUserId());
                            enrichedOrder.setName(user.getName());
                        }
                        return enrichedOrder;
                    });
            KStream<String, String> dump = enrichedOrders.mapValues(EnrichedOrder::toString);
            return new KStream<?, ?>[] { enrichedOrders, dump };
        };
    }
}
spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: PLAYGROUND_ORDER
        process-in-1:
          destination: PLAYGROUND_USER
        process-out-0:
          destination: PLAYGROUND_ENRICHED_ORDER
        process-out-1:
          destination: PLAYGROUND_DUMP
      kafka:
        streams:
          binder:
            application-id: playground
            brokers: 192.168.112.80:9092
          bindings:
            process-in-0:
              consumer:
                group: playground-0
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.OrderSerde
            process-in-1:
              consumer:
                group: playground-1
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.UserSerde
                materialized-as: user-store
            process-out-0:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.EnrichedOrderSerde
            process-out-1:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
answered Oct 7 at 13:17


Because the Consumer contract takes exactly one argument, whose index is 0. To consume from different topics you do indeed have to declare several Consumer beans, but each of them will only ever have an -in-0 binding.

See more info in docs: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html
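Applied to the question's setup, that would look roughly like the following (a sketch, assuming the original topic/group names; each function gets its own -in-0 binding, which is what makes a distinct consumer group per topic possible):

```yaml
spring:
  cloud:
    function:
      definition: consumeMessage1;consumeMessage2
    stream:
      bindings:
        consumeMessage1-in-0:
          destination: topic-1
          group: group-1
        consumeMessage2-in-0:
          destination: topic-2
          group: group-2
```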

answered Oct 7 at 14:53
