En publicaciones anteriores hemos hablado sobre comunicar microservicios con Apache Kafka, sobre las APIs de Kafka, sobre Kafka Streams y sobre muchas otras cuestiones relacionadas con esta tecnología.

¿Y si te dijera que existe una herramienta para transferir datos entre prácticamente cualquier sistema externo y Kafka? ¿Te imaginas poder importar y exportar datos desde o hacia cualquier base de datos, sistema de ficheros, almacenes clave-valor, índices de búsqueda, etc.? ¿A que estaría bien y sería muy útil?

Pues sí, efectivamente, lo has adivinado, esa herramienta existe y (como seguro que también has intuido) se llama Kafka Connect. Es un componente opensource de Apache Kafka y utiliza conectores para mover grandes conjuntos de datos.

Existen dos tipos de conectores: conectores Source (origen) y conectores Sink (destino). Los primeros importan datos de los sistemas externos para publicarlos en el clúster de Kafka e internamente utilizan la API Producer. Y los segundos consumen datos del clúster Kafka para exportarlos hacia sistemas externos e internamente utilizan la API Consumer.

Kafka Connect hace básicamente lo mismo que vimos con los productores y consumidores para introducir y sacar datos de Kafka pero está orientado a facilitar la integración sin tener que desarrollar productores o consumidores ad hoc.

Existen una gran variedad de conectores ya hechos y listos para utilizar e integrar casi cualquier sistema externo. Si tenemos un caso de uso común, como mover datos desde una base de datos a Kafka, seguramente existirá un conector ya preparado para que lo utilicemos. Y generalmente tendremos tanto la versión Sink como la versión Source.

¿Dónde podemos encontrar conectores preconstruidos?

Los conectores se diseñan para ser escalables y reutilizables. Además, en caso de necesitarlo, la API Connect nos permite también que nos creemos nuestro propio conector.

Además cuenta con otras características importantes. Por defecto, es distribuido y escalable. Permite agregar más workers para escalar el clúster. Aprovechando las capacidades de Kafka, permite **integrar sistemas de streaming de datos y batch. **

Gestiona de forma automática el offset ayudando a controlar el procesamiento de los commit del offset, lo que nos evita implementar a mano esa parte tan propensa a errores. Gracias a las transformaciones es muy sencillo realizar ligeras modificaciones sobre mensajes individuales.

¿Cómo se ejecuta Kafka Connect?

En el siguiente diagrama podemos ver que tenemos nuestro clúster de Kafka y el demonio de Kafka Connect se ejecuta de forma independiente (podría ejecutarse en un servidor diferente). Y se levantan los conectores Sink y Source por separado lo que proporciona mayor escalabilidad y resiliencia para importar y exportar datos a y hacia sistemas externos.

¿Cómo se usa Kafka Connect?

Veamos con un ejemplo muy básico cómo utilizar Kafka Connect. Lo primero que haremos será crear un topic donde publicaremos nuestros eventos:

kafka-topics --bootstrap-server localhost:9092 --create --topic topic_connect --partitions 1 --replication-factor 1

Crearemos unos conectores muy sencillos que simplemente leerán y escribirán datos en ficheros locales. Para ello creamos los ficheros de prueba:

touch entrada.txt

touch salida.txt

chmod 777 salida.txt

Y dentro del fichero de entrada escribiremos texto de prueba que pueda ser leído.

Para crear el conector vamos a interactuar con la API REST de Kafka Connect: https://localhost:8083/connectors

En primer lugar crearemos un conector de tipo Source para coger datos de una fuente externa e introducirlos en Kafka:

curl -X POST http://localhost:8083/connectors \
> -H ‘Accept: */*’  \
> -H ‘Content-Type: application/json’ \
> -d ‘{
>    “name”:”file_source_connector”,
>    “config”: {
>        “connector.class”: “org.apache.kafka.connect.file.FileStreamSourceConnector”,
>        “topic”: “topic_connect”,
>        “file”: “/home/user/entrada.txt”,
>        “value.converter”: “org.apache.kafka.connect.storage.StringConverter”
>    }
> }’

Con el curl anterior estamos enviando la configuración del conector en formato JSON. En primer lugar figura el nombre. Y en el bloque de configuración, en la primera propiedad, le indicamos la clase que implementa el conector que, en este caso, realizará la lectura de un fichero como fuente externa.

A continuación especificamos el topic al que importaremos datos y el nombre del fichero que vamos a leer. Y por último le indicamos que usaremos un convertidor a String. Una vez ejecutemos el comando nuestro conector se creará e importará datos en el topic.

Antes de echar un vistazo al topic podemos hacer una llamada al API de Kafka Connect para ver la configuración del conector que acabamos de crear:

curl http://localhost:8083/connectors/file_source_connector

También podemos ver el estado actual del conector y los posibles mensajes de error que puedan existir invocando a este endpoint:

curl http://localhost:8083/connectors/file_source_connector/status

Para comprobar si los datos se han enviado correctamente al topic podemos levantar un consumidor por consola:

kafka-console-consumer --bootstrap-server localhost:9092 --topic topic_connect --from-beginning

En la salida nos mostrará las mismas líneas que hayamos escrito en nuestro fichero.

Vayamos ahora con el ejemplo de conector Sink. El curl es prácticamente igual al anterior solo que ahora usaremos como clase del conector la versión Sink del mismo: FileStreamSinkConnector. Y que el fichero que le indiquemos será el de salida.

curl -X POST http://localhost:8083/connectors \
> -H ‘Accept: */*’  \
> -H ‘Content-Type: application/json’ \
> -d ‘{
>    “name”:”file_sink_connector”,
>    “config”: {
>        “connector.class”: “org.apache.kafka.connect.file.FileStreamSinkConnector”,
>        “topic”: “topic_connect”,
>        “file”: “/home/user/salida.txt”,
>        “value.converter”: “org.apache.kafka.connect.storage.StringConverter”
>    }
> }’

Una vez creado el conector podemos abrir el fichero de salida donde se habrán escrito los datos.

Para borrar los conectores sólo tenemos que llamar al API con el método DELETE y el nombre del conector:

curl -X DELETE http://localhost:8083/connectors/file_source_connector

curl -X DELETE http://localhost:8083/connectors/file_sink_connector

Hemos visto una pequeña introducción a Kafka Connect, a sus diferentes tipos de conectores (sink y source). También hemos visto cuales son sus principales características. Finalmente hemos visto un ejemplo muy simple de cómo se crean conectores mediante su API REST.

Si este ejemplo te ha sabido a poco y quieres saber cómo usarlo con Debezium para conectar con una base de datos MySQL en este post puedes ver cómo hacerlo.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.