Event Driven para microservicios con Spring Cloud Stream

En las arquitecturas distribuidas basadas en microservicios, que implementen el framework de Spring, existe un proyecto llamado Spring Cloud Stream, que resulta muy útil cuando, por necesidad técnica y/o funcional, necesitamos consumir o producir algún tipo de mensaje.

Lo que más destaca es la sencillez en el momento de la configuración con el middleware deseado, la implementación del código y esa capa de abstracción que, con muy pocos cambios, nos permite modificar el sistema de mensajería teniendo un mínimo impacto en la aplicación.

Esto nos puede ser de ayuda cuando queremos consumir o producir mensajes, en arquitecturas que implementen patrones como Event-driven, Saga, Publish/Subscriber. Y sobre eso es de lo que vamos a hablar hoy en el blog.

Spring Cloud Stream nace de la unión de dos componentes importantes dentro del stack de Spring:

  • Spring Integration, el cual simplifica la complejidad de interacción con los componentes e implementa la mayoría de patrones de integración empresarial.
  • Y Spring Boot, que sirve para crear aplicaciones de manera rápida y sencilla.

Spring Cloud Stream ha ido añadiendo poco a poco componentes para la fácil integración con los principales proveedores de mensajería como RabbitMq, Apache Kafka, Amazon Kinesis, y otros.

Configuración

El primer paso que debemos dar para empezar la configuración es entrar en la interfaz a través de este link y generar un proyecto con las dependencias necesarias para nuestra necesidad.

Una vez dentro, debemos agregar los siguientes Starters* o dependencias:

  • Web: starter que agrupan las librerías orientadas a la comunicación web como el servidor tomcat, Spring mvc o Jackson para json.
  • Actuator: starter para supervisar la aplicación, métricas, tráfico, base de datos, etc.
  • RabbitMQ: librería para la conexión y envío de mensajes a RabbitMq.
  • Cloud Stream: starter que añade una capa de abstracción y cambiar de broker si es necesario sin cambiar nada de la implementación, solo configuración.
  • Lombok: librería que simplifica mucho código repetitivo en Java, etc con simples anotaciones.

* Starter: es un conjunto de librerías agrupadas en una sola dependencia y permite que se gestione centralizadamente todo lo necesario para iniciar lo requerido.

Si tienes Docker instalado, puedes ejecutar el comando “docker-compose up” con el siguiente yml y poder iniciar RabbitMQ en docker:

Si quieres tener todo el código, puedes obtenerlo en este repositorio.

Conceptos

Vemos a continuación los conceptos necesarios para entender cualquier implementación con Spring Cloud Stream:

Destination Binders

Son componentes internos de Spring Cloud Stream que realizan la configuración e integración necesaria con el sistema de mensajería elegida.

Estos componentes son responsables de la conectividad, delegación, enrutamiento, conversión del mensaje así como la invocación al código del usuario, ya sea de entrada, procesamiento o salida del mensaje.

Destination Binding

Básicamente es el encargado de identificar los tipos de Channel (Input | Output) de forma declarativa. Con esto realiza la tarea de “bridge” entre el sistema de mensajería elegido y la aplicación.

Para habilitar la configuración y definir nuestro servicio necesitamos la anotación @EnableBinding, que no es nada más que un @Configuration con metadata adicional, seguido de la o las clases que son de tipo Destination Binding.

Existen 3 interfaces que vienen predefinidas por Spring Cloud Stream y cada una cumple una función específica:

  • Sink: se utiliza para marcar el servicio que recibe mensaje por el Channel de entrada (input).
  • Source: se utiliza para enviar mensaje por el Channel de salida (output).
  • Processor: esta interfaz tiene su peculiaridad, ya que se utiliza cuando necesitamos marcar un servicio que soporte un Channel de entrada, realice algún tratamiento (si así lo requiere) y lo envíe por un Channel de salida.

Message

Es el conjunto de una estructura datos que va a viajar para que estos datos puedan comunicarse. El mensaje es un objeto wrapper compuesto por un header y un payload.

Channel

Es el medio por el cual viajan mensajes conteniendo información entre un Productor a un Consumidor. Puedes encontrar más información en esta documentación adicional.

Configuración de la Aplicación

Una vez comprendidos los conceptos anteriores, vamos a configurar la aplicación para poder iniciar nuestra aplicación Spring Boot.

1. Crear el aplication.yml

Podemos ver que se especifica los “bindings” con nombre “processorInput”, “processorOutput”, “sinkInput” y “sourceOuput” y también se especifica el “binder” con nombre “rabbit_local_paymentcard”.

2. Crear las interfaces

Se procede a crear la interfaz dependiendo del tipo que se desea, en este caso vamos a crear BasicPaymentProcessor.

3. Crear la clase config

Se crea la clase config con el nombre AppConfig.java, posterior a nivel de la clase anotamos @EnableBinding con los bindings deseados que se utilizarán en la aplicación.

4. Creación del handler

Creamos el servicio, que será de tipo Processor con el nombre BasicEventPaymentCardHandler.java, que agregará un interés al monto y posteriormente ejecutará el evento de salida.

5. Iniciar la aplicación

Llega el momento de iniciar la aplicación. Por default no se necesita crear previamente las colas, ya que si se realiza correctamente el yaml Spring Cloud Stream lo crea él mismo.

  • Se inicializa el servidor tomcat en el puerto 8080:
  • Se expone los endpoints de actuator:
  • Se crean los channel a utilizar:
  • Crea la conexión a RabbitMq:
  • Se crea un suscriptor detallado en el código y un consumidor:
  • Se inicia la aplicación:

Implementación

En estos momentos, Spring Cloud Stream está soportando las siguientes plataformas:

Para nuestro ejemplo, vamos a utilizar RabbitMQ. Enviaremos un mensaje en formato JSON, ya que el content-type por defecto es Json en las nuevas versiones.

  • Creación de la entidad

En nuestra aplicación vamos a tener un model que se llame “PaymentCard” y tenga las siguientes propiedades: id, description y mount.

  • Envío y recepción del mensaje
  • Nos metemos la consola de RabbitMq.
  • Vamos a la pestaña de Queues.
  • Accedemos a la cola de nombre queue.paymentcard.input.processorConsumers
  • En el apartado Publish message añadimos el siguiente JSON para que nuestra aplicación lo consuma:
{
  "id" : 1,
  "description" : "Pago de ordenador",
  "mount" : 600.50
}

Como se puede apreciar, se consumió el mensaje, pintó el log y nuevamente lo envió a un channel de salida.

Gestión de errores

Siempre existirán errores. Pero para solucionar los máximos posibles, Spring Cloud Stream, por defecto, utiliza una librería Spring Retry con el valor 3.

Eso quiere decir que realiza el re-queue 3 veces. Spring Cloud Stream diferencia dos tipos de errores:

  • Errores de Aplicación: tú controlas el error y lo gestionas con algún flujo de error dentro de la aplicación.
  • Errores de Sistema: es cuando gestiona el Binder (Rabbitmq, Kafa, etc..) mediante Dlq’s, re-queue, otros.

Integración

Es de fácil integración con otros componentes Spring, nombro algunos para que los puedas tener el cuenta:

1. Trazabilidad distribuida

La trazabilidad es algo muy importante en nuestros sistemas actuales. Al ser un proyecto que está dentro de stack de Spring Cloud, puede interactuar con la mayoría de proyectos.

Spring Cloud Sleuth es el componente que realiza la trazabilidad para el ecosistema Spring Cloud, al agregar dicho componente en un futuro proyecto nos va a poder brindar de todas sus funcionalidades de trazabilidad.

Si quieres saber más sobre trazabilidad, echa un vistazo a este post de nuestro blog en el que hablamos detalladamente de la implementación de Sleuth con Stream.

2. Actuator

Agregando el componente “Actuator” podemos visualizar diferentes métricas asociadas, como por ejemplo el estado de los bindings, etc., solo agregando la dependencia.

Desde la versión 2.0, Spring Cloud Stream, reduce el acoplamiento con los starters Web como Actuator integrandolos como opcionales

3. Indicador de Health

El indicador health para los bindings está siempre habilitado y se expone bajo el recurso de actuator /actuator/health, si queremos deshabilitar debemos detallar explícitamente con el valor a false.

Para que nos muestre información detallada de los binders debemos agregar show-details: ALWAYS.

No debemos olvidar que para habilitar el health son requisitos obligatorios agregar el starter Web y Actuator de spring.

Podemos probarlo ingresando en http://localhost:8080/actuator/

Métricas

De la misma forma que soporta la integración con las anteriores librerías y/o componentes, también proporciona la administración de las métricas para los bindings interactuando con la fachada de Micrometer que implementa por debajo a los principales sistemas de monitorización como Prometheus, Influx, etc.

Testing

De la misma forma que Spring Cloud Stream da soporte a diferentes binders, gestión de errores o la integración con otros componentes y/o librerías, también da soporte a los test para la aplicación creada sin conectarse a ningún sistema de mensajería.

Se debería agregar una dependencia adicional.

Comentaremos el test que hemos realizado de ejemplo.

  • Dejar un mensaje en el Channel de entrada (Input)
  • Colector de mensajes

Agregamos un Message Collector, que se utiliza para almacenar el mensaje de salida cuando el Basic Payment Processor ejecute el evento de salida:

  • Obtención y comparación del resultado

Para obtener el payload del evento de salida solo necesitamos apoyarnos en la variable messageQueue.

Necesitamos convertir la respuesta que es de tipo Object a Payment Card para lo cual realizamos una función que realizará la conversión de String a Payment Card con Jackson.

Como paso final voy a comparar los datos de respuesta y verificar si el monto ha sido procesado correctamente.

Y sí, procesó correctamente el evento y respondió los datos esperados.

Conclusiones

Con este artículo se pretende dar una breve introducción a los principales conceptos y cómo es la estructura actual de este componente de Spring.

Adicionalmente quisiera hacer hincapié en la fácil implementación, utilicé RabbitMq porque es la mensajería con la cual había trabajado anteriormente, pero disponemos de Kafka o Kinesis de Amazon.

También quiero resaltar la ventaja de utilizarlo ya que al ser una capa de abstracción hacia algún sistema de mensajería el impacto de cambiar a otro sistema es relativamente pequeño, es de bajo acoplamiento.

Referencias

Foto de djordan

Técnico Informático enfocado en las últimas tecnologías. Actualmente trabajo en Paradigma como desarrollador de software backend. Principalmente estoy interesado en Big Data, API's, Agile, Cloud, Seguridad, NoSql, buenas practicas y en compartir las tecnologías que puedo aprender. Compagino el trabajo con mi familia, lectura y natación.

Ver toda la actividad de Junior Daniel Jordan

Escribe un comentario