Description
Spring Boot integration with Kafka: @RetryableTopic is not taking effect. I cannot control the retry count; the record is retried continuously. Two kinds of error logs are printed to the console over and over: one is 'Unable to deliver to retry queue', and the other is 'Send exception not in transaction'.
Because I want to send transactional messages, I have configured ProducerConfig.TRANSACTIONAL_ID_CONFIG on the ProducerFactory and KafkaTemplate, and I have also added the @Transactional annotation on the message consumer side.
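For context, the producer side sends roughly like this (a simplified sketch; the real sender is in the linked repo, so the class name and send style below are assumptions):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Hypothetical sender, shown only to illustrate the transactional setup.
@Service
public class OrderSender {

    private final KafkaTemplate<String, Order> kafkaTemplate;

    public OrderSender(KafkaTemplate<String, Order> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // With TRANSACTIONAL_ID_CONFIG set on the producer factory, every send must run
    // inside a Kafka transaction, e.g. one started by @Transactional (backed by the
    // KafkaTransactionManager bean below) or via kafkaTemplate.executeInTransaction(...).
    @Transactional("kafkaTransactionManager")
    public void sendOrder(Order order) {
        kafkaTemplate.send("order-topic-new", order.getOrderId(), order);
    }
}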
Version
JDK 1.8, spring-boot-starter-parent 2.7.17, spring-kafka 2.9.13, broker kafka_2.12-3.6.1
Code
https://github.com/LimerenceAmumu/Kafka-study
Consumer
// Consumer group 1: process orders
@RetryableTopic(
        attempts = "4", // 4 attempts in total (1 original + 3 retries)
        backoff = @Backoff(delay = 1000, multiplier = 2.0), // retry after 1s, 2s, 4s
        dltTopicSuffix = ".dlt", // dead-letter topic name: order-topic.dlt
        include = {Exception.class}, // retry on all exception types
        dltStrategy = DltStrategy.FAIL_ON_ERROR // go to the DLQ when processing fails
)
@KafkaListener(
        topics = "order-topic-new",
        groupId = "${spring.kafka.consumer.group-id}",
        containerFactory = "kafkaListenerContainerFactory"
)
public void consumeOrderMessage(ConsumerRecord<String, Order> record,
        @org.springframework.messaging.handler.annotation.Payload(required = false) Object payload,
        Acknowledgment acknowledgment) {
    try {
        Order order = record.value();
        log.info("Received order message: orderId={}, topic={}, partition={}, offset={}",
                order.getOrderId(),
                record.topic(),
                record.partition(),
                record.offset());
        // Process the order
        processOrder(order);
        // Throw an exception for a specific customer to trigger retries
        if (order.getCustomerName().equals("ex")) {
            throw new RuntimeException("order process error");
        }
        acknowledgment.acknowledge();
        log.info("order process ack: {}", order.getOrderId());
    } catch (Exception e) {
        log.error("order process error: {}", e.getMessage(), e);
        throw e;
    }
}
Config
@RequiredArgsConstructor
@EnableScheduling
@Configuration
@EnableKafka
public class KafkaConfig extends RetryTopicConfigurationSupport {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(Exception.class)
                .backOff(new FixedBackOff(5000, 2));
    }
    // // Create the order topic
    // @Bean
    // public NewTopic orderTopic() {
    //     Map<String, String> configs = new HashMap<>();
    //     configs.put("retention.ms", "86400000"); // retain for 1 day
    //
    //     return TopicBuilder.name("order-topic")
    //             .partitions(3) // 3 partitions
    //             .replicas(1)   // 1 replica
    //             .configs(configs)
    //             .build();
    // }

    // Create the order dead-letter topic
    @Bean
    public NewTopic orderTopicDLQ() {
        Map<String, String> configs = new HashMap<>();
        configs.put("retention.ms", "86400000"); // retain for 1 day
        return TopicBuilder.name("order-topic-new.dlt")
                .partitions(3) // 3 partitions
                .replicas(1)   // 1 replica
                .configs(configs)
                .build();
    }

    // Create the order confirmation topic
    @Bean
    public NewTopic orderConfirmationTopic() {
        return TopicBuilder.name("order-confirmation-topic")
                .partitions(2)
                .replicas(1)
                .build();
    }

    // Producer configuration
    @Bean
    public ProducerFactory<String, Order> orderProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for acks from all replicas
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // retry count
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // enable idempotence
        configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-transaction"); // transactional id
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(orderProducerFactory());
    }

    //
    // @Bean("defaultRetryTopicKafkaTemplate")
    // public KafkaTemplate<String, Order> defaultRetryTopicKafkaTemplate(ProducerFactory<String, Order> producerFactory) {
    //     KafkaTemplate<String, Order> template = new KafkaTemplate<>(producerFactory);
    //     // Key point: allow non-transactional sends
    //     template.setAllowNonTransactional(true);
    //     return template;
    // }

    // @Bean
    // public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate<String, Order> retryKafkaTemplate) {
    //     return new DeadLetterPublishingRecoverer(retryKafkaTemplate);
    // }
    // Consumer configuration
    @Bean
    public ConsumerFactory<String, Order> orderConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafkaorderdemo.model");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start from the earliest offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // disable auto-commit
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setBatchListener(false);
        factory.setReplyTemplate(kafkaTemplate());
        // Manual acknowledgment mode, so that Acknowledgment is available in the listener
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }

    // Transaction manager configuration
    @Bean
    public KafkaTransactionManager<String, Order> kafkaTransactionManager() {
        KafkaTransactionManager<String, Order> transactionManager = new KafkaTransactionManager<>(orderProducerFactory());
        transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return transactionManager;
    }
}
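As far as I understand, the retry and DLT records are forwarded through a KafkaTemplate as well, and the only template above is the transactional one. Below is a minimal sketch of a separate, non-transactional template that the listener could point to via @RetryableTopic's kafkaTemplate attribute; the bean names are hypothetical and this wiring is not in the repo:

    // Sketch only: a producer factory WITHOUT TRANSACTIONAL_ID_CONFIG, so the template
    // it creates is allowed to send outside a transaction.
    @Bean
    public ProducerFactory<String, Order> retryProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean("retryTopicKafkaTemplate") // hypothetical bean name
    public KafkaTemplate<String, Order> retryTopicKafkaTemplate() {
        return new KafkaTemplate<>(retryProducerFactory());
    }

    // On the listener: @RetryableTopic(kafkaTemplate = "retryTopicKafkaTemplate", ...)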
Error logs:
2025年10月07日 12:01:09 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.s.k.r.DeadLetterPublishingRecovererFactory$1 - Dead-letter publication to order-topic-new-retry-0 failed for: order-topic-new-0@0
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:782)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:674)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:459)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:654)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:562)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:527)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:499)
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:237)
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:191)
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:107)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:104)
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:207)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2854)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2722)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2572)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2448)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2078)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1430)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1394)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1291)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
2025年10月07日 12:01:09 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.s.k.listener.DefaultErrorHandler - Failed to determine if this record (order-topic-new-0@0) should be recovererd, including in seeks
org.springframework.kafka.KafkaException: Dead-letter publication to order-topic-new-retry-0 failed for: order-topic-new-0@0
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:683)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:666)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:562)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:527)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:499)
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:237)
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:191)
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:107)
You may want to try not manually acknowledging the commit and turning off manual mode; it can interfere with the Retryable annotation, which handles all that for you. – Sebastiaan van den Broek, Oct 7, 2025 at 6:28
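A sketch of what that suggestion would look like against the configuration above (illustration only, assuming everything else stays the same):

    // Container factory without manual ack mode; the container commits offsets itself
    // and @RetryableTopic takes care of retries and DLT routing.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(orderConsumerFactory());
        factory.setReplyTemplate(kafkaTemplate());
        return factory;
    }

    // Listener without the Acknowledgment parameter:
    @RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "order-topic-new",
            groupId = "${spring.kafka.consumer.group-id}",
            containerFactory = "kafkaListenerContainerFactory")
    public void consumeOrderMessage(ConsumerRecord<String, Order> record) {
        processOrder(record.value());
    }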