Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Kevded/integration-test-spring-kafka-with-embedded-kafka-consumer-and-producer

Folders and files

NameName
Last commit message
Last commit date

Latest commit

History

37 Commits

Repository files navigation

Java CI

Integration Test Spring Kafka with Embedded Kafka Consumer and Producer

Sample project showing how to implement integration tests in Spring Boot with Spring Kafka and EmbeddedKafka.

Example with Spring Boot 2.2.6 (Spring Kafka 2.4.5)

Service

/**
 * Kafka listener service: receives {@link ExampleDTO} messages from the
 * TOPIC_EXAMPLE topic, converts them to {@link ExampleEntity} and hands them
 * to the {@link ExampleRepository}.
 */
@Service
public class ConsumerService {

    // One shared logger per class instead of one per instance.
    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    // Injected once through the constructor and never reassigned.
    private final ExampleRepository exampleRepository;

    ConsumerService(ExampleRepository exampleRepository) {
        this.exampleRepository = exampleRepository;
    }

    /**
     * Consumes an ExampleDTO from the topic TOPIC_EXAMPLE,
     * then saves it through the repository.
     *
     * @param exampleDTO {@link ExampleDTO} payload received from the topic
     */
    @KafkaListener(topics = "TOPIC_EXAMPLE", groupId = "consumer_example_dto")
    public void consumeExampleDTO(ExampleDTO exampleDTO) {
        log.info("Received from topic=TOPIC_EXAMPLE ExampleDTO={}", exampleDTO);
        exampleRepository.save(convertToExampleEntity(exampleDTO));
        log.info("saved in database {}", exampleDTO);
    }

    /**
     * Copies the name and description of the DTO into a new entity.
     * In a real project you should use a mapper or a dedicated service for this.
     *
     * @param exampleDTO source DTO
     * @return a new {@link ExampleEntity} carrying the DTO's name and description
     */
    public ExampleEntity convertToExampleEntity(ExampleDTO exampleDTO) {
        ExampleEntity exampleEntity = new ExampleEntity();
        exampleEntity.setDescription(exampleDTO.getDescription());
        exampleEntity.setName(exampleDTO.getName());
        return exampleEntity;
    }
}
/**
 * Publishes {@link ExampleDTO} messages to the external topic
 * TOPIC_EXAMPLE_EXTERNE through a {@link KafkaTemplate}.
 */
@Service
public class ProducerService {

    // One shared logger per class instead of one per instance.
    private static final Logger log = LoggerFactory.getLogger(ProducerService.class);

    // The destination never changes, so it is a constant rather than mutable state.
    private static final String TOPIC = "TOPIC_EXAMPLE_EXTERNE";

    private final KafkaTemplate<String, ExampleDTO> kafkaTemplate;

    // Parameterized (not raw) so the assignment below is type-checked.
    ProducerService(KafkaTemplate<String, ExampleDTO> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends an ExampleDTO to the external topic TOPIC_EXAMPLE_EXTERNE.
     *
     * @param exampleDTO the payload to publish
     */
    public void send(ExampleDTO exampleDTO) {
        log.info("send to topic={} ExampleDTO={}", TOPIC, exampleDTO);
        kafkaTemplate.send(TOPIC, exampleDTO);
    }
}

Integration test

/**
 * Integration test for the consumer side: a test producer publishes an
 * {@link ExampleDTO} to TOPIC_EXAMPLE on the embedded broker, and the test
 * then checks what ends up in the {@link ExampleRepository}.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"})
public class ConsumerServiceIntegrationTest {

    private static final Logger log = LoggerFactory.getLogger(ConsumerServiceIntegrationTest.class);

    private static final String TOPIC_EXAMPLE = "TOPIC_EXAMPLE";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ExampleRepository exampleRepository;

    /**
     * Builds an ExampleDTO test fixture.
     *
     * @param name        value for the DTO's name
     * @param description value for the DTO's description
     * @return the populated DTO
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * We publish to the topic, then verify the result in the database.
     */
    @Test
    public void itShould_ConsumeCorrectExampleDTO_from_TOPIC_EXAMPLE_and_should_saveCorrectExampleEntity() throws ExecutionException, InterruptedException {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("Un nom 2", "Une description 2");
        // Simulated producer: plays the role of the external system writing to TOPIC_EXAMPLE.
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);
        // Alternative ways to create the test producer:
        // ProducerFactory<String, ExampleDTO> producerFactory = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>());
        // Producer<String, ExampleDTO> producerTest = producerFactory.createProducer();
        // Or
        // ProducerRecord<String, ExampleDTO> producerRecord = new ProducerRecord<>(TOPIC_EXAMPLE, "key", exampleDTO);
        // KafkaTemplate<String, ExampleDTO> template = new KafkaTemplate<>(producerFactory);
        // template.setDefaultTopic(TOPIC_EXAMPLE);
        // template.send(producerRecord);

        // try-with-resources: the producer is closed even when the assertions below fail.
        try (Producer<String, ExampleDTO> producerTest =
                new KafkaProducer<>(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>())) {
            // WHEN
            producerTest.send(new ProducerRecord<>(TOPIC_EXAMPLE, "", exampleDTO));

            // THEN
            // We must have exactly 1 entity inserted.
            // We cannot predict when the insertion into the database will occur,
            // so we poll until the value is present, thanks to Awaitility.
            await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> {
                var exampleEntityList = exampleRepository.findAll();
                assertEquals(1, exampleEntityList.size());
                ExampleEntity firstEntity = exampleEntityList.get(0);
                assertEquals(exampleDTO.getDescription(), firstEntity.getDescription());
                assertEquals(exampleDTO.getName(), firstEntity.getName());
            });
        }
    }
}
/**
 * Integration test for the producer side: {@link ProducerService} publishes an
 * {@link ExampleDTO}, and a test consumer attached to the embedded broker reads
 * it back from TOPIC_EXAMPLE_EXTERNE.
 */
@ExtendWith(SpringExtension.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"})
public class ProducerServiceIntegrationTest {

    private static final String TOPIC_EXAMPLE_EXTERNE = "TOPIC_EXAMPLE_EXTERNE";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private ProducerService producerService;

    /**
     * Builds an ExampleDTO test fixture.
     *
     * @param name        value for the DTO's name
     * @param description value for the DTO's description
     * @return the populated DTO
     */
    public ExampleDTO mockExampleDTO(String name, String description) {
        ExampleDTO exampleDTO = new ExampleDTO();
        exampleDTO.setDescription(description);
        exampleDTO.setName(name);
        return exampleDTO;
    }

    /**
     * We verify the output in the topic, with a simulated consumer.
     */
    @Test
    public void itShould_ProduceCorrectExampleDTO_to_TOPIC_EXAMPLE_EXTERNE() {
        // GIVEN
        ExampleDTO exampleDTO = mockExampleDTO("Un nom", "Une description");
        // Simulated consumer: stands in for the external system reading TOPIC_EXAMPLE_EXTERNE.
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group_consumer_test", "false", embeddedKafkaBroker);
        // Start from the beginning of the topic rather than from the latest offset.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        ConsumerFactory<String, ExampleDTO> cf = new DefaultKafkaConsumerFactory<String, ExampleDTO>(
                consumerProps, new StringDeserializer(), new JsonDeserializer<>(ExampleDTO.class, false));

        // try-with-resources: the consumer is closed even when reading or an assertion fails.
        try (Consumer<String, ExampleDTO> consumerServiceTest = cf.createConsumer()) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE);

            // WHEN
            producerService.send(exampleDTO);

            // THEN
            ConsumerRecord<String, ExampleDTO> consumerRecordOfExampleDTO =
                    KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE);
            ExampleDTO valueReceived = consumerRecordOfExampleDTO.value();
            assertEquals("Une description", valueReceived.getDescription());
            assertEquals("Un nom", valueReceived.getName());
        }
    }
}

Official Example : github.com/spring-projects/spring-kafka

To write integration tests, you can also have a look at:

Articles

About

Integration Test in spring boot with spring kafka Embedded Kafka Consumer Producer

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Contributors 2

Languages

AltStyle によって変換されたページ (->オリジナル) /