I'm trying to configure a single consumer function to handle messages from two different Kafka topics using Spring Cloud Stream. The application only consumes from the first topic (consumeMessage-in-0) but completely ignores the second topic (consumeMessage-in-1). Both topics carry the same data structure, but I need to use different consumer groups for each topic due to business requirements.
application.yml
spring:
  cloud:
    function:
      definition: consumeMessage
    stream:
      binders:
        local-bkr:
          type: kafka
          environment:
            spring.cloud.stream:
              kafka.binder:
                brokers: localhost:9092
                autoCreateTopics: false
                consumerProperties:
                  key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                  value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
      bindings:
        consumeMessage-in-0:
          destination: topic-1
          group: group-1
          binder: local-bkr
          consumer:
            concurrency: 3
        consumeMessage-in-1:
          destination: topic-2
          group: group-2
          binder: local-bkr
          consumer:
            concurrency: 3
Consumer Function
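import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

@Configuration
public class MessageConsumer {

    private static final Logger log = LoggerFactory.getLogger(MessageConsumer.class);

    @Bean
    Consumer<Message<String>> consumeMessage() {
        return stringMessage -> {
            // The Kafka binder sets this header to the topic the record came from
            var receivedTopic = stringMessage.getHeaders().get("kafka_receivedTopic");
            log.info("Received from topic {}: {}", receivedTopic, stringMessage);
        };
    }
}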
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.5.5</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>21</java.version>
        <spring-cloud.version>2025.0.0</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>
Problem
The application starts successfully and consumes messages from topic-1 (the first binding), but it completely ignores messages from topic-2 (the second binding).
I can see in the logs that:
- Messages from topic-1 are being processed
- No messages from topic-2 are being processed (even though I can confirm messages are being published to that topic)
- Consumer config for consumeMessage-in-1 is not considered during startup; only the first binding appears to be registered
- No errors are thrown during startup or runtime
What I Know Works (But Want to Avoid)
I'm aware that creating separate consumer function definitions works:
spring:
  cloud:
    function:
      definition: consumeMessage1;consumeMessage2
@Bean
Consumer<Message<String>> consumeMessage1() { /* same logic */ }
@Bean
Consumer<Message<String>> consumeMessage2() { /* same logic */ }
However, this approach would require a code change every time a new topic needs to be added, which is not ideal for our use case where topics might be added dynamically through configuration only.
Questions
- Why is Spring Cloud Stream only binding to the first input (consumeMessage-in-0) and ignoring the second (consumeMessage-in-1)?
- Is it possible to have multiple input bindings for a single Consumer function? Or does a limitation in Spring Cloud Stream require a separate consumer function for each topic?
- What's the correct configuration pattern for consuming from multiple topics with the same processing logic but different consumer groups?
Environment
- Spring Boot 3.5.5
- Spring Cloud 2025.0.0
- Java 21
- Kafka binder
Is this a known limitation of Spring Cloud Stream, or am I missing something in my configuration?
2 Answers
I have a small example that may help. It uses the Kafka Streams binder: the inputs are a stream of orders and a KTable of users, the orders are enriched with the user data, and the result goes to 2 outputs.
import java.util.function.BiFunction;

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BindingKafkaConfiguration {

    @Bean
    public BiFunction<KStream<String, Order>, KTable<String, User>, KStream<?, ?>[]> process() {
        return (KStream<String, Order> orders, KTable<String, User> users) -> {
            // Left join: every order is kept, user fields are filled in only when a user exists
            KStream<String, EnrichedOrder> enrichedOrders = orders
                    .leftJoin(users, (Order order, User user) -> {
                        EnrichedOrder enrichedOrder = new EnrichedOrder();
                        enrichedOrder.setOrderId(order.getOrderId());
                        enrichedOrder.setAmount(order.getAmount());
                        enrichedOrder.setStatus(order.getStatus());
                        if (user != null) {
                            enrichedOrder.setUserId(user.getUserId());
                            enrichedOrder.setName(user.getName());
                        }
                        return enrichedOrder;
                    });
            KStream<String, String> dump = enrichedOrders.mapValues(EnrichedOrder::toString);
            // Array index 0 maps to process-out-0, index 1 to process-out-1
            return new KStream<?, ?>[] {enrichedOrders, dump};
        };
    }
}
spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: PLAYGROUND_ORDER
        process-in-1:
          destination: PLAYGROUND_USER
        process-out-0:
          destination: PLAYGROUND_ENRICHED_ORDER
        process-out-1:
          destination: PLAYGROUND_DUMP
      kafka:
        streams:
          binder:
            application-id: playground
            brokers: 192.168.112.80:9092
          bindings:
            process-in-0:
              consumer:
                group: playground-0
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.OrderSerde
                materialized-as: user-store
            process-in-1:
              consumer:
                group: playground-1
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.UserSerde
            process-out-0:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: hr.avrbanac.spring.playground.kafka.serde.EnrichedOrderSerde
            process-out-1:
              producer:
                key-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value-serde: org.apache.kafka.common.serialization.Serdes$StringSerde
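To make the binding names in this configuration easier to follow, here is a minimal plain-Java sketch of the naming convention they rely on, `<functionName>-in-<index>` and `<functionName>-out-<index>`. The `BindingNames` class and its methods are hypothetical helpers written for illustration only; they are not part of Spring Cloud Stream. The sketch assumes that the number of bindings follows the number of inputs and outputs of the function.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT a Spring Cloud Stream API: it only mirrors the
// naming convention <functionName>-in-<index> / <functionName>-out-<index>.
class BindingNames {

    // Input binding names for a function with the given number of inputs.
    static List<String> inputs(String functionName, int inputCount) {
        return names(functionName, "in", inputCount);
    }

    // Output binding names for a function with the given number of outputs.
    static List<String> outputs(String functionName, int outputCount) {
        return names(functionName, "out", outputCount);
    }

    private static List<String> names(String functionName, String direction, int count) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            result.add(functionName + "-" + direction + "-" + i);
        }
        return result;
    }

    public static void main(String[] args) {
        // BiFunction with two inputs and an array of two output streams
        System.out.println(inputs("process", 2));
        System.out.println(outputs("process", 2));
        // A Consumer has a single input and no outputs
        System.out.println(inputs("consumeMessage", 1));
    }
}
```

The `process` function above has two arguments and returns an array of two streams, which is why four bindings exist for it. A `Consumer` has one argument, so under the same convention it gets only an `-in-0` binding.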
Because the Consumer contract takes just one argument, whose index is 0, only consumeMessage-in-0 is ever bound; consumeMessage-in-1 does not correspond to any input of the function, so it is ignored.
To consume from different topics you do indeed have to declare several Consumer beans, but each of them will only ever have an -in-0 binding.
See the docs for more info: https://docs.spring.io/spring-cloud-stream/reference/spring-cloud-stream/functional-binding-names.html
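If several Consumer beans are needed, the duplicated "same logic" can at least live in one place. Below is a minimal plain-Java sketch of that idea, with `java.util.function.Consumer<String>` standing in for the Spring beans so it runs without Kafka. The class name `SharedHandlerSketch`, the `handle` method, and the `handled` list are hypothetical names chosen for illustration. In a real application each factory method would presumably be an `@Bean` listed in `spring.cloud.function.definition`, with its own `-in-0` binding and group.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (assumption: in a Spring application each factory method
// below would be annotated with @Bean and bound via <beanName>-in-0).
class SharedHandlerSketch {

    // Collects handled payloads so the behaviour can be observed without Kafka.
    static final List<String> handled = new ArrayList<>();

    // The single place where the processing logic lives.
    static void handle(String source, String payload) {
        handled.add(source + ":" + payload);
    }

    // Would bind to consumeMessage1-in-0 (for example topic-1 with group-1).
    static Consumer<String> consumeMessage1() {
        return payload -> handle("consumeMessage1", payload);
    }

    // Would bind to consumeMessage2-in-0 (for example topic-2 with group-2).
    static Consumer<String> consumeMessage2() {
        return payload -> handle("consumeMessage2", payload);
    }

    public static void main(String[] args) {
        consumeMessage1().accept("a");
        consumeMessage2().accept("b");
        System.out.println(handled);
    }
}
```

This does not remove the need for one bean per topic; it only keeps each bean down to a one-line delegate, so adding a topic means adding a small factory method plus its binding configuration.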