En el post anterior se explicó el funcionamiento del Schema Registry con Kafka y los tipos de compatibilidades que ofrece desde un punto de vista teórico. En este post tenemos como objetivo entender mejor la evolución de los esquemas, pero de forma práctica interactuando con Kafka.

Configurando el entorno

En este caso vamos a utilizar dos recursos: un cluster de kafka y Confluent. Como requisito previo es necesario tener instalado docker y docker compose.

Para configurar las herramientas simplemente se necesita:

1. Un cluster de kafka utilizando Docker Compose con la imagen docker de fast-data-dev en la que se incluye: Kafka, Schema Registry y Rest Proxy.

Este es el fichero docker-compose.yml:

version: '2'
services:
  # kafka cluster.
  kafka-cluster:
    image: landoop/fast-data-dev:cp3.3.0
    environment:
      ADV_HOST: 127.0.0.1         
      RUNTESTS: 0                 
      FORWARDLOGS: 0             
      SAMPLEDATA: 0              
    ports:
      - 2181:2181                 # Zookeeper
      - 3030:3030                 # Landoop UI
      - 8081-8083:8081-8083       # REST Proxy, Schema Registry, Kafka Connect ports
      - 9581-9585:9581-9585       # JMX Ports
      - 9092:9092                 # Kafka Broker

Con este fichero simplemente se puede levantar el cluster con el comando:

docker-compose up

Al entrar desde un navegador a localhost:3030 podéis ver la consola de administración.

2. Utilizar Confluent para producir y consumir mensajes por línea de comandos. Para hacerlo utilizaremos docker una vez más, en este caso con esta imagen y para utilizarla simplemente es necesario ejecutar el siguiente comando:

docker run -it --rm --net=host confluentinc/cp-schema-registry:5.5.5 bash


Una vez levantado el contenedor podemos ejecutar los comandos:

Esta forma de producir y consumir mensajes de Kafka no es un método productivo, lo normal es tener clientes en un lenguaje como Java, pero sí sirve para entender los conceptos principales como es este caso.

Compatibilidad forward

1. El primer paso es tener arrancado el cluster de Kafka con el fichero de docker compose anterior.

2. Generar un esquema Avro en Schema Registry. Utilizaremos el mismo esquema de ejemplo de una línea de log del ejemplo del post anterior. Para hacerlo:

Ya tenemos nuestro primer esquema con su tipo de compatibilidad y vamos a probarlo con los comandos de Confluent.

3. Se crea desde un terminal un productor de datos con el esquema anterior, productor v1, que publica información en un topic llamado logline-forward con el siguiente comando:

kafka-avro-console-producer \
    --broker-list 127.0.0.1:9092 --topic logline-forward \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineForward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"sessionId","type":["null","string"],"default":null}]}'

4. Se crea desde un terminal un consumidor de datos con el esquema anterior, consumidor v1, que procesa información del mismo topic logline-forward con el siguiente comando:

kafka-avro-console-consumer --topic logline-forward \
    --bootstrap-server 127.0.0.1:9092 \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineForward,"fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"sessionId","type":["null","string"],"default":null}]}'

5. Se comienza a enviar datos desde el productor v1 y vemos cómo son leídos por el consumidor v1:

6. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:

7. Se genera un productor de datos con el nuevo esquema, el productor v2, con el siguiente comando:

kafka-avro-console-producer \
    --broker-list 127.0.0.1:9092 --topic logline-forward \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineForward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":"string"}]}'

8. Se generan datos con el nuevo esquema desde el productor v2 y se observa como el antiguo consumidor es capaz de procesar los mensajes:

Conclusiones:

Compatibilidad backward

1. Para probar este modo de compatibilidad hay que repetir hasta el paso 3 del modo forward anterior, pero con dos salvedades:

2. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:

3. Se genera un consumidor utilizando el esquema evolucionado, es decir, la versión 2 del esquema con el siguiente comando:

kafka-avro-console-producer \ --broker-list 127.0.0.1:9092 --topic logline-backward \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineBackward","namespace":"test.avro.backward","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"sessionId","type":["null","string"],"default":null},{"name":"message","type":["null","string"],"default":null}]}'

4. Se envían datos desde el productor v1, el que tenía la versión de esquema previo, y se observa cómo el consumidor v2 con el esquema nuevo es capaz de procesarlo.

Conclusiones:

Compatibilidad completa o full

1. Para probar este modo de compatibilidad hay que repetir hasta el paso 5 del modo forward pero con dos salvedades:

En este punto tenemos configurados nuestro productor y consumidor con la versión inicial del esquema (v1) lo que se ha llamado productor v1 y consumidor v1.

2. Se actualiza el esquema Avro de nuestra línea de log siguiendo los cambios permitidos para este tipo de compatibilidad en el Registry:

En este punto ya tenemos la versión 2 de nuestro esquema.

3. Se genera un productor de datos con el nuevo esquema, el productor v2, con el siguiente comando:

kafka-avro-console-producer \
    --broker-list 127.0.0.1:9092 --topic logline-full \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineFull","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":["null","string"],"default":null}]}'

4. Se genera un consumidor de datos con el nuevo esquema, el productor v2, con el siguiente comando:

kafka-avro-console-consumer --topic logline-full \
    --bootstrap-server 127.0.0.1:9092 \
    --property schema.registry.url=http://127.0.0.1:8081 \
    --property value.schema='{"type":"record","name":"logLineFull","fields":[{"name":"ip","type":"string"},{"name":"url","type":"string"},{"name":"referrer","type":"string"},{"name":"userAgent","type":"string"},{"name":"message","type":["null","string"],"default":null}]}'

5. Se envían a Kafka datos desde el productor con la versión inicial del esquema, productor v1. Se observa como tanto el consumidor v1 del esquema inicial como el consumidor v2 del esquema evolucionado pueden procesar el mensaje:

6. Se envían datos a Kafka desde el productor con la nueva versión del esquema, productor v2. Se observa como tanto el consumidor v1 del esquema inicial como el consumidor v2 del esquema evolucionado pueden procesar el mensaje:

Conclusiones:

Consejos para mantener la compatibilidad

¿Qué tipo de compatibilidad uso?

¿Cómo genero los esquemas para evitar problemas de compatibilidad?

Cuéntanos qué te parece.

Enviar.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.