Las empresas están adaptando su cultura empresarial y el desarrollo de aplicaciones a un enfoque data-driven, cuyo pilar básico es la gestión y explotación del dato en tiempo real.

En este contexto, las arquitecturas software responden a esta demanda con el uso de sistemas distribuidos, desacoplados y altamente conectados en los que Apache Kafka es una pieza fundamental.

Este desacoplamiento llega a varios niveles y no solo se centra en el ámbito tecnológico, sino que también afecta al timing de los proyectos y a su desarrollo. Las aplicaciones distribuidas suelen ser implementadas por diferentes equipos y en ocasiones en distinto momento:

El primer caso tiene una cronología más natural, pero en el segundo es necesario realizar una gestión que implique un desarrollo contract-first para que la construcción de la aplicación consumidora y la que genera la información estén alineadas. Este enfoque también es recomendable en el primer caso, pero para un desarrollo en paralelo es fundamental.

¿Qué entendemos por contract-first? Es un enfoque práctico en el cual los usuarios de negocio y desarrolladores se ponen de acuerdo en la estructura de información que se maneja en un servicio antes de implementarla.

¿Por qué es necesario un desarrollo contract-first? Porque permite trabajar de manera independiente, asegura la consistencia de los datos expuestos a través de los servicios, permite a distintos equipos colaborar y evolucionar el esquema de una forma controlada y, además, gracias al ecosistema de herramientas en torno a las especificaciones de contrato se puede ahorrar tiempo en el desarrollo posterior del servicio.

En el mundo de las arquitecturas orientadas a eventos ya se explicó en un post anterior la necesidad del uso de esquemas y porque resulta crítico un componente eficaz para su gobernanza como es el Schema Registry de Kafka. Además, se profundizó mediante un ejemplo cómo se puede evolucionar y versionar dichos esquemas a lo largo del tiempo para mantener la compatibilidad.

En este caso, daremos un paso más utilizando el esquema definido como pieza central para:

  1. Establecer la documentación que permita un gobierno del evento de Kafka mediante AsyncAPI.
  2. Mockear una aplicación productora de eventos siguiendo el esquema definido en AsyncAPI mediante Microcks.

AsyncAPI

¿Qué es?

AsyncAPI es una iniciativa open source cuyo propósito es ofrecer una especificación para la definición de APIs asíncronas que permita un mejor gobierno de las arquitecturas orientadas a eventos (EDA). Fue creada para describir las APIs independientemente del protocolo que utilicen: MQTT, AMQP, Kafka, Websockets, etc.

Tiene como objetivo final trabajar con las arquitecturas EDA de una forma tan sencilla como con APIs Rest. De hecho, esta especificación para el mundo asíncrono es análoga a OpenAPI en el mundo síncrono. Ambas iniciativas se basan en los mismos principios:

La estructura de AsyncAPI está conformada por las siguientes secciones:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 1

Las diferencias fundamentales entre AsyncAPI y OpenAPI a nivel de especificación son las siguientes:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 2

¿Cómo lo utilizamos en nuestro ejemplo?

El objetivo de este primer paso es definir nuestro topic de Kafka con la estructura de datos que se va a enviar en él de una forma estandarizada con AsyncAPI.

Para hacerlo, en nuestro ejemplo, vamos a definir un evento de Kafka muy sencillo que represente los cambios de estado de un pedido siguiendo estos pasos:

  1. Definir la estructura del evento

En este caso, al igual que hicimos en el post anterior, vamos a definir la estructura de la información en Avro, aunque también sería posible hacerlo en Json y protobuf. El esquema de datos del evento sería el siguiente:

{
  "type": "record",
  "name": "order_status",
  "doc": "Order status event.",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "Status order unique identifier"
    },
    {
      "name": "status",
      "type": "enum",
      "symbols": [
        "RECIBIDO",
        "ENVIADO",
        "ENTREGADO",
        "DEVUELTO"
      ],
      "doc": "Order status"
    },
    {
      "name": "description",
      "type": "string",
      "doc": "Additional information about a change in order"
    },
    {
      "name": "orderId",
      "type": "string",
      "doc": "Order unique identifier on which a status change has occurred"
    },
    {
      "name": "updateDate",
      "type": "string",
      "doc": "Date on which there has been a change in the order status"
    }
  ]
}
}
  1. Generar especificación AsyncAPI

Para documentar topics de Kafka a través de AsyncAPI hay que tener en cuenta la terminología, es decir, cómo se relacionan los conceptos de Kafka con la estructura de la especificación:

Es decir, vamos a describir un topic que representa el estado del pedido a través de un channel de AsyncAPI, por el cual se envía un message con la estructura de datos definida en Avro. Para nuestro ejemplo tendríamos los siguientes datos:

Teniendo en cuenta esto, nuestra especificación AsyncAPI de ejemplo sería el siguiente:

asyncapi: 2.0.0
info:
  title: Order Status Service
  version: '1.0.0'
  description: |
    Manages changes in order status.
  license:
    name: Apache 2.0
    url: https://www.apache.org/licenses/LICENSE-2.0
  contact:
    name: Noelia Martin
    url: https://www.paradigmadigital.com/
    email: noelia@paradigmadigital.com

servers:
  production:
    url: localhost:9092
    protocol: kafka
    protocolVersion: '1.0.0'
    description: I run a production server on my laptop

channels:
  orders/status_topic:
    subscribe:
      operationId: orderStatusEvent
      summary: Message with the details about a change in order status
      tags:
        - name: orders
        - name: status
      description: Informs about a change in order status
      message:
        $ref : '#/components/messages/OrderStatus'

components:
  messages:
    OrderStatus:
      name: orderStatus
      title: Order status event
      contentType: avro/binary
      schemaFormat: 'application/vnd.apache.avro+yaml;version=1.9.0'
      payload:
        type: record
        doc: User information
        fields:
          - name: id
            type: string
            doc: Status order unique identifier
          - name: status
            type: enum
            symbols : ["RECIBIDO", "ENVIADO", "ENTREGADO", "DEVUELTO"]
            doc: Order status.
          - name: description
            type: string
            doc: Additional information about a change in order
          - name: orderId
            type: string
            doc: Order unique identifier on which a status change has occurred
          - name: updateDate
            type: string
            doc: Date on which there has been a change in the order status
      examples:
        - Recibido:
          summary: Example for a created order
          payload:
            id: '{{randomString(32)}}'
            status: RECIBIDO
            descripcion: El pedido ha sido recibido
            orderId: '{{randomString(32)}}'
            updateDate: '{{now()}}'
       - Enviado:
          summary: Example for a sent order
          payload:
            id: '{{randomString(32)}}'
            status: ENVIADO
            descripcion: El pedido ha sido enviado
            orderId: '{{randomString(32)}}'
            updateDate: '{{now()}}'
       - Entregado:
          summary: Example for a delivered order
          payload:
            id: '{{randomString(32)}}'
            status: ENTREGADO
            descripcion: El pedido ha sido entregado
            orderId: '{{randomString(32)}}'
            updateDate: '{{now()}}'

Microcks

¿Qué es?

Microcks es una herramienta open source para el mocking y testing de APIs basada en kubernetes y que permite la virtualización de servicios a partir de un contrato API.

Sus características fundamentales son:

Se diferencia de otro tipo de herramientas de testing y mocking en lo siguiente:

1. Microcks vs SoapUI:

2. Microcks vs Postman:

La arquitectura interna de Microcks esta formada por los siguientes elementos:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 3

Y en el mundo asíncrono, ¿qué aporta esta herramienta? Microcks es capaz de generar eventos simulados con una frecuencia determinada y que sigan el esquema establecido en el contrato AsyncAPI sin escribir una línea de código. De esta forma, los consumidores pueden empezar a trabajar en su aplicación procesando eventos con la estructura establecida sin que la aplicación que genera los datos esté implementada.

¿Cómo lo utilizamos en nuestro ejemplo?

El objetivo de este paso es mockear nuestro topic de Kafka siguiendo la estructura de datos definida con el contrato AsyncAPI.

Para hacerlo, en nuestro ejemplo, utilizaremos el contrato AsyncAPI generado en el paso previo para que Microcks produzca eventos mockeados siguiendo estos pasos:

  1. Instalar Microcks

Microcks está completamente contenerizado y permite su despliegue en cualquier plataforma tanto on-premise como cloud de una forma flexible y sencilla.

Por defecto, ofrece diferentes opciones para su instalación, en nuestro caso se opta por usar la versión completamente contenida de Microcks a través de docker compose.

Para su instalación simplemente hay que ejecutar estos comandos desde consola:

$ git clone https://github.com/microcks/microcks.git
$ cd microcks/install/docker-compose
$ docker-compose -f docker-compose.yml -f docker-compose-async-addon.yml up -d

Tras unos minutos la instalación termina y se pueden ver todos los componentes de Microcks explicados anteriormente ejecutándose:

$ docker ps

Un ejemplo de la salida del comando anterior sería:

CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
04bf627052ce quay.io/microcks/microcks-async-minion:latest "/deployments/run-ja…" 2 minutes ago Up 5 seconds 8080/tcp, 0.0.0.0:8081->8081/tcp microcks-async-minion
df732fbb532a obsidiandynamics/kafdrop "/kafdrop.sh" 2 minutes ago Up About a minute 0.0.0.0:9000->9000/tcp docker-compose-kafdrop-1
ceb03b19d359 quay.io/microcks/microcks:latest "/deployments/run-ja…" 2 minutes ago Up About a minute 0.0.0.0:8080->8080/tcp, 8778/tcp, 0.0.0.0:9090->9090/tcp, 9779/tcp microcks
38454e402512 strimzi/kafka:0.17.0-kafka-2.4.0 "sh -c 'bin/kafka-se…" 2 minutes ago Up About a minute 0.0.0.0:9092->9092/tcp, 0.0.0.0:19092->19092/tcp microcks-kafka
4d72d33beeb3 jboss/keycloak:14.0.0 "/opt/jboss/tools/do…" 2 minutes ago Up 2 minutes 8443/tcp, 0.0.0.0:18080->8080/tcp microcks-sso
b2a958d4f330 mongo:3.4.23 "docker-entrypoint.s…" 2 minutes ago Up 2 minutes 27017/tcp microcks-db
e8d4ddecd4c7 strimzi/kafka:0.17.0-kafka-2.4.0 "sh -c 'bin/zookeepe…" 2 minutes ago Up 2 minutes 0.0.0.0:2181->2181/tcp microcks-zookeeper
2a51080045f7 quay.io/microcks/microcks-postman-runtime:latest "docker-entrypoint.s…" 2 minutes ago Up 2 minutes 3000/tcp microcks-postman-runtime

Y se puede acceder a la consola web en: http://localhost:8080, tras loguearnos con los credenciales por defecto: admin/microcks123 viendo lo siguiente:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 4
  1. Importar especificación AsyncAPI

Desde la sección importers podemos cargar nuestro ejemplo AsyncAPI generado en el paso anterior:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 5

Tras lo cual aparece nuestra API cargada en la sección API | Services con los detalles de la misma:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 6
  1. Generar mocks

Entrando en los detalles de la API veremos:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 7

Para comprobar cómo se publican esos mensajes de ejemplo a Kafka simplemente tenemos que entrar a ver los logs del contendor de microcks-async-minion de la siguiente forma:

$ docker logs 04bf627052ce

Un ejemplo de la salida del comando anterior sería:

2021-10-20 09:34:37,288 INFO [io.git.mic.min.asy.pro.ProducerManager] (QuarkusQuartzScheduler_Worker-19) Producing async mock messages for frequency: 3
2021-10-20 09:34:37,289 INFO [io.git.mic.min.asy.pro.KafkaProducerManager] (QuarkusQuartzScheduler_Worker-19) Publishing on topic {OrderStatusService-1.0.0-orders-status_topic}, message: {"id":"Y0sfrsgdG5X60hF10t6zS5TzWTKES6cS","status":"RECIBIDO","descripcion":"El pedido ha sido recibido","orderId":"5iVhlQGd4mTqz1e6GV2w3YjmEDahNPPB","updateDate":"1634722477289"}
2021-10-20 09:34:37,292 INFO [io.git.mic.min.asy.pro.KafkaProducerManager] (QuarkusQuartzScheduler_Worker-19) Publishing on topic {OrderStatusService-1.0.0-orders-status_topic}, message: {"id":"lynCpCApif3maBLcLpJalwWixlS0Kyqr","status":"ENVIADO","descripcion":"El pedido ha sido enviado","orderId":"LxP69nN3dNu3m0LZZUTuGjVzqMybcmj7","updateDate":"1634722477292"}
2021-10-20 09:34:37,295 INFO [io.git.mic.min.asy.pro.KafkaProducerManager] (QuarkusQuartzScheduler_Worker-19) Publishing on topic {OrderStatusService-1.0.0-orders-status_topic}, message: {"id":"No8qfLSI8FEVFWs3HAzL5kQStMwbV91R","status":"ENTREGADO","descripcion":"El pedido ha sido entregado","orderId":"Ei3ggcWC1N29vJAKyg3jtMOBEqbR7yRn","updateDate":"1634722477295"}``

En nuestro ejemplo, por simplicidad, hemos usado el propio broker de kafka contenido en Microcks de forma que podemos comprobar cómo llegan los mensajes con la estructura que necesitamos consumiéndolos desde el propio contenedor microcks-kakfa con los siguientes comandos:

$ docker exec -it 38454e402512 /bin/sh
sh-4.2$ cd bin/
sh-4.2$ ./kafka-topics.sh --bootstrap-server kafka:19092 --list
OrderStatusService-1.0.0-orders-status_topic
__consumer_offsets
microcks-services-updates
sh-4.2$ ./kafka-console-consumer.sh --bootstrap-server kafka:19092 --topic OrderStatusService-1.0.0-orders-status_topic
{"id":"kUvwSHcQAOqbKYO2Vc52TnOtJcGmDB1u","status":"RECIBIDO","descripcion":"El pedido ha sido recibido","orderId":"aap1Axg4pEfKI85NnCdwS4Io6FFkMiFW","updateDate":"1634723386289"}
{"id":"k7ymZtjW0j5YWcfWV9WJt0oF4bAhzDx1","status":"ENVIADO","descripcion":"El pedido ha sido enviado","orderId":"z9wDnkaeENxiufqZnPBUxMnvYhtU7AmX","updateDate":"1634723386293"}
{"id":"OLpMElMWhX8xtVfqv5ojX9RukFqfEjX7","status":"ENTREGADO","descripcion":"El pedido ha sido entregado","orderId":"jlHqt1nlxQhhQhYnJT520jYIg9maXNsJ","updateDate":"1634723386295"}
{"id":"ajjPNe2rRoNwqMbaBYpK0CavJ8Et5yRI","status":"RECIBIDO","descripcion":"El pedido ha sido recibido","orderId":"hNpScVZ8mO4q94sfaYdfZtoHjATGpFZU","updateDate":"1634723389289"}
{"id":"xcHvLltHNCKBcQoSrXAhLoCsnvVG3LSH","status":"ENVIADO","descripcion":"El pedido ha sido enviado","orderId":"XHOTz17nehW55lUMVZnruaKfnhIINjM7","updateDate":"1634723389295"}
{"id":"r8SeDdXwPa3SbjWZjgptIoPQgX8Kh3z4","status":"ENTREGADO","descripcion":"El pedido ha sido entregado","orderId":"q9y5Wry1b8vmwh95ieBXKkS3Lzsd6oYx","updateDate":"1634723389299"}
^CProcessed a total of 6 messages

Si se quieren deshabilitar o modificar la frecuencia de los mocks simplemente hay que editar las propiedades de la API desde la consola:

Cómo mockear eventos en Kafka con AsyncAPI y Microcks 8
  1. Eliminar entorno de pruebas

Para eliminar todos los componentes generados en el entorno de pruebas simplemente es necesario ejecutar el siguiente comando:

$ docker-compose -f docker-compose.yml -f docker-compose-async-addon.yml down

Conclusiones

El desarrollo actual de aplicaciones ha puesto las arquitecturas basadas en eventos en el centro de atención y Kafka es el corazón de dichas arquitecturas convirtiéndose en un componente prácticamente omnipresente.

Pero Kafka delega la responsabilidad del procesamiento y validación de las estructuras de datos a los desarrolladores para mantener su alto rendimiento y escalabilidad. Compartir esta información en equipos pequeños y proyectos acotados puede hacerse de forma sencilla, pero si el alcance aumenta a un mayor número de usuarios y a un proyecto con una mayor envergadura puede convertirse en una pesadilla. Como respuesta a este desafío surge AsyncAPI.

Utilizar la especificación AsyncAPI permite documentar los sistemas basados en eventos manteniendo la coherencia, la eficiencia y la gobernanza de los componentes. Esta especificación está en proceso de convertirse en el estándar de mercado para la definición de contratos de servicios asíncronos y las herramientas de API Management empiezan a incorporarlo a su stack, como por ejemplo Mulesoft.

Además, AsyncAPI ofrece un ecosistema de herramientas que permiten acelerar el desarrollo, ayudando con algunas tareas tediosas como generadores de código, documentación y test. Entre estas herramientas destaca Microcks.

Una de las mayores ventajas del enfoque contract-first es utilizar los contratos definidos para poder paralelizar el desarrollo de las aplicaciones consumidoras y generadoras de datos. Microcks nos ayuda con esta tarea al ofrecer una plataforma que aprovecha la especificación AsyncAPI y mediante los ejemplos definidos en ella publica eventos que siguen la estructura definida. Esto permite no solo el desarrollo en paralelo, sino también hacer pruebas de contrato eficientes.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.