Cuando trabajamos con un sistema de colas implementado a través de Kafka, muchas veces nos encontramos con la necesidad de hacer una réplica de datos o simplemente hacer un traspaso de información de un cluster a otro.

En ese momento nos preguntamos, ¿cómo podemos llevar esto a cabo sin rompernos la cabeza y que el resultado sea eficaz y eficiente?

Kafka, en sí, nos provee de una herramienta bastante útil para hacer replicado de datos sin que esto suponga un gran esfuerzo más que el coste de la configuración inicial. Esta herramienta es Kafka Mirror Maker.

¿Qué es el Kafka Mirror Maker?

Es una herramienta (script) diseñada para transferir datos de uno o más tópicos entre un cluster origen y otro destino. Dicho script se encarga de levantar un proceso que hace las veces de consumidor y productor a la vez (recolectando los mensajes de los tópicos indicados en el cluster origen y llevándolos al cluster destino).

Como hemos dicho el script tiene la responsabilidad de desempeñar tanto de consumidor como de productor, por este motivo es imprescindible indicar la configuración de cada una de estas piezas para llevar a cabo la ejecución del mismo.

Arquitectura básica

Lo que sigue a continuación es una muestra de arquitectura básica para la utilización de la herramienta Kafka Mirror Maker.

A continuación mostramos un ejemplo básico de configuración para conectar replicar datos de un cluster a otro mediante Kafka Mirror Maker.

Fichero de configuración para el consumidor

Primero definimos los parámetros de configuración el hilo consumidor:

# sourceClusterConsumer.config file #
zookeeper.connect=:\[,:\]\*
group.id=mirror-maker-group

En la propiedad zookeeper.connect debe indicarse la o las direcciones IP’s y puertos del zookeeper en el cluster origen.

El group.id será utilizado por consumidor para gestionar el offset en los tópicos a replicar.

Fichero de configuración para el productor

Ahora definimos los parámetros de configuración para el hilo productor:

#targetClusterProducer.config file #
bootstrap.servers=:\[,:\]\*

La propiedad bootstrap.servers debe indicar la o las direcciones ip’s y puertos del zookeeper (kafka broker) en el cluster destino.

Finalmente, para lanzar el script basta con indicar las rutas de los ficheros de configuración que hemos definido previamente, así como también el valor para la propiedad --whitelist, en la cual indicaremos los tópicos a ser replicados (esta puede ser una lista separada por comas o bien puede definirse como un patrón de una expresión regular).

$ bin/kafka-mirror-maker.sh --consumer.config sourceClusterConsumer.config
\--producer.config targetClusterProducer.config --whitelist=".\*"

El script lo encontraremos en el directorio bin dentro de la ruta de instalación de kafka.

Voilà! Ya podemos iniciar nuestro proceso de replicación de tópicos entre dos clusters kafka.

Configurando Kafka Mirror Maker

Como hemos podido observar anteriormente, la comunicación entre dos brokers de kafka a través de Mirror Maker es muy sencilla de conseguir, ya que basta con conocer las direcciones IP y puertos de los servidores (origen y destino), conjuntamente con el listado de los tópicos (colas) que se desea replicar.

Sin embargo cuando hablamos de entornos productivos, es importante tener en cuenta una serie de criterios que nos permitirán hacer una correcta configuración de la herramienta (script) y que a su vez vaya acorde a nuestras necesidades. Todo lo anterior nos genera la siguiente interrogante: ¿cómo saber cuál es la mejor configuración?

Pues, la respuesta es, depende, ya que la mejor solución va directamente relacionado a nuestras prioridades. Es decir, si necesitamos explotar el rendimiento (esto en aplicaciones donde la transferencia de datos se debe hacer en tiempo real), o bien tener máxima fiabilidad y consistencia de datos, entre otros aspectos.

En la siguiente sección hablaremos sobre un conjunto de consideraciones y buenas prácticas a tener en cuenta al momento de realizar replicación de tópicos sin dejar de lado la calidad y el buen desempeño.

Máximo rendimiento vs integridad de los datos

Existe una serie de propiedades relevantes con las que podemos jugar para intentar mejorar el rendimiento o bien para bajar la tasa de pérdida de mensajes, pero como sabemos, no se puede tener todo lo en la vida, con cual cualquier cambio para mejorar el rendimiento puede afectar a la integridad de los datos y viceversa.

Por ejemplo, sobre la configuración del consumidor es posible establecer la propiedad auto.commit.enabled=false, la cual asegura que los mensajes se marcarán como procesados únicamente cuando se haya recibido el respectivo ACK. Este atributo es fundamental para minimizar el porcentaje de pérdida de mensajes. Sin embargo desde el punto de vista del rendimiento, no es la configuración más recomendada, ya que aumenta el tráfico de paquetes en la red y añade latencia.

Con respecto al productor, a continuación listamos las propiedades que pueden tener mayor impacto sobre el desempeño y tratamiento de datos:

  1. batch.size**:** El tamaño del buffer es un atributo importante a definir, con él es posible optimizar el envío de mensajes, ya que se realizan peticiones por bloques de mensajes, y lo es más aún cuando trabajamos en combinación con algún tipo de compresión. La compresión suele ser más rápida y mejora el rendimiento cuando los bloques de mensajes son más grandes (el ratio de compresión también mejora). Sin embargo, a mayor tamaño del buffer la latencia también será mayor, ya que se necesitará más tiempo para completar el bloque.
  2. linger.ms**:** Tiempo en milisegundos para acumular mensajes en el buffer. Para cuando el buffer es muy grande este tiempo tenderá a ser mayor para que el mismo pueda llenarse a su máxima capacidad. Esto está muy bien para trabajar con mayor compresión de datos, pero se paga el precio ganando latencia, con lo cual hay que tener mucho cuidado al momento de asignar el valor a este atributo, porque un valor muy alto puede afectar directamente al rendimiento.
  3. max.in.flight.requests.per.connection: Representa el número máximo de solicitudes no confirmadas que el cliente enviará en una sola conexión antes de bloquear. Para casos de uso donde la integridad del dato es importante, el valor recomendado de dicho atributo es 1. Cabe destacar que esta configuración no es la óptima escenarios donde el rendimiento juega un papel fundamental, pero por otro lado sí es una buena forma de reducir el ratio en cuanto a pérdida de datos. El rendimiento mejorará a medida de que el valor de dicha configuración sea mayor, pero no podrá garantizarse que no existan problemas de ordenamiento e integridad de los mensajes.
  4. acks: Si lo que queremos tener es alto rendimiento, este atributo debe ser establecido como acks=0, lo que es igual a decir que no se enviarán paquetes de confirmación de recepción de mensajes. Evidentemente, esto aumentaría la probabilidad de pérdida de datos. Por el contrario, si lo que queremos es disminuir el ratio de pérdida de mensajes, entonces el valor recomendado para este atributo es acks=-1, ya que no se confirmará la recepción del mensaje hasta que no sea notificado a todos los nodos que conforman el cluster.
  5. retries: El número de reintentos (retries) sólo tiene sentido cuando el broker destino pasa a través de un balanceador de carga. Por otro lado podemos decir que, a mayor número de reintentos más garantía hay de que no haya pérdida de mensajes aunque esto tiene penalización en cuanto al rendimiento, ya que entre cada reintento y otro se estará consumiendo tiempo añade retardo en la transmisión de datos.
  6. compression.type: La compresión es una parte importante del trabajo del productor, ya que la velocidad dependerá del tipo de compresión seleccionado, además del ancho de banda requerido para la transferencia de datos.

La selección del tipo de compresión dependerá de nuestra necesidad, es decir, con gzip ganamos en tasa de compresión, pero perdemos en rendimiento.

Por otro lado, si usamos lz4 obtendremos mejor rendimiento, pero la capacidad de compresión será más baja. También se puede configurar un tipo de compresión media como la snappy, la cual nos ofrece un balance entre la capacidad de compresión el rendimiento. Cuando no tenemos claro qué es lo que más nos conviene es una buena alternativa porque la penalización en cualquiera de los dos casos es mucho menor.

Siempre es recomendable usar algún tipo de compresión, ya que con ello haremos un mejor uso del espacio de almacenamiento de datos. Además de sacar mayor provecho del ancho de banda de la red.

Como ya mencionamos, la compresión va directamente relacionada a la capacidad de realizar envíos por lotes, es decir, a mayor cantidad de lotes, conseguiremos una compresión eficiente.

Una forma de mejorar el proceso de compresión es incrementando el número de hilos productores, esto a su vez se verá reflejado de forma positiva en el rendimiento del proceso de réplica de datos.

Elección de la configuración óptima

Es un proceso bastante empírico que consiste en ir realizando con diversas combinaciones de los parámetros de configuración y volumen de datos hasta dar con los valores que se ajusten mejor a nuestras necesidades.

En dicho proceso de selección nos puede ser bastante útil el uso de la herramienta kafka-producer-perf-test.sh, la cual nos permitirá medir el rendimiento y el comportamiento general de la aplicación. Dicha herramienta la podemos encontrar dentro de la suite de comandos de Kafka.

Con respecto a los tópicos

  1. Es un requisito indispensable que el tópico a clonar exista en cluster, bien sea porque lo creemos de forma manual o bien porque dicho cluster tenga habilitada la opción auto.create.topics.enable=true.
  2. Los tópicos deben estar configurados con igual número de particiones así como también el tipo de compresión de datos (si éste la tuviera), de esta manera es posible sacar mejor rendimiento del proceso Kafka Mirror Maker. Por ejemplo, si el tópico dataWarehouse tiene 10 particiones y utiliza una compresión de datos gzip en el cluster origen, entonces deberá tener la misma configuración en el cluster destino.

Acerca de Kafka Mirror Maker

Podría ser una buena alternativa tener algún proceso (demonio watchdog) que monitorice el estado del Kafka Mirror Maker y en caso de que éste por alguna razón se cayera, el demonio se encargaría de restablecer el servicio.

Lo recomendable es que dicho número no supere al número de particiones por tópico, de otra forma el excedente serían hilos sin carga de trabajo o mejor dicho workers ociosos. En caso de que el número fuese menor, no habría ningún problema, ya que los hilos o consumidores se repartirán la carga entre sí.

El número de hilos para los consumidores se define a través de la propiedad num.streams, la misma debe ser indicada como parámetro del script. Por ejemplo, si el número de consumidores deseados es 4 entonces la propiedad se establecerá a --num.streams=4.

Recuerda: Por temas de mejor rendimiento, siempre se recomienda que el proceso Kafka Mirror resida en el cluster destino. Esto permitiría reducir la latencia.

Esta configuración dependerá siempre de lo que se considere más conveniente, acorde a la necesidad del negocio, velocidad y continuidad de trabajo vs. persistencia y consistencia de datos**.**

Conclusión

Hemos conocido una herramienta bastante útil y potente en lo que respecta al proceso de replicación de datos de una cola Kafka de un cluster a otro.

Es un proceso, a mi parecer, bastante sencillo de configurar y de usar, que no supone mayores dolores de cabeza, cosa que se agradece muchísimo.

El truco para hallar la mejor configuración radica en evaluar cuáles son nuestras prioridades: preservación de datos, alto rendimiento, además del tipo de entorno al que vaya a ser aplicado.

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.