En los últimos años el número de aplicaciones a desarrollar por las empresas ha aumentado considerablemente con la llegada de las arquitecturas basadas en microservicios.

Uno de los aspectos más relevantes es la comunicación entre ellos, o la necesidad de tener que integrarse con otros sistemas enviando o recibiendo información.

En estos casos, estas comunicaciones deberán ser rápidas, seguras y fiables con una alta disponibilidad.

Una de las soluciones para solventar este tipo de casos han supuesto el uso de tecnologías basadas en colas de mensajes, las cuales permiten la comunicación asíncrona, lo que significa que los puntos de conexión que producen y consumen los mensajes interactúan con la cola, no entre sí.

Además, ayudan a simplificar de forma significativa la escritura de código para aplicaciones desacopladas, mejorando el rendimiento, la fiabilidad y la escalabilidad.

A la hora de utilizar un sistema de colas de mensajes, se ha hecho muy popular el uso de ActiveMQ y RabbitMQ. Sin embargo, a la hora de enfrentarnos a sistemas que requieren la transmisión de datos a tiempo real encontramos Apache Kafka como una de nuestras mejores soluciones.

¿Qué es Apache Kafka?

Apache Kafka es un sistema de transmisión de datos distribuido con capacidad de escalado horizontal y tolerante a fallos. Gracias a su alto rendimiento nos permite transmitir datos en tiempo real utilizando el patrón de mensajería publish/subscribe. En este podcast profundizamos más en el universo de Apache Kafka.

Kafka fue creado por LinkedIn y actualmente es un proyecto open source mantenido por Confluent, empresa que está bajo la administración de Apache. Sus principales funcionalidades son:

Desde Apache recomiendan el uso de Kafka generalmente en dos tipos de aplicaciones:

Estructura

Topics, mensajes y particiones

Un topic es un flujo de datos sobre un tema en particular. Podemos crear tantos topics como queramos y estos serán identificados por su nombre. Los topics pueden dividirse en particiones en el momento de su creación.

Cada elemento que se almacena en un topic se denomina mensaje. Los mensajes son inmutables y son añadidos a una partición determinada (específica definida por la clave del mensaje o mediante round-robin en el caso de ser nula) en el orden el que fueron enviados, es decir, se garantiza el orden dentro de una partición pero no entre ellas.

Cada mensaje dentro de una partición tiene un identificador numérico incremental llamado offset. Aunque los mensajes se guarden en los topics por un tiempo limitado (una semana por defecto) y sean eliminados, el offset seguirá incrementando su valor.

Cada mensaje dentro de una partición tiene un identificador numérico incremental llamado offset.

Brokers y Topics

Un clúster de Kafka consiste en uno o más servidores denominados Kafka brokers. Cada broker es identificado por un ID (integer) y contiene ciertas particiones de un topic, no necesariamente todas.

Además, permite replicar y particionar dichos topics balanceando la carga de almacenamiento entre los brokers. Esta característica permite que Kafka sea tolerante a fallos y escalable.

Particiones por topic en tres brokers.

Zookeeper

Se trata de un servicio centralizado imprescindible para el funcionamiento de Kafka, al cual envía notificaciones en caso de cambios como: creación de un nuevo topic, caída de un broker, levantamiento de un broker, borrado de topics, etc.

Su labor principal es gestionar los brokers de Kafka, manteniendo una listado con sus respectivos metadatos y facilitar mecanismos para health checking. Además, ayuda en la selección del broker líder para las distintas particiones de los topics.

Topic replication

Los topics deberán tener un factor de replicación > 1 (normalmente 2 y 3), de esta forma si un broker se cae, otro broker puede servir los datos.

En cada momento sólo puede haber un broker líder para cada partición de un topic. Sólo el líder puede recibir y servir datos de una partición, mientras tanto los otros brokers sincronizarán sus datos. Si este se cae, se cambia el líder.

En cada momento sólo puede haber un broker líder para cada partición de un topic.

API

Una vez comentada la estructura de Apache Kafka, vamos a ver cómo se interactúa con él mediante cuatro API’s:

API de Apache Kafka. Imagen recuperada de la documentación de Kafka.
API de Apache Kafka. Imagen recuperada de la documentación de Kafka.

Producers

Permite que una aplicación pueda publicar mensajes de un topic de Kafka de forma asíncrona. Los productores automáticamente saben a qué broker y a qué partición deben escribir.

En el caso de que un broker se caiga, el productor sabe cómo recuperarse y seguirá escribiendo en el resto. Los productores envían los mensajes con clave (string, número, etc) o sin ella.

Si la clave es nula se enviarán en round robin entre los brokers. Si no es nula, todos los mensajes con esa clave se enviarán siempre a la misma partición.

Además, para confirmar que los mensajes han sido correctamente escritos en Kafka se podrá configurar la recepción de un ack, ya sea por la recepción del mensaje por parte broker líder o por todos los brokers réplica.

Consumers

Con su uso podemos suscribirnos a un topic de Kafka y consumir sus mensajes para poder tratarlos en nuestra aplicación. Podemos crear un consumidor o un grupo de consumidores.

La diferencia entre ellos es que el grupo de consumidores permite el consumo de mensaje de forma paralela, es decir, si un nodo de ese grupo consume un mensaje el resto no lo hará.

Esto es útil a la hora de tener más de una instancia de un microservicio corriendo en nuestro sistema. Cada consumidor del grupo de consumidores leerá de una partición exclusiva.

Si hay más consumidores que particiones, algunos de los consumidores estarán inactivos, para solucionar esto es recomendable tener el mismo número de particiones que de consumidores dentro de un grupo.

En el caso de que un broker de los que está leyendo se caiga, los consumidores saben cómo recuperarse. Los datos son leídos en orden dentro de cada partición pero no entre ellas. Kafka almacena los offsets de los grupos de consumidores cuando estos leen los datos.

Los offsets son almacenados en un topic de Kafka denominado “consumer_offsets”. Cuando un consumidor de un grupo lee datos de Kafka, se actualiza el offset. Si un consumidor se cae, cuando vuelva a ser levantado seguirá leyendo datos desde donde se quedó anteriormente.

Stream Processors

Se trata de una librería para crear aplicaciones que nos permite consumir un stream de datos de un topic para poder realizar modificaciones sobre los mensajes y escribir en otro topic actuando como productor, es decir, la entrada y la salida de datos son almacenados en el cluster de Kafka.

Combina la simplicidad del desarrollo de aplicaciones en lenguaje Java o Scala con los beneficios de la integración con el cluster de Kafka. Entre sus características destacan su alta capacidad de procesamiento de mensajes por segundo, su escalabilidad y una alta tolerancia a fallos.

Connectors

Se tratan de componentes listos para usar que nos permiten simplificar la integración entre sistemas externos y el cluster de Kafka. Podemos crear y ejecutar productores o consumidores reutilizables que conectan los topics de Kafka a las aplicaciones o sistemas externos, como por ejemplo una base datos.

Además, algunos permiten realizar modificaciones simples sobre los mensajes que irán a los topics de Kafka. Se configuran mediante ficheros properties o a través de su API REST y entre sus características destacan ser distribuidos y escalables.

Existen muchos conectores para distintos sistemas, en este link podéis encontrar más información.

Caso práctico

Vamos a crear un caso práctico para mostrar el funcionamiento de Apache Kafka en una arquitectura de microservicios. En este ejemplo crearemos un microservicio que utilizará un cliente de Twitter para obtener tweets de un tema determinado.

Estos tweets se enviarán a un topic de Apache Kafka y serán consumidos por otro microservicio que se encargará de almacenarlos en una base de datos Elasticsearch.

De esta forma podremos comprobar cómo funciona un productor y un consumidor en microservicios Java bajo el framework de Spring.

Gestión de Apache Kafka

Antes de iniciar las tareas de desarrollo, tenemos que preparar Kafka para comenzar nuestro caso práctico. Lo primero será crear el topic en Kafka sobre el cual guardaremos nuestros tweets, así como establecer su configuración.

Para ello, utilizaremos la CLI (en mi caso utilizo la terminal de Ubuntu) para acceder al directorio donde tenemos instalado Apache Kafka y arrancaremos nuestro Zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

Cuando Zookeeper haya arrancado, podremos arrancar Apache Kafka ejecutando el comando:

./bin/kafka-server-start.sh config/server.properties

Una vez que hemos arrancado Kafka correctamente, ejecutaremos el siguiente comando para crear nuestro topic al que denominaremos “tweets”:

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic tweets --create --partitions 1 --replication-factor 1

Como podemos observar, hemos creado el topic haciendo referencia a nuestro zookeeper (ya que es el componente encargado de gestionar los topics) y lo hemos creado en una sola partición con un factor de replicación igual a 1.

Estos valores cambiarían si deseamos crear una copia del contenido del topic por seguridad, por ejemplo para entornos productivos se recomienda un factor de replicación mayor o igual a 3. Para comprobar que hemos creado el topic correctamente podemos listar los topic creados:

kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

Productor

Lo primero que deberemos hacer para crear nuestro productor será añadir las dependencias necesarias, en este caso haremos uso de la librería de Kafka para Spring Boot (desde spring initializr con la dependencia ‘Spring for Apache Kafka’)..

Posteriormente, procederemos a aplicar las properties necesarias para establecer la configuración de nuestro microservicio con Apache Kafka.

Configuración

Dividiremos las properties para la integración con Apache Kafka y la configuración de nuestro productor en tres apartados:

Configuración básica e integración con Kafka

Cabe destacar la dirección IP y el puerto de nuestro servidor de Apache Kafka, así como la clase encargada de serializar la clave y el valor de nuestros mensajes. Y, por supuesto, el nombre del topic que hemos creado anteriormente, al que volcaremos los mensajes desde nuestro productor.

Productor seguro

A continuación describiremos una serie de situaciones que pueden comprometer la integridad de nuestros datos y las soluciones que son convenientes aplicar para que el envío de los mensajes por parte de nuestro productor sea seguro.

Puede ocurrir que el productor introduzca mensajes duplicados en Kafka por algún problema en la red. Para solucionar este problema configuraremos nuestro productor para que sea idempotente y así de esta forma Kafka detectará si un mensaje le está llegando por duplicado y no lo almacenará.

Para ello activaremos la property “spring.kafka.producer.properties.enable.idempotence” con valor igual a “true”.

Por otro lado, activaremos los acks usando la property “spring.kafka.producer.acks”. Para confirmar la recepción de mensajes de nuestra partición líder indicando el valor igual a 1 o 0 si no queremos confirmación (por defecto).

Para los casos en los que un topic tiene un factor de replicación mayor a 1, es posible que se desee que ack confirme la recepción del mensaje por parte de todas las particiones, en ese caso estableceremos el valor “all”.

En nuestro caso, tenemos un topic con un factor de replicación igual a 1 pero dado que queremos crear un productor idempotente Kafka nos obliga a establecer el valor a “all”, ya que si no nos dará un error en la creación.

Si al tratar de enviar un mensaje a Kafka no obtenemos el ack, es posible que nos interese realizar un reintento del envío. Para ello podemos establecer el número máximo de reintentos mediante la property “spring.kafka.producer.retries”.

Mejora del rendimiento

Para mejorar el rendimiento, lo primero que haremos será establecer un tipos de compresión de mensajes, entre los cuales se incluyen GZIP, Snappy, LZ4 y ZStandard.

Dado que estamos enviando mensajes en formato JSON el más apropiado será el tipo Snappy, ya que es el más óptimo para mensajes de texto.

Para obtener más información sobre los tipos de compresión y su funcionamiento, recomiendo que consulteis el siguiente artículo.

Kafka intenta enviar los mensajes tan pronto como sea posible por defecto, de este modo pueden acumularse mensajes para enviar mientras otros están siendo procesados.

Con el fin de aumentar el rendimiento mejorando la eficiencia, Kafka nos permite aplicar un procesamiento por lotes configurable, lo cual puede resultar una solución interesante.

Para ello estableceremos el tamaño del bloque a 32 KB (se recomienda entre 32 y 64 KB para mejorar el rendimiento) e introduciremos un pequeño margen de tiempo de 20 ms para esperar a que los mensajes se incluyan en un bloque listo para enviar.

En el caso de que pase ese tiempo y no haya llegado otro mensaje, el bloque se enviará al topic.

Si no ha llegado otro mensaje, el bloque se enviará al topic.

Servicio del productor

Nuestro productor será un microservicio que utilizará la librería hbc-core para conectarse a Twitter mediante una cuenta de Twitter Developer para obtener tweets deportivos que contengan las palabras NBA y NFL. Para ello estableceremos la siguiente configuración:

Configuración que establecemos para Twitter.

Una vez configurado nuestro cliente de Twitter crearemos un servicio que se ejecutará cada 10 segundos e insertará 10 tweets en el topic “tweets” en cada ejecución.

Cabe destacar el uso de la clase KafkaTemplate cuya instancia contiene un productor de Kafka obteniendo la configuración mencionada en el apartado anterior a través de nuestro fichero de properties.

Esta instancia nos proporcionará los métodos necesarios para enviar mensajes a nuestro topic indicando en su declaración la clave y el valor de nuestros mensajes.

Esta instancia nos proporciona los métodos necesarios para enviar mensajes a nuestro topic.

El resultado obtenido de la ejecución del servicio es el siguiente:

Resultado que hemos obtenido en la ejecución.

Al tratarse de un microservicio que únicamente se encarga de obtener datos de un sistema externo (Twitter) para volcarlos en Apache Kafka sin realizar modificaciones sobre ellos, este microservicio podría sustituirse por un Connector de Kafka para Twitter.

De esta forma reduciremos el tiempo de desarrollo utilizando un componente ya desarrollado específicamente para este objetivo.

Consumidor

Al igual que hicimos al crear nuestro productor, añadiremos la dependencia de Kafka para Spring Boot en el pom de nuestro consumidor.

Además, añadiremos la anotación @EnableKafka para habilitar la detección de los métodos anotados con @KafkaListener para poder consumir menajes de Kafka.

Añadiremos la anotación @EnableKafka para habilitar la detección de los métodos anotados.

Configuración

Además de la configuración del servidor y de la base de datos Elasticsearch, estableceremos las properties necesarias para el consumo de mensajes. Entre ellas hay que destacar:

Permite detectar el tipo o la clase sobre la que deben descomponer los mensajes del topic.

Servicio del consumidor

Una vez realizada la configuración de nuestro microservicio para conectar con Kafka, vamos a crear el método que se encargará de consumir los mensajes de un topic.

Para ello crearemos una clase como servicio de Spring y anotaremos el método encargado de recibir los mensajes con la anotación @KafkaListener pasando como parámetro el nombre del topic.

Crearemos una clase como servicio de Spring y anotaremos el método encargado de recibir los mensajes con la anotación @KafkaListener

Al recibir los mensajes hemos realizado dos operaciones: parsear el mensaje recibido como JSON a un POJO y enviar el mensaje a Elasticsearch mediante su API REST para almacenarlo.

Encapsular el tweet en un POJO nos permite reducir la información de este a únicamente los campos que necesitamos para nuestra lógica de negocio, además de poder realizar cualquier operación sobre el objeto.

Sin embargo, a la hora de almacenarlo en nuestra base de datos hemos optado por almacenar el JSON completo.

A continuación se muestra una captura con los resultados obtenidos del microservicio al consumir mensajes de Kafka:

Captura con los resultados obtenidos del microservicio al consumir mensajes de Kafka.

Por otro lado, mostramos el resultado del JSON almacenado en nuestra base de datos Elasticsearch:

Resultado del JSON almacenado en nuestra base de datos Elasticsearch:

Al igual que hemos comentado para el caso del productor, este ejemplo de microservicio realizando el papel de un consumidor en el ecosistema de Apache Kafka podría haberse sustituido por un conector de Elasticsearch que leyese mensajes del topic indicado y los almacenase directamente en base de datos.

Conclusiones

En este post hemos realizado una introducción de los aspectos principales de Apache Kafka, así como una explicación de sus principales componentes para una comprensión rápida del producto.

Además, hemos podido comprobar cómo el proceso de integración con nuestros proyectos con Spring es algo sencillo y nos ofrece una gran variedad de opciones a la hora de configurarlo.

Actualmente lo estamos utilizando en nuestro proyecto, siendo una pieza clave en la evolución del mismo, convirtiéndose en uno de los principales mecanismos de comunicación entre nuestros microservicios.

¿Quieres seguir aprendiendo sobre Kafka?

En este post hemos hablado sobre comunicar microservicios con Apache Kafka, pero tenemos mucho más sobre este apasionante universo:

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.