En este post explicamos cómo resolver el problema de devolver una respuesta síncrona en un sistema asíncrono cualquiera haciendo uso de Kafka, herramienta que, de por sí, tiene naturaleza asíncrona. Curioso, cuando menos.

Muchos, seguramente, ya conoceréis los principios básicos de Kafka y su potencia. Pero, si te pilla de nuevas o quieres reforzar lo que ya conoces, puedes revisar este artículo previo sobre microservicios con Apache Kafka.

Su característica principal es que se trata de un sistema de comunicación basado en el patrón publicador-suscriptor donde la comunicación se realiza de forma asíncrona. Esto implica que el productor puede enviar mensajes a Kafka quedando totalmente desacoplado del consumidor o consumidores. Los consumidores simplemente tienen que estar suscritos a un tópico de Kafka esperando que un evento o mensaje les llegue para procesarlo. Resulta un sistema muy potente para entornos que requieren ingesta de datos y un procesamiento posterior, que puede ser almacenaje, transformación, análisis, etc.

Planteemos el siguiente problema: tenemos un servicio para ofrecer a usuarios información sobre el índice de calidad del aire (ICA) en un determinado momento y ubicación. El usuario nos lanza la solicitud, nuestro servicio inicia una serie de peticiones a terceros, pasando estas por un servicio Kafka y… resulta que perdemos la sincronía y devolver una respuesta se torna inviable. Estas son las circunstancias en las que entrará en juego la solución que proponemos.

Patrón Petición-Respuesta

El patrón Petición-Respuesta busca resolver el sistema de comunicación más básico que existe: un emisor envía un mensaje, un receptor lo recibe y contesta al emisor. En nuestro caso, un sistema síncrono sobre asíncrono, tendremos un primer servicio que escribe en un tópico Kafka quedando a la espera de la llegada de una respuesta desde un segundo tópico. El servicio que genera la respuesta quedará escuchando al primer tópico y escribirá en el de respuesta. Para conseguirlo, lo que hace es enviar una cabecera denominada correlation_id que también deberá ser escrita, con el mismo valor, por el replicador, permitiendo al peticionario encontrar el mensaje y continuar la ejecución.

Implementación

Respuesta directa del consumidor

En el escenario propuesto, como se indica anteriormente, tenemos un servicio que solicita el índice de calidad del aire (ICA) a otro servicio, haciendo uso de un tópico en Kafka para comunicar ambos con la premisa de contestar con una respuesta síncrona.

Para montar el proyecto haremos uso de Spring Boot 2 y será necesario incluir la siguiente dependencia para acceder a las clases de Spring Kafka que vamos a utilizar:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.3.4.RELEASE</version>
</dependency>

El primer paso es montar la configuración de Kafka para el servicio que realiza la petición:


@Value("${kafka.group.id}")
private String groupId;

@Value("${kafka.reply.topic}")
private String replyTopic;

@Bean
public ReplyingKafkaTemplate<String, Ciudad, IndiceCalidadAire> replyingKafkaTemplate(ProducerFactory<String, Ciudad> pf,
       ConcurrentKafkaListenerContainerFactory<String, IndiceCalidadAire> factory) {
   ConcurrentMessageListenerContainer<String, IndiceCalidadAire> replyContainer = factory.createContainer(replyTopic);
   replyContainer.getContainerProperties().setMissingTopicsFatal(false);
   replyContainer.getContainerProperties().setGroupId(groupId);
   ReplyingKafkaTemplate<String, Ciudad, IndiceCalidadAire> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);

   return replyingKafkaTemplate;
}

Con la configuración montada, escribimos el código del solicitante.

@Value("${kafka.request.topic}")
private String requestTopic;

private final ReplyingKafkaTemplate<String, Ciudad, IndiceCalidadAire> replyingKafkaTemplate;

public IndiceCalidadAire getCalidadDelAireEnCiudad(Ciudad ciudad) throws InterruptedException, ExecutionException {

  ProducerRecord<String, Ciudad> record = new ProducerRecord<>(requestTopic, null, ciudad.getCodigo(), ciudad);
  RequestReplyFuture<String, Ciudad, IndiceCalidadAire> future = replyingKafkaTemplate.sendAndReceive(record);
  ConsumerRecord<String, IndiceCalidadAire> response = future.get();
  return response.value();
}

Ahora toca configurar el servicio que realiza la réplica del mensaje.

@Bean
public KafkaTemplate<String, IndiceCalidadAire> replyTemplate 
(ProducerFactory<String, IndiceCalidadAire> pf,
       ConcurrentKafkaListenerContainerFactory<String, IndiceCalidadAire> factory) {
   KafkaTemplate<String, IndiceCalidadAire> kafkaTemplate = new KafkaTemplate<>(pf);
   factory.getContainerProperties().setMissingTopicsFatal(false);
   factory.setReplyTemplate(kafkaTemplate);
   return kafkaTemplate;
}

Y, por último, el segundo servicio. Al que añadiremos la anotación @SendTo para que Spring haga su magia generando la respuesta y la configuración de consumidor de Kafka habitual.

@Value("${kafka.reply.topic}")
 private String replyTopic;

private final KafkaTemplate<String, IndiceCalidadAire> kafkaDecoupledProducer;

 @KafkaListener(topics = "${kafka.request.topic}"
       , groupId = "${kafka.group.id}")
 @SendTo
 public IndiceCalidadAire handle(@Payload Ciudad ciudad) {

     int puntuacion = ThreadLocalRandom.current().nextInt(0, 90);
LocalDateTime fecha = LocalDateTime.now();

return IndiceCalidadAire.builder()
     .codigo(ciudad.getCodigo())
     .puntuacion(puntuacion)
     .fecha(fecha)
     .build();
 }

Respuesta calculando correlation ID

Pongamos que nuestro segundo servicio necesita lanzar una petición a un servicio de un tercero, que este es asíncrono y recuperamos su respuesta en otro flujo distinto. Este escenario implica que no podemos consumir de Kafka y devolver la petición automáticamente. En este punto, nos vemos obligados a entender el funcionamiento interno y adaptarlo a nuestras necesidades.

La forma en que se consigue relacionar el mensaje de respuesta desde el segundo tópico es mediante el uso de una cabecera llamada correlation_id. Esta cabecera, haciendo uso del método sendAndReceive, se genera automáticamente y se debe enviar en la petición inicial y ser devuelta en la respuesta con el mismo valor. Si tomamos el control sobre la forma en que se genera este identificador, sobreescribiendo el comportamiento por defecto, tendremos la solución final en la mano.

Por suerte, estos cambios son algo que está contemplado dentro de la librería y sería tan sencillo como aplicar las siguientes modificaciones:

public IndiceCalidadAire getCalidadDelAireEnCiudad(Ciudad ciudad) throws InterruptedException, ExecutionException {

  ProducerRecord<String, Ciudad> record = new ProducerRecord<>(requestTopic, null, ciudad.getCodigo(), ciudad);

  String pkCiudad = ciudad.getCodigo();
  // Cambiar la estrategia de generación de correlationId, estableciendo el valor a nuestro antojo
  replyingKafkaTemplate.setCorrelationIdStrategy(a -> new CorrelationKey(pkCiudad.getBytes()));

  RequestReplyFuture<String, Ciudad, IndiceCalidadAire> future = replyingKafkaTemplate.sendAndReceive(record);
  ConsumerRecord<String, IndiceCalidadAire> response = future.get();
  return response.value();
}

El replicador se ve obligado a enviar la petición recibida al servicio del tercero:

private final MiClienteAsincrono clienteAsincrono;

@KafkaListener(topics = "${kafka.request.topic}", groupId = "${kafka.group.id}")
public void handle(@Payload Ciudad ciudad) {

   clienteAsincrono.send(ciudad);

}

En el servicio de respuesta debemos ser capaces de generar el mismo valor, por tanto, es importante que la respuesta enviada por el servicio de terceros también devuelva estos campos.

@Value("${kafka.reply.topic}")
private String replyTopic;

private final KafkaTemplate<String, IndiceCalidadAire> kafkaDecoupledProducer;

/**
* Este método hará las funciones de recibir los datos del servicio de terceros.
*/
private void receive(IndiceCalidadAire indiceCalidadAire) {
   String correlationId = indiceCalidadAire.getCodigo();

// Creando un ProducerRecord, podemos establecer el valor de la cabecera CORRELATION_ID

   ProducerRecord<String, IndiceCalidadAire> record = new ProducerRecord<>(replyTopic, null, indiceCalidadAire.getCodigo(), indiceCalidadAire);

   record.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, correlationId.getBytes()));

   kafkaDecoupledProducer.send(record);

}

Con estos últimos pasos ya habremos conseguido completar la comunicación y enviar una respuesta síncrona a nuestro solicitante.

Conclusiones

Como hemos podido ver, Spring y Kafka nos proporcionan un mecanismo bastante sencillo de implementar para poner solución a una tarea que, de tener que desarrollar nosotros de forma manual, nos resultaría bastante compleja de concebir y llevar a cabo.

Será importante saber y poder negociar las interfaces con las terceras partes involucradas para poder tener trazabilidad de la petición en todo momento, sin cruzar respuestas. También se deberá tener en cuenta, en caso de escalar el servicio, que la respuesta siempre vuelva a la instancia que originó la solicitud. Ante este último escenario, se sugiere considerar el uso de tópicos separados o distintas particiones para cada instancia del servicio que realiza la solicitud.

¡Manos a la obra!

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.