¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.¿Buscas nuestro logo?
Aquí te dejamos una copia, pero si necesitas más opciones o quieres conocer más, visita nuestra área de marca.
Conoce nuestra marca.dev
Noelia Martín 29/09/2021 Cargando comentarios…
En el post anterior se explicó el funcionamiento del Schema Registry con Kafka y los tipos de compatibilidades que ofrece desde un punto de vista teórico. En este post tenemos como objetivo entender mejor la evolución de los esquemas, pero de forma práctica interactuando con Kafka.
En este caso vamos a utilizar dos recursos: un cluster de kafka y Confluent. Como requisito previo es necesario tener instalado docker y docker compose.
Para configurar las herramientas simplemente se necesita:
1. Un cluster de kafka utilizando Docker Compose con la imagen docker de fast-data-dev en la que se incluye: Kafka, Schema Registry y Rest Proxy.
Este es el fichero docker-compose.yml:
version: '2'
services:
# kafka cluster.
kafka-cluster:
image: landoop/fast-data-dev:cp3.3.0
environment:
ADV_HOST: 127.0.0.1
RUNTESTS: 0
FORWARDLOGS: 0
SAMPLEDATA: 0
ports:
- 2181:2181 # Zookeeper
- 3030:3030 # Landoop UI
- 8081-8083:8081-8083 # REST Proxy, Schema Registry, Kafka Connect ports
- 9581-9585:9581-9585 # JMX Ports
- 9092:9092 # Kafka Broker
Con este fichero simplemente se puede levantar el cluster con el comando:
docker-compose up
Al entrar desde un navegador a localhost:3030 podéis ver la consola de administración.
2. Utilizar Confluent para producir y consumir mensajes por línea de comandos. Para hacerlo utilizaremos docker una vez más, en este caso con esta imagen y para utilizarla simplemente es necesario ejecutar el siguiente comando:
docker run -it --rm --net=host confluentinc/cp-schema-registry:5.5.5 bash
Una vez levantado el contenedor podemos ejecutar los comandos:
Esta forma de producir y consumir mensajes de Kafka no es un método productivo, lo normal es tener clientes en un lenguaje como Java, pero sí sirve para entender los conceptos principales como es este caso.
1. El primer paso es tener arrancado el cluster de Kafka con el fichero de docker compose anterior.
2. Generar un esquema Avro en Schema Registry. Utilizaremos el mismo esquema de ejemplo de una línea de log del ejemplo del post anterior. Para hacerlo:
Ya tenemos nuestro primer esquema con su tipo de compatibilidad y vamos a probarlo con los comandos de Confluent.
3. Se crea desde un terminal un productor de datos con el esquema anterior, productor v1, que publica información en un topic llamado logline-forward con el siguiente comando:
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic logline-forward \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineForward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"sessionId","type":["null","string"],"default":null}]}'
4. Se crea desde un terminal un consumidor de datos con el esquema anterior, consumidor v1, que procesa información del mismo topic logline-forward con el siguiente comando:
kafka-avro-console-consumer --topic logline-forward \
--bootstrap-server 127.0.0.1:9092 \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineForward,"fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"sessionId","type":["null","string"],"default":null}]}'
5. Se comienza a enviar datos desde el productor v1 y vemos cómo son leídos por el consumidor v1:
6. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:
7. Se genera un productor de datos con el nuevo esquema, el productor v2, con el siguiente comando:
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic logline-forward \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineForward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":"string"}]}'
8. Se generan datos con el nuevo esquema desde el productor v2 y se observa como el antiguo consumidor es capaz de procesar los mensajes:
1. Para probar este modo de compatibilidad hay que repetir hasta el paso 3 del modo forward anterior, pero con dos salvedades:
2. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:
3. Se genera un consumidor utilizando el esquema evolucionado, es decir, la versión 2 del esquema con el siguiente comando:
kafka-avro-console-producer \ --broker-list 127.0.0.1:9092 --topic logline-backward \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineBackward","namespace":"test.avro.backward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"sessionId","type":["null","string"],"default":null},{"name":"message","type":["null","string"],"default":null}]}'
4. Se envían datos desde el productor v1, el que tenía la versión de esquema previo, y se observa cómo el consumidor v2 con el esquema nuevo es capaz de procesarlo.
1. Para probar este modo de compatibilidad hay que repetir hasta el paso 5 del modo forward pero con dos salvedades:
En este punto tenemos configurados nuestro productor y consumidor con la versión inicial del esquema (v1) lo que se ha llamado productor v1 y consumidor v1.
2. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:
En este punto ya tenemos la versión 2 de nuestro esquema.
3. Se genera un productor de datos con el nuevo esquema, el productor v2, con el siguiente comando:
kafka-avro-console-producer \
--broker-list 127.0.0.1:9092 --topic logline-full \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineFull","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":["null","string"],"default":null}]}'
4. Se genera un consumidor de datos con el nuevo esquema, el productor v2, con el siguiente comando:
kafka-avro-console-consumer --topic logline-full \
--bootstrap-server 127.0.0.1:9092 \
--property schema.registry.url=http://127.0.0.1:8081 \
--property value.schema='{"type":"record","name":"logLineFull","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":["null","string"],"default":null}]}'
5. Se envían a Kafka datos desde el productor con la versión inicial del esquema, productor v1. Se observa como tanto el consumidor v1 del esquema inicial como el consumidor v2 del esquema evolucionado pueden procesar el mensaje:
6. Se envían datos a Kafka desde el productor con la nueva versión del esquema, productor v2. Se observa como tanto el consumidor v1 del esquema inicial como el consumidor v2 del esquema evolucionado pueden procesar el mensaje:
¿Qué tipo de compatibilidad uso?
¿Cómo genero los esquemas para evitar problemas de compatibilidad?
Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.
Estamos comprometidos.
Tecnología, personas e impacto positivo.
Cuéntanos qué te parece.