Hace unos días os contamos en qué consiste Debezium, principales características, componentes y funcionamiento. Ahora llega el momento de aterrizar todo lo aprendido y os mostramos cómo se configura e implementa una solución de CDC con el conector Debezium para una base de datos MySQL.

Para ello, vamos a usar el tutorial que proporciona Debezium, ya que ofrece una serie de imágenes Docker que simplifican la configuración del entorno.

Preparar el entorno

Dada la necesidad de los conectores Debezium de apoyarse de otras tecnologías que permitan explotar sus características, se van a iniciar los servicios necesarios para que el conector Debezium MySQL pueda funcionar correctamente:

docker run -it --rm \
--name zookeeper \
-p 2181:2181 -p 2888:2888 -p 3888:3888 \
debezium/zookeeper:1.5
docker run -it --rm \
--name kafka \
-p 9092:9092 \
--link zookeeper:zookeeper \
debezium/kafka:1.5
docker run -it --rm \
--name mysql \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=debezium \
-e MYSQL_USER=mysqluser \
-e MYSQL_PASSWORD=mysqlpw \
debezium/example-mysql:1.5

La imagen habilita y configura el servicio binlog que permite leer los logs transaccionales de la base de datos . Además, la imagen viene preparada con un modelo de datos sobre el que se van a realizar las pruebas.

docker run -it --rm \
--name connect \
-p 8083:8083 \
-e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
--link zookeeper:zookeeper \
--link kafka:kafka \
--link mysql:mysql \
debezium/connect:1.5

Una vez levantados los servicios y la base de datos necesarios, ya se tiene el entorno preparado para implementar una solución con el conector Debezium MySQL.

El siguiente paso es registrar el conector en Kafka Connect para monitorizar los cambios producidos en la base de datos.

Registrar el conector

Para registrar un conector, primero se tiene que definir la configuración asociada al conector para una base de datos MySQL:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",  
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",  //
    "database.server.name": "dbserver1",  
    "database.include.list": "inventory",  
    "database.history.kafka.bootstrap.servers": "kafka:9092",  
    "database.history.kafka.topic": "schema-changes.inventory"  
  }
}

Existen algunos aspectos de la configuración que resultan interesantes de mencionar:

La configuración mostrada es una configuración básica de un conector Debezium. A esta configuración se le pueden añadir todo tipo de transformaciones ofrecidas por Kafka Connect que permiten formatear los eventos de la forma requerida por cada caso de uso.

Una vez definida la configuración, ya se puede registrar el conector usando la API que proporciona Kafka Connect:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @conector-mysql.json

Si el conector se ha registrado con éxito, empezará a monitorizar la base de datos según la configuración definida y se podrá interactuar con el modelo de datos para ver el comportamiento del sistema.

De esta manera rápida y sencilla, se tiene implementada una solución de CDC para un servidor MySQL con Debezium.

Probando el sistema

Para probar el funcionamiento de la solución implementada, simplemente se necesita un cliente MySQL que permite conectarse a la base de datos para realizar alguna modificación, y un consumidor para Apache Kafka que consuma los eventos del topic asociado al cambio realizado.

El modelo de datos que contiene la base de datos de prueba es el siguiente.

Hay que tener en cuenta que los topics, según vimos en el artículo anterior, se han creado con la nomenclatura por defecto de forma automática en Apache Kafka. En este caso, son los siguientes:

dbserver1 (para cambios de estructura)
dbserver1.inventory.products (tabla products)
dbserver1.inventory.products_on_hand (tabla products_on_hand)
dbserver1.inventory.customers (tabla customers)
dbserver1.inventory.orders (tabla orders)
dbserver1.inventory.addresses (tabla addresses)
dbserver1.inventory.geom (tabla geom)

En este punto, ya se tienen todas las herramientas disponibles para poder empezar y verificar el funcionamiento de la solución implementada.

Analizando los eventos de Debezium

La idea de implementar una solución de CDC es poder reaccionar a los cambios que se producen en la base de datos de forma inmediata para acometer algún caso de uso. Para ello, hay que tratar con los eventos que producen los conectores Debezium y actuar en consecuencia.

En este apartado, se van a analizar los eventos que el conector Debezium MySQL envía a Apache Kafka, con la finalidad de entender su estructura y poder explotar esa información de la forma que se requiera.

Estructura del evento

Todos los eventos que genera el conector Debezium tienen una estructura similar, por lo que antes de ver las diferencias entre los distintos eventos, se va a analizar la estructura común.

El evento está compuesto por objetos principales, schema y payload.

{
  "schema": {},
  "payload": {}
}

El objeto schema contiene toda la información sobre la estructura del cambio, desde el esquema (campos y tipos) de la fila afectada antes y después del cambio, hasta la estructura de otro tipo de información que puede ser útil, como la operación de cambio, el conector o la fila modificada.

{
  "schema": {
    "type":"struct",
    "fields": [
      {
        "type": "schema",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "field1"
          }
          (...)
        ],
        "optional": true,
        "name": "dbserver.table1.database1.Value",
        "field": "before"
      },
      (...)
    ],    
    (...)
  }
}

El objeto payload contiene toda la información sobre los valores del cambio. Este objeto es el que más se suele explotar en los casos de uso que consumen esta información, ya que contiene toda la información de los valores de la fila anteriores al cambio, los posteriores al cambio, la operación que se ha realizado y la información complementaria tanto del conector como del propio cambio.

"payload": {
  "before": null,
  "after": {
    "field1": "test",
    "field2": 0.1
  },
  "source": {},
  "op": "c",
  "ts_ms": 1618681673399,
  "transaction": null
}

Un objeto complementa al otro y viceversa, lo que proporciona un gran detalle del cambio que se ha producido.

Tipos de evento

El conector Debezium MySQL detecta los cambios de datos a nivel de fila, lo que implica que va a transmitir eventos de cambio de operaciones INSERT, UPDATE y DELETE.

Para diferenciar las distintas operaciones, existe un campo op dentro del objeto payload que define el cambio realizado.

"payload": {
  "before": null,
  "after": {
    "field1": "test",
    "field2": 0.1
  },
  "source": {},
  "op": "c",
  "ts_ms": 1618681673399,
  "transaction": null
}

Las posibilidades del campo op son las siguientes:

Por otro lado, el conector Debezium también detecta los cambios de estructura (declaraciones DDL) que se aplican en la base de datos, lo que implica que también produce eventos en operaciones como CREATE TABLE, ALTER TABLE, etc.

{
  "payload": {
     "databaseName": "database1",
     "ddl": "CREATE TABLE table1 ...",
     "tableChanges":[...],
     "source": {}
  }
}

El evento contiene toda la información de la declaración DDL aplicada.

Conociendo esta información, al consumir un evento se sabría con exactitud el cambio que se ha producido en la base de datos, por lo que ya se puede acometer un caso de uso completo.

Caso de uso con Debezium y MySQL

Una vez analizado el funcionamiento principal de Debezium, se está en disposición de definir un caso de uso, a modo de ejemplo, que permita consumir los eventos generados por el conector y ofrecer una visión completa de una solución de CDC.

El caso de uso que se muestra a continuación consiste en replicar la información, en tiempo real, de una base de datos MySQL a una base de datos MongoDB.

Para ello, se implementa una aplicación Spring Boot que consume los eventos de Apache Kafka, actualizando la base de datos MongoDB acorde a los cambios producidos en la base de datos MySQL.

Para configurar el entorno, se va a seguir la manera que se ha visto en apartados anteriores, dando foco a la aplicación consumidora y en la nueva base de datos.

Para iniciar y configurar la nueva base de datos MongoDB, se va a hacer uso de una imagen Docker.

docker run -d \
--name mongo-local \
-p 27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=admin1 \
mongo

Por tanto, el estado inicial de levantar todos los servicios necesarios es el siguiente:

La única parte que falta para completar un caso de uso de CDC sería integrar la aplicación.

Preparando datos

Antes de integrar la aplicación consumidora en el entorno configurado, y a modo de simplificar la demostración del caso de uso, se va a focalizar la prueba en la entidad products de la base de datos inventory de MySQL que proporciona la imagen utilizada.

Se recuerda el modelo de datos:

Los datos que contiene la entidad products en el momento inicial es:

Al inicializar la base de datos MongoDB, esta se crea vacía. Teniendo en cuenta que se quiere replicar la información, y que el conector Debezium detecta los cambios que se producen cuando empieza a monitorizar, ¿qué pasa con los datos ya existentes en la base de datos MySQL? ¿Se tienen que replicar de forma manual a MongoDB? ¿Qué alternativas hay?

Se verá en profundidad más adelante pero, a modo explicativo, el conector Debezium crea una instantánea de la base de datos que va a monitorizar y permite propagar los datos existentes como modificaciones realizadas. Es decir, los datos iniciales en MySQL serán propagados, en forma de evento de creación, a su topic respectivo para cada una de las tablas monitorizadas.

Por ello, en el momento que la aplicación de replicación se integre con Apache Kafka, consumirá dichos eventos y la base de datos MongoDB quedará perfectamente replicada.

Una vez conocida la entidad de prueba y los datos iniciales, se va a mostrar una forma de explotar los eventos para crear el caso de uso.

Replicando datos

Existen multitud de herramientas que permiten consumir eventos y/o interactuar con una base de datos MongoDB. El caso de uso que se va a implementar usa una aplicación Spring Boot, tecnología muy extendida a nivel productivo, que nos ofrece una gran capacidad de integración con la tecnología Apache Kafka y con MongoDB de forma sencilla y eficiente.

Lo primero que hay que implementar es un consumidor que sea capaz de consumir los eventos de Apache Kafka que el conector Debezium produce respecto a la entidad products.

Para ello, Spring Boot proporciona la librería, spring-kafka, que permite realizar la integración con Apache Kafka.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Esta librería permite crear un consumidor que escuche en un determinado topic para poder explotar la información deseada.

@KafkaListener(
 topics = {"${kafka.topic.product}"},
 groupId = "${kafka.group}",
 containerFactory = "kafkaListenerContainerFactory"
)

De esta forma, cuando la aplicación se inicie se conectará al topic definido y empezará a consumir los mensajes de cambio que lleguen.

El siguiente paso sería procesar el evento de la forma que se necesite e interactuar con MongoDB. Para ello, Spring Boot proporciona una librería, Spring Data, que facilita la integración con esta base de datos.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

Esta librería permite integrarse con la base de datos MongoDB a partir de CrudRepository de Spring Data, una librería con una serie de operaciones para facilitar el uso de la base de datos.

public interface ProductRepository extends CrudRepository<ProductEntity, Integer>{}

@Document("products")
@Getter
@Setter
@AllArgsConstructor
@Builder
public class ProductEntity{
 @Id
 private String _id;
 @Field("name")
 private String name;
 @Field("description")
 private String description;
 @Field("weight")
 private double weight;
}

En este caso, se consumirá los eventos del topic asociado a la entidad products de MySQL y realizará distintas operaciones dependiendo del tipo de operación que informe el evento.

Una vez la aplicación esté en funcionamiento, ya se tendría el caso de uso implementado.

Probando la solución

Para probar la solución implementada, se van a realizar cambios en la base de datos origen, de forma que se pueda validar que la réplica en tiempo real entre MySQL y MongoDB se ha realizado correctamente.

Se va a verificar que ocurre al actualizar una fila de la entidad products:

UPDATE products SET name='scooter REPLICA' where id=101;

Próximos pasos

Os invitamos a realizar vuestras propias pruebas para indagar mucho más en este tipo de tecnologías.

El código de la aplicación, que consume los eventos y replica la información a MongoDB, se puede ver en Github.

Para facilitar la construcción de la solución, se puede iniciar todo el entorno (app incluida) con el siguiente script.

Lecciones útiles aprendidas

¿Qué ocurre al iniciar el conector?

A la hora de iniciarse un conector Debezium, se genera una gran cantidad de registros en el servicio de Kafka Connect, en el que se muestra todo el proceso desde que el conector se crea hasta que empieza a leer del binlog del servidor MySQL (información de la configuración del binlog, del modelo de la base de datos, etc.)

Por defecto, una vez que el conector se inicia, toma una instantánea inicial del estado actual de la base de datos, que es de la que va a partir para detectar los cambios ocurridos. Normalmente, esto sucede cuando la base de datos se ha estado ejecutando durante algún tiempo y ha descartado los registros de transacciones que ya no son necesarios para la recuperación o replicación de transacciones.

Durante el proceso de arranque, también es capaz de detectar el modelo de la base de datos y crear los topic necesarios en Apache Kafka para poder enviar los eventos de cambio sobre las entidades.

¿Qué ocurre cuando se para o reinicia el servicio de Kafka Connect?

El conector Debezium puede capturar eventos de cambio incluso cuando no se está ejecutando. Si el servicio Kafka Connect se para, al reiniciarse producirá los eventos de las modificaciones que se han realizado en la base de datos mientras estaba parado.

Esto es debido a que el servicio Kafka Connect gestiona automáticamente las tareas de sus conectores registrados. Por tanto, si se desconecta, al reiniciarse iniciará cualquier tarea que no esté en ejecución.

Referencias

Cuéntanos qué te parece.

Enviar.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.