Sample project to show how to implement Integration Test in Spring Boot. With Spring Kafka and EmbeddedKafka.
Example with Spring Boot 2.2.6 (Spring Kafka 2.4.5)
@Service public class ConsumerService { Logger log = LoggerFactory.getLogger(ConsumerService.class); private ExampleRepository exampleRepository; ConsumerService(ExampleRepository exampleRepository) { this.exampleRepository = exampleRepository; } /** * Consume ExampleDTO on topic : TOPIC_EXAMPLE * Then save it in database. * * @param exampleDTO {@link ExampleDTO} */ @KafkaListener(topics = "TOPIC_EXAMPLE", groupId = "consumer_example_dto") public void consumeExampleDTO(ExampleDTO exampleDTO) { log.info("Received from topic=TOPIC_EXAMPLE ExampleDTO={}", exampleDTO); exampleRepository.save(convertToExampleEntity(exampleDTO)); log.info("saved in database {}", exampleDTO); } /** * In Java world you should use an Mapper, or an dedicated service to do this. */ public ExampleEntity convertToExampleEntity(ExampleDTO exampleDTO) { ExampleEntity exampleEntity = new ExampleEntity(); exampleEntity.setDescription(exampleDTO.getDescription()); exampleEntity.setName(exampleDTO.getName()); return exampleEntity; } }
@Service public class ProducerService { Logger log = LoggerFactory.getLogger(ProducerService.class); private String topic = "TOPIC_EXAMPLE_EXTERNE"; private KafkaTemplate<String, ExampleDTO> kafkaTemplate; ProducerService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } /** * Send ExampleDTO to an external topic : TOPIC_EXAMPLE_EXTERNE. * * @param exampleDTO */ public void send(ExampleDTO exampleDTO) { log.info("send to topic={} ExampleDTO={}", topic, exampleDTO); kafkaTemplate.send(topic, exampleDTO); } }
@ExtendWith(SpringExtension.class) @SpringBootTest @DirtiesContext @EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"}) public class ConsumerServiceIntegrationTest { Logger log = LoggerFactory.getLogger(ConsumerServiceIntegrationTest.class); private static final String TOPIC_EXAMPLE = "TOPIC_EXAMPLE"; @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker; @Autowired private ExampleRepository exampleRepository; public ExampleDTO mockExampleDTO(String name, String description) { ExampleDTO exampleDTO = new ExampleDTO(); exampleDTO.setDescription(description); exampleDTO.setName(name); return exampleDTO; } /** * We verify the output in the topic. But aslo in the database. */ @Test public void itShould_ConsumeCorrectExampleDTO_from_TOPIC_EXAMPLE_and_should_saveCorrectExampleEntity() throws ExecutionException, InterruptedException { // GIVEN ExampleDTO exampleDTO = mockExampleDTO("Un nom 2", "Une description 2"); // simulation consumer Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString()); log.info("props {}", producerProps); Producer<String, ExampleDTO> producerTest = new KafkaProducer(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>()); // Or // ProducerFactory producerFactory = new DefaultKafkaProducerFactory<String, ExampleDTO>(producerProps, new StringSerializer(), new JsonSerializer<ExampleDTO>()); // Producer<String, ExampleDTO> producerTest = producerFactory.createProducer(); // Or // ProducerRecord<String, ExampleDTO> producerRecord = new ProducerRecord<String, ExampleDTO>(TOPIC_EXAMPLE, "key", exampleDTO); // KafkaTemplate<String, ExampleDTO> template = new KafkaTemplate<>(producerFactory); // template.setDefaultTopic(TOPIC_EXAMPLE); // template.send(producerRecord); // WHEN producerTest.send(new ProducerRecord(TOPIC_EXAMPLE, "", exampleDTO)); // THEN // we must have 1 entity inserted // We cannot predict when the insertion into the database will occur. So we wait until the value is present. Thank to Awaitility. await().atMost(Durations.TEN_SECONDS).untilAsserted(() -> { var exampleEntityList = exampleRepository.findAll(); assertEquals(1, exampleEntityList.size()); ExampleEntity firstEntity = exampleEntityList.get(0); assertEquals(exampleDTO.getDescription(), firstEntity.getDescription()); assertEquals(exampleDTO.getName(), firstEntity.getName()); }); producerTest.close(); } }
@ExtendWith(SpringExtension.class) @SpringBootTest @DirtiesContext @EmbeddedKafka(topics = {"TOPIC_EXAMPLE", "TOPIC_EXAMPLE_EXTERNE"}) public class ProducerServiceIntegrationTest { private static final String TOPIC_EXAMPLE_EXTERNE = "TOPIC_EXAMPLE_EXTERNE"; @Autowired private EmbeddedKafkaBroker embeddedKafkaBroker; @Autowired private ProducerService producerService; public ExampleDTO mockExampleDTO(String name, String description) { ExampleDTO exampleDTO = new ExampleDTO(); exampleDTO.setDescription(description); exampleDTO.setName(name); return exampleDTO; } /** * We verify the output in the topic. With an simulated consumer. */ @Test public void itShould_ProduceCorrectExampleDTO_to_TOPIC_EXAMPLE_EXTERNE() { // GIVEN ExampleDTO exampleDTO = mockExampleDTO("Un nom", "Une description"); // simulation consumer Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group_consumer_test", "false", embeddedKafkaBroker); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); ConsumerFactory cf = new DefaultKafkaConsumerFactory<String, ExampleDTO>(consumerProps, new StringDeserializer(), new JsonDeserializer<>(ExampleDTO.class, false)); Consumer<String, ExampleDTO> consumerServiceTest = cf.createConsumer(); embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE); // WHEN producerService.send(exampleDTO); // THEN ConsumerRecord<String, ExampleDTO> consumerRecordOfExampleDTO = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_EXAMPLE_EXTERNE); ExampleDTO valueReceived = consumerRecordOfExampleDTO.value(); assertEquals("Une description", valueReceived.getDescription()); assertEquals("Un nom", valueReceived.getName()); consumerServiceTest.close(); } }
Official Example : github.com/spring-projects/spring-kafka
To write integration tests you can also have a look at :