In event based architecture integration test is hard. Because of event based architecture is asynchronous, it is hard to test that events are sent and handled by consumers. Another problem is, need to run Kafka when application started for testing. In some mocking/embedded solutions are restrict the Kafka’s features so it is not flexible to write tests with these mocking/embedded solutions.


Testcontainers help us to solve above issues. It provides to run kafka inside the container when application is start in test mode.


We will use the same project which explained in detail at Spring Boot Kafka post. Please refer previous link for all configuration details.

Kafka configuration

Because we need to run kafka in test mode we need to start KafkaContainer in Then we will use this kafka container to create our producerProps and consumerProps which are using to create kafkaConsumerFactory and kafkaProducerFactory. Please check below code which is in IntegrationTestConfiguration.

public class IntegrationTestConfiguration {
	//Postgres configuration is deleted from here. For full configuration please refer #Configoration section.

	@Bean(initMethod = "start")
	public KafkaContainer kafka() {
		return new KafkaContainer();

	public Map<String, Object> producerProps(KafkaContainer kafkaContainer) {
		Map<String, Object> props = new ConcurrentHashMap<>();"Kafka hashCode {}", kafkaContainer.hashCode());
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.ACKS_CONFIG, "all");
		props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
		props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
		return props;

	public Map<String, Object> consumerProps(KafkaContainer kafkaContainer) {
		Map<String, Object> props = new ConcurrentHashMap<>();"Kafka hashCode {}", kafkaContainer.hashCode());
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
		return props;

	public KafkaConsumerFactory<String, String> kafkaConsumerFactory() {
		return new KafkaConsumerFactory<>(consumerProps(kafka()));

	public KafkaProducerFactory<String, String> kafkaProducerFactory() {
		return new KafkaProducerFactory<>(producerProps(kafka()));

Configure spring boot service

In this project we want to test product save and update. In ProductProducer we sent an ProductChange event, if product is in this event does not exist in product table, then we save it as a new product. If product is exist we update the product price. So we have two different test scenarios as saveProduct_ifProductChangeEventSent_andProductNotExist and updateProduct_ifProductChangeEventSent.

public class ProductKafkaIntegrationTest extends BaseIntegrationTest {

	ProductProducer productProducer;
	ProductService productService;

	public void updateProduct_ifProductChangeEventSent() throws JsonProcessingException {
		 save new product to product table which name is product1
		String productName = "product1";
		BigDecimal price = new BigDecimal("22.25");
		Product product = new PersistantProduct(null, productName, price);

		//Sent price change event
		BigDecimal newPrice = new BigDecimal("20.00");
		Product productChange = new ProductChange(productName, newPrice);

		//Product should be updated with new price
		Product updatedProductParam = new PersistantProduct(productName);
		Optional<Product> updatedProduct = retryUntil(
				() -> productService.getProduct(updatedProductParam),
				l -> l.get().price().equals(newPrice));
		Assert.assertEquals(productName, updatedProduct.get().name());

	public void saveProduct_ifProductChangeEventSent_andProductNotExist() throws JsonProcessingException {
		String productName = "product2";
		BigDecimal price = new BigDecimal("20.00");
		//Sent price change event
		Product productChange = new ProductChange(productName, price);

		//Check product is saved
		Product paramSavedProduct = new PersistantProduct(productName);
		Optional<Product> savedProduct = retryUntil(
				() -> productService.getProduct(paramSavedProduct),
		Assert.assertEquals(productName, savedProduct.get().name());
		Assert.assertEquals(price, savedProduct.get().price());

	private <T> T retryUntil(Callable<T> callable, Predicate<T> predicate) {
		return retryUntil(callable, predicate, Duration.ofSeconds(10L), Duration.ofMillis(100L));

	private <T> T retryUntil(Callable<T> callable, Predicate<T> predicate, Duration maxDuration, Duration checkInterval) {
		Instant start =;
		Instant endTime =;

		T result;
		do {
			result = TestUtil.callUnchecked(callable);
			if (predicate.test(result)) {

			try {
			} catch (InterruptedException e) {
		} while (;

		return result;

After sent the event messages to kafka, then retryUntil method check that provided predicate is matched in 10 seconds. If it is then tests are passed.

How to run the project

It is easy to run the project just with ./


You can find the all project on Github


Happy coding :)


A software engineer who loves open source.