Do you consider unit testing an insufficient way of keeping your application reliable and stable? Are you afraid that somewhere a bug is hiding behind the assumption that unit tests cover all cases? And is mocking Kafka not enough for your project's requirements? If even one answer is "yes", then welcome to a nice and easy guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!
What is TestContainers?
TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these fancy features are backed by Docker images, run as containers. Do we need to test the database layer with an actual MongoDB? No worries, we have a test container for that. We also cannot forget about UI tests: the Selenium container will do anything that we actually need.
In our case, we will focus on the Kafka Testcontainer.
What is Embedded Kafka?
As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, while keeping our integration tests lightweight.
Before we start
The concept for our test is simple: I would like to test a Kafka consumer and a producer using two different approaches and check how we can utilize them in real cases.
Kafka messages are serialized using Avro schemas.
Embedded Kafka – Producer Test
The concept is easy: let's create a simple project with a controller that invokes a service method to push an Avro-serialized Kafka message.
Dependencies:
dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation("io.confluent:kafka-avro-serializer:6.1.")
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')
    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
Also worth mentioning is a great plugin for Avro. Here is the plugins section:
plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1..11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3."
}
The Avro plugin generates Java classes from the schema automatically. This is a must-have.
Link to the plugin: https://github.com/davidmc24/gradle-avro-plugin
Now let’s define the Avro schema:
{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}
Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done with just this line:
ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
We can't forget about the test properties:
spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true
As we see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro: don't let JSON define the object structure, let's use a civilized mapper and object definition like Avro.
Serializer:
public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    // Every constructor swaps in a mock schema registry, so no real registry is needed in tests.
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
Deserializer:
public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    // Mirrors the serializer: every constructor swaps in a mock schema registry.
    public CustomKafkaAvroDeserializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
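To see why both sides need a schema registry at all, it helps to look at the framing that the Confluent Avro serializer puts in front of each payload: one magic byte (0), a 4-byte big-endian schema id, and then the Avro-encoded bytes. On the reading side the id is looked up in the registry, so the registry used for reading has to know the id that was written. The class below is only a standard-library sketch of that framing; `WireFormatSketch`, `frame`, `schemaIdOf` and `payloadOf` are hypothetical names, not part of the Confluent API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Standard-library sketch of the Confluent wire format; not the Confluent API.
final class WireFormatSketch {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + schema id

    // Prefixes an already Avro-encoded payload with the magic byte and the schema id.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema id that a deserializer would look up in its registry.
    static int schemaIdOf(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        if (buffer.remaining() < HEADER_SIZE || buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed message");
        }
        return buffer.getInt();
    }

    // Returns the Avro payload that follows the 5-byte header.
    static byte[] payloadOf(byte[] framed) {
        schemaIdOf(framed); // validates the header first
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(1, new byte[] {24, 22});
        System.out.println("schema id = " + schemaIdOf(framed));
        System.out.println("payload length = " + payloadOf(framed).length);
    }
}
```

When a test fails with a "schema not found" style error, comparing the id in the first five bytes of the record with what the mock registry contains is usually a quick way to narrow the problem down.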
And we have everything we need to start writing our test.
@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {
All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS creates one test instance for the whole class instead of a new one for each test method. It is worth checking if the tests are too time-consuming.
Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    // Build a plain consumer from the Kafka properties defined in the test .yml file
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}
Here we can declare the test consumer, typed with the class generated from the Avro schema. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.
Here is the actual test method:
@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    // Call the endpoint, which pushes the message to Kafka through ProducerService
    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    // Read the single record back from the embedded broker
    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);
    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}
First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The Kafka consumer is used to verify that the producer worked as expected. And that's it: we have a fully working test with embedded Kafka.
Test Containers – Consumer Test
TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after something happened in the Kafka flow?
Dependencies are not much different from the previous example. The following additions are needed for test containers:
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}
Let’s focus now on the consumer part. The test case will be simple: one consumer service will be responsible for receiving the Kafka message and storing the parsed payload in a MongoDB collection. All that we need to know about Kafka listeners, for now, is this annotation:
@KafkaListener(topics = "register-request")
Thanks to the annotation processing, KafkaListenerContainerFactory will be responsible for building a listener on our method. From this moment our method will react to any upcoming Kafka message on the mentioned topic.
The Avro serializer and deserializer configs are the same as in the previous test.
Regarding TestContainers, we should start with the following annotations:
@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {
During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected resource. For example:
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);
As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important about the Mongo container is that it gives us full access to the database using just a simple connection URI. With such a feature, we are able to look at the current state of our collections, even in debug mode with breakpoints set.
Take a look also at the Ryuk container: it works like an overwatch that keeps track of the containers started by the tests and removes them once the test run is over.
And here is the last part of the configuration:
@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
    kafkaContainer.start();
    // The wait strategy has to be configured before start() to take effect
    mongoDBContainer.waitingFor(Wait.forListeningPort()
            .withStartupTimeout(Duration.ofSeconds(180L)));
    mongoDBContainer.start();
}

@BeforeTestClass
public void beforeTest() {
    // Block until every listener container has been assigned its partition
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer ->
                    ContainerTestUtils
                            .waitForAssignment(messageListenerContainer, 1)
    );
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}
@DynamicPropertySource gives us the option to set all needed environment properties during the test lifecycle. It is strongly needed for any TestContainers configuration. Also, in beforeTest, kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.
And the last part of the Kafka test containers journey, the main body of the test:
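The method references passed to `registry.add` matter here: Testcontainers maps the broker port to a random host port, so the bootstrap address is only known once the container is running. `DynamicPropertyRegistry.add` therefore accepts a `Supplier` and the value is resolved only when the property is read. The following is a standard-library sketch of that idea, not Spring's implementation; `LazyPropertyRegistrySketch` and the address string are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Sketch of a registry that stores suppliers and resolves them on read.
final class LazyPropertyRegistrySketch {
    private final Map<String, Supplier<Object>> suppliers = new LinkedHashMap<>();

    // Registers a value provider without invoking it yet.
    void add(String name, Supplier<Object> valueSupplier) {
        suppliers.put(name, valueSupplier);
    }

    // Invokes the supplier at lookup time; returns null for unknown names.
    Object resolve(String name) {
        Supplier<Object> supplier = suppliers.get(name);
        return supplier == null ? null : supplier.get();
    }

    public static void main(String[] args) {
        // Stand-in for a container whose address is unknown until it starts.
        String[] bootstrapServers = new String[1];

        LazyPropertyRegistrySketch registry = new LazyPropertyRegistrySketch();
        registry.add("spring.kafka.bootstrap-servers", () -> bootstrapServers[0]);

        bootstrapServers[0] = "localhost:49153"; // hypothetical mapped port
        System.out.println(registry.resolve("spring.kafka.bootstrap-servers"));
    }
}
```

Had the registry stored a plain value instead of a supplier, it would have captured the address before the container had one.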
@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
    writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

    // Wait for KafkaListener
    TimeUnit.SECONDS.sleep(5);
    Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
    return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
    // try-with-resources closes the producer, which also flushes pending records
    try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
        Arrays.stream(registerRequests)
                .forEach(registerRequest -> {
                    ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                    producer.send(record);
                });
    }
}
The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
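A fixed five-second sleep makes the test either slower than necessary or flaky on a loaded machine. One common alternative is to poll the condition until it holds or a timeout expires. Below is a minimal standard-library sketch of such a helper; `PollingWait` and `waitUntil` are hypothetical names, and libraries such as Awaitility offer a more complete version of the same idea.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Polls a condition instead of sleeping for a fixed amount of time.
final class PollingWait {
    // Returns true as soon as the condition holds, false if the timeout expires
    // first or the waiting thread is interrupted.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for a repository that a listener fills asynchronously.
        AtomicInteger storedDocuments = new AtomicInteger();
        Thread listener = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            storedDocuments.incrementAndGet();
        });
        listener.start();

        boolean stored = waitUntil(() -> storedDocuments.get() == 1,
                Duration.ofSeconds(5), Duration.ofMillis(50));
        System.out.println("document stored: " + stored);
        listener.join();
    }
}
```

In the test above, the condition could be something like `() -> taxiRepository.findAll().size() == 1`, followed by an assertion on the returned flag.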
Conclusions
As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on full line coverage as a sign of code/logic quality. Now the question is, should we use an embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". For a perfect integration test, we want an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of embedded services is how easy it is to implement such tests and maintain the configuration, since everything happens in memory.
TestContainers at first sight might look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images: if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up: if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.