How to Set Up Kafka Integration Test – Grape Up

Maria J. Smith

Do you consider device screening as not enough alternative for holding the application’s trustworthiness and stability? Are you worried that by some means or someplace there is a likely bug hiding in the assumption that unit assessments should really go over all circumstances? And also is mocking Kafka not more than enough for venture necessities? If even one solution is  ‘yes’, then welcome to a awesome and easy guide on how to set up Integration Checks for Kafka making use of TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open up-supply Java library specialised in furnishing all desired solutions for the integration and testing of external sources. It suggests that we are equipped to mimic an genuine database, internet server, or even an event bus environment and handle that as a responsible place to examination app operation. All these extravagant attributes are hooked into docker photographs, described as containers. Do we want to exam the database layer with actual MongoDB? No problems, we have a check container for that. We can not also ignore about UI assessments – Selenium Container will do everything that we really have to have.
In our circumstance, we will concentration on Kafka Testcontainer.

What is Embedded Kafka?

As the name indicates, we are likely to deal with an in-memory Kafka occasion, completely ready to be utilized as a usual broker with whole features. It will allow us to operate with producers and buyers, as normal, making our integration assessments lightweight. 

In advance of we start out

The strategy for our examination is easy – I would like to test Kafka shopper and producer employing two distinct methods and check how we can make the most of them in genuine conditions. 

Kafka Messages are serialized utilizing Avro schemas.

Embedded Kafka – Producer Exam

The notion is uncomplicated – let us create a straightforward task with the controller, which invokes a company approach to thrust a Kafka Avro serialized message.


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

Also worthy of mentioning amazing plugin for Avro. Right here plugins part:

id 'org.springframework.boot' version '2.6.8'
id 'io.spring.dependency-management' version '1..11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3."

Avro Plugin supports schema auto-creating. This is a ought to-have.

Website link to plugin:

Now let’s define the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "sort": "history",
  "name": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "": "String"


Our ProducerService will be focused only on sending messages to Kafka making use of a template, practically nothing enjoyable about that aspect. Major features can be done just working with this line:

ListenableFuture> upcoming = this.kafkaTemplate.ship("register-request", kafkaMessage)

We just cannot fail to remember about test qualities:

    let-bean-definition-overriding: legitimate
      group-id: team_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.popular.serialization.StringDeserializer
      worth-deserializer: com.grapeup.myawesome.myawesomeconsumer.frequent.CustomKafkaAvroDeserializer
      car.sign up.schemas: accurate
      important-serializer: org.apache.kafka.prevalent.serialization.StringSerializer
      worth-serializer: com.grapeup.myawesome.myawesomeconsumer.frequent.CustomKafkaAvroSerializer
      unique.avro.reader: accurate

As we see in the pointed out test houses, we declare a customized deserializer/serializer for KafkaMessages. It is highly proposed to use Kafka with Avro – don’t allow JSONs retain item structure, let us use civilized mapper and object definition like Avro.


public course CustomKafkaAvroSerializer extends KafkaAvroSerializer
    public CustomKafkaAvroSerializer()
        tremendous.schemaRegistry = new MockSchemaRegistryClient()

    general public CustomKafkaAvroSerializer(SchemaRegistryClient customer)
        super(new MockSchemaRegistryClient())

    community CustomKafkaAvroSerializer(SchemaRegistryClient customer, Map props)
        tremendous(new MockSchemaRegistryClient(), props)


community course CustomKafkaAvroSerializer extends KafkaAvroSerializer
    community CustomKafkaAvroSerializer()
        tremendous.schemaRegistry = new MockSchemaRegistryClient()

    public CustomKafkaAvroSerializer(SchemaRegistryClient client)
        tremendous(new MockSchemaRegistryClient())

    public CustomKafkaAvroSerializer(SchemaRegistryClient customer, Map props)
        super(new MockSchemaRegistryClient(), props)

And we have everything to commence creating our exam.

@EmbeddedKafka(partitions = 1, subject areas = "sign up-ask for")
course ProducerControllerTest {

All we need to do is add @EmbeddedKafka annotation with detailed matters and partitions. Application Context will boot Kafka Broker with presented configuration just like that. Hold in thoughts that @TestInstance should really be utilized with distinctive thought. Lifecycle.Per_Class will avoid producing the same objects/context for just about every examination method. Well worth checking if assessments are far too time-consuming.

Consumer consumerServiceTest
void Set up()
DefaultKafkaConsumerFactory buyer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()

consumerServiceTest = purchaser.createConsumer()

Here we can declare the exam consumer, dependent on the Avro schema return style. All Kafka qualities are presently furnished in the .yml file. That client will be made use of as a test if the producer essentially pushed a concept.

Below is the genuine exam technique:

void whenValidInput_therReturns200() throws Exception
        RegisterRequestDto ask for = RegisterRequestDto.builder()

                      .content material(objectMapper.writeValueAsBytes(request)))

      ConsumerRecord consumedRegisterRequest =  KafkaTestUtils.getSingleRecord(consumerServiceTest, Subject_Name)

        RegisterRequest valueReceived = consumedRegisterRequest.benefit()

        assertEquals(12, valueReceived.getId())
        assertEquals("tempAddress", valueReceived.getAddress())

Initial of all, we use MockMvc to conduct an motion on our endpoint. That endpoint uses ProducerService to force messages to Kafka. KafkaConsumer is made use of to verify if the producer worked as expected. And that is it – we have a thoroughly working check with embedded Kafka.

Take a look at Containers – Purchaser Examination

TestContainers are absolutely nothing else like independent docker photos completely ready for currently being dockerized. The pursuing check circumstance will be improved by a MongoDB impression. Why not keep our details in the databases ideal soon after everything happened in Kafka flow?

Dependencies are not significantly various than in the former instance. The following steps are essential for check containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

established('testcontainersVersion', "1.17.1")

mavenBom "org.testcontainers:testcontainers-bom:$testcontainersVersion"

Let’s focus now on the Customer part. The test scenario will be easy – a single shopper company will be responsible for getting the Kafka message and storing the parsed payload in the MongoDB collection. All that we require to know about KafkaListeners, for now, is that annotation:

@KafkaListener(matters = "sign up-ask for")

By the functionality of the annotation processor, KafkaListenerContainerFactory will be dependable to build a listener on our approach. From this instant our strategy will react to any future Kafka information with the described matter.

Avro serializer and deserializer configs are the very same as in the previous take a look at.

Regarding TestContainer, we need to start off with the subsequent annotations:

general public course AbstractIntegrationTest {

Through startup, all configured TestContainers modules will be activated. It implies that we will get access to the complete functioning atmosphere of the picked supply. As illustration:

non-public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017)

As a outcome of booting the take a look at, we can assume two docker containers to start out with the provided configuration.

What is actually crucial for the mongo container – it presents us total accessibility to the database applying just a straightforward connection uri. With these a attribute, we are ready to choose a glimpse what is the existing condition in our collections, even all through debug manner and prepared breakpoints.
Get a look also at the Ryuk container – it functions like overwatch and checks if our containers have started accurately.

And in this article is the very last aspect of the configuration:

static void dataSourceProperties(DynamicPropertyRegistry registry)
   registry.increase("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.incorporate("spring.kafka.purchaser.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.include("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers)
   registry.add("spring.facts.mongodb.uri", mongoDBContainer::getReplicaSetUrl)

   kafkaContainer.start out()

   mongoDBContainer.waitingFor(Hold out.forListeningPort()

community void beforeTest()

           messageListenerContainer ->
                       .waitForAssignment(messageListenerContainer, 1)


static void tearDown()

DynamicPropertySource presents us the solution to established all needed ecosystem variables all through the test lifecycle. Strongly desired for any config uses for TestContainers. Also, beforeTestClass kafkaListenerEndpointRegistry waits for every listener to get expected partitions through container startup.

And the last element of the Kafka take a look at containers journey – the main physique of the examination:

general public void containerStartsAndPublicPortIsAvailable() throws Exception
   writeToTopic("sign up-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").construct())

   //Wait around for KafkaListener
   Assertions.assertEquals(1, taxiRepository.findAll().size())

personal KafkaProducer createProducer()
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties())

non-public void writeToTopic(String topicName, RegisterRequest... registerRequests)

   try (KafkaProducer producer = createProducer())
               .forEach(registerRequest ->
                           ProducerRecord history = new ProducerRecord<>(topicName, registerRequest)


The personalized producer is accountable for composing our concept to KafkaBroker. Also, it is advised to give some time for consumers to cope with messages effectively. As we see, the message was not just eaten by the listener, but also stored in the MongoDB collection.


As we can see, recent methods for integration assessments are pretty quick to employ and keep in tasks. There is no issue in preserving just device assessments and counting on all traces coated as a signal of code/logic quality. Now the concern is, should really we use an Embedded resolution or TestContainers? I suggest 1st of all concentrating on the phrase “Embedded”. As a excellent integration exam, we want to get an practically ideal duplicate of the manufacturing ecosystem with all qualities/attributes provided. In-memory answers are great, but mostly, not adequate for substantial small business tasks. Certainly, the gain of Embedded products and services is the effortless way to put into practice such tests and preserve configuration, just when something occurs in memory.
TestContainers at the to start with sight may possibly glance like overkill, but they give us the most important characteristic, which is a different atmosphere. We really don’t have to even depend on existing docker illustrations or photos – if we want we can use personalized ones. This is a enormous improvement for likely test situations.
What about Jenkins? There is no explanation to be fearful also to use TestContainers in Jenkins. I firmly advise examining TestContainers documentation on how conveniently we can established up the configuration for Jenkins brokers.
To sum up – if there is no blocker or any unwelcome situation for making use of TestContainers, then really do not wait. It is generally good to keep all providers managed and secured with integration exam contracts.

Next Post

Get the Form Backend That Will Make Your Life As a Developer a Lot Easier

There is no much better way to establish successful organization-making sorts than with a kind backend. With it, you can established up a dashboard, configure spam protection, and even use Vehicle responders. You can even mail submissions to a distinct e mail address, with out the want to share your […]