En esta ocasión me gustaría ampliar el conocimiento de un post anterior de mi compañero Raúl Martínez, sobre Testcontainers, entornos de testing efímeros, en el que nos presenta una bonita introducción a Testcontainers, lo que me permitirá ser más directo.

Un poco de trasfondo

Una vez hemos construido las pruebas unitarias de nuestro sistema, llega el momento de trabajar en el siguiente piso de la pirámide de los test, los test de integración.

A la hora de realizar test de integración necesitamos piezas y herramientas para observar cómo se comporta nuestro desarrollo en un entorno real. Dicho entorno ha de estar siempre disponible y las pruebas no han de interferir entre sí, lo que supone un reto. Dichos entornos pueden resultar en máquinas virtuales o físicas con determinados servicios en versiones y configuraciones concretas.

Para ello, y en el caso concreto de Kafka podemos contar con la herramienta EmbeddedKafka, que nos ayuda a crear test potentes con un Kafka autocontenido que nos independiza de un servicio externo.

Sin embargo, nos vamos a decantar por Testcontainers, principalmente por los siguientes motivos:

Escenario

Vamos a proponer un escenario con dos microservicios que interactúan con un Kafka. Uno de ellos, va a producir un mensaje en un topic de Kafka; y el otro, se va a suscribir y procesar el mismo.

Para mantener el escenario lo más sencillo posible, el mensaje va a ser una simple cadena de texto. De esta manera, nos centraremos en aprender a probar cada uno de los sistemas en un entorno real.

El mensaje va a ser una simple cadena de texto.

Objetivo

Crear test de integración para microservicios que trabajan con Kafka con las siguientes características:

En resumen, queremos combinar las ventajas que nos aporta trabajar con una herramienta como lo es jUnit y las que nos aporta trabajar con contenedores.

¿Qué necesitamos?

Vamos a continuar con la premisa de hacer las cosas lo más fácil posible. Para ello vamos a crear dos microservicios utilizando Spring Boot con Spring for Kafka y las extensiones de Testcontainers.

Añadiremos las dependencias de Testcontainers. De esta manera podemos generar contenedores con las herramientas que necesitamos en nuestros escenarios, en este caso Kafka.

Por otro lado, integramos el ciclo de vida de dichos contenedores a nuestras pruebas junto a jUnit para poder valernos de sus capacidades de validación.

Los microservicios de nuestro caso de estudio

1 Productor

Código productivo

EventProducer.java

@Component
@Slf4j
public class EventProducer {

  @Autowired
  private KafkaTemplate<Integer, String> kafkaTemplate;

  public void sendEvent(String event) {
    int key = (int)System.currentTimeMillis();
    try {
      kafkaTemplate.sendDefault(key, event).get();
    } catch (...) {
      …
    }
  }
}

application.yaml

spring:
  profiles:
    active: dev
---
spring:
  profiles: dev
  kafka:
    producer:
      bootstrap-servers: localhost:29092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: example-topic

Código de pruebas

En el paquete de test (src/test) vamos a dar de alta nuestros casos de prueba, configuración de los contenedores y un consumidor de prueba.

TestBeansConfigurer.java

Aquí preparamos un consumidor que utilizaremos únicamente para los casos de prueba. De esta manera trabajamos de forma desacoplada, dicho consumidor va a hacer las veces de otro microservicio en nuestros test.

@Configuration
public class TestBeansConfigurer {

  @Autowired
  KafkaProperties properties;
  
  private static final String TOPIC_NAME= "example-topic";

  @Bean
  Consumer<String, String> testConsumer() {
    final Consumer<String, String> consumer =
        new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
            StringDeserializer::new, StringDeserializer::new).createConsumer();

    consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    return consumer;
  }
}

AbstractIntegrationTest.java

Donde vamos a configurar y lanzar los contenedores que necesitemos.

...
import org.junit.jupiter.api.TestInstance;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
...
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
...

@SpringBootTest
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
@ActiveProfiles("test")
public abstract class AbstractIntegrationTest {

  static KafkaContainer kafkaContainer =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));

  @DynamicPropertySource
  static void kafkaProperties(DynamicPropertyRegistry registry) {
    kafkaContainer.start();
    registry.add("spring.kafka.properties.bootstrap.servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.properties.auto.offset.reset", () -> "earliest");

  }
}

EventProducerApplicationConsumerIt.java

Donde definimos un caso de prueba muy simple. En él vamos a hacer un POST a nuestro endpoint, que a su vez publicará la cadena de texto en Kafka. Finalmente, mediante el “test consumer” podemos verificar que aquel que escuche recibe el mensaje esperado.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class EventProducerApplicationProducerIt extends AbstractIntegrationTest {

  private String topicName = "example-topic";
  
  @Autowired
  TestRestTemplate restTemplate;
  
  @Autowired
  Consumer<String, String> testConsumer;
  
  @Test
  void produce() {
    // GIVEN
    String bodyContent = "body";
    HttpEntity<String> request = new HttpEntity<String>(bodyContent);

    // WHEN
    ResponseEntity<String> res = restTemplate.exchange("/generate", HttpMethod.POST, request, String.class);
    
    // THEN
    assertEquals(HttpStatus.CREATED, res.getStatusCode());
    ConsumerRecord<String, String> resultRecord = KafkaTestUtils.getSingleRecord(testConsumer, topicName, 10000);
    assertEquals(bodyContent, resultRecord.value());
  }
}

application.yaml

En la configuración de nuestros casos de prueba añadimos el consumidor que nos ayuda a verificar el código productivo.

spring:
  profiles:
    active: test
---
spring:
  profiles: test
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: example-topic  
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: example-group

2 Consumidor

El consumidor espera un mensaje en el topic de Kafka para procesarlo.

Código productivo

EventConsumer.java

En el consumidor tenemos una clase “EventRecorderService”, lógica de negocio aplicada a cada uno de los eventos que recibimos. En este caso algo tan simple como escribirlo en el log.

@Component
@RequiredArgsConstructor
public class EventConsumer {
  
  private final EventRecorderService eventRecorderService;

  @KafkaListener(topics = {"example-topic"})
  public void onMessage(ConsumerRecord<Integer,String> record) {
    eventRecorderService.save(record.value());
  }
  
}

application.yaml

spring:
  profiles:
    active: dev
---  
spring:
  profiles: dev
  kafka:
    consumer:
      bootstrap-servers: localhost:29092
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: example-group
    template:
      default-topic: example-topic  

Código de pruebas

EventProducerApplicationConsumerIt.java

En el caso de prueba vamos a generar un producto, cuya implementación podemos clonar del caso anterior. Con él vamos a publicar un evento en el contenedor de Kafka.

El consumidor va a recibir el mensaje y va a delegar su procesado al servicio “EventRecorderService”. Podemos hacer un “Spy” sobre dicho servicio y verificar que lo estamos invocando con el mismo contenido que hemos publicado.

@SpringBootTest
class EventProducerApplicationConsumerIt extends AbstractIntegrationTest {
  
  @Autowired
  EventProducer eventProducer;
  
  @SpyBean
  EventRecorderService eventRecorderService;

  @Test
  void consume() throws InterruptedException {
    // GIVEN
    String bodyContent = "body";
    
    // WHEN
    eventProducer.sendEvent(bodyContent);

    // THEN
    await().atMost(FIVE_SECONDS).untilAsserted(() -> {
      verify(eventRecorderService, times(1)).save(bodyContent);
    });
    
  }
  
}

application.yaml

spring:
  profiles:
    active: test
---   
spring:
  profiles: test
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer 
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: example-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer 
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    template:
      default-topic: example-topic

Conclusiones

Hemos sido capaces de probar microservicios que trabajen con Kafka. Además, podemos realizar las mismas pruebas en cualquier implementación de Kafka (publish/subscriber, stream de datos…). Mediante contenedores de Kafka podemos probar cómo interactuamos o reaccionamos a los eventos que en él se publican.

El entorno creado para los casos de prueba se está realizando con la misma pieza de software y con la misma versión que en los entornos productivos, la misma ventaja que nos proporciona trabajar con contenedores. Algo que no podríamos lograr al 100% con una versión embebida.

Es posible replicar el mismo esquema de pruebas con otras herramientas que puedan trabajar como pub/sub de eventos, tales como AMQ, Google Pubsub, Redis… inicializando y configurando sus contenedores.

Anexo: Schema registry simplificado

Como sabemos que los mensajes que vamos a intercambiar, siempre van a ser estructuras complejas, normalmente cuando trabajamos con Kafka definimos un esquema que nuestros datos deben cumplir. Nuevamente, quiero mencionar el siguiente post de mi compañera Noelia Martín en el que se nos presenta Schema Registry y cómo gobernar eventos con Schema Registry y Avro.

Volviendo a nuestro tema principal, el de crear pruebas de integración, es posible configurar un Schema Registry mediante Testcontainers como si el de producción se tratase. No obstante, podemos simplemente utilizar un endpoint mock si no necesitamos mucho más.

Solo tenemos que configurar la propiedad “schema.registry.url” de la siguiente manera:

spring:
  kafka:
    ....
    client-id: ${Client_id}
    ....
    properties:
      ....
      schema.registry.url: mock://testUrl
      auto.register.schemas: true
      specific.avro.reader: true
      ....

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.