En las arquitecturas distribuidas basadas en microservicios, que implementen el framework de Spring, existe un proyecto llamado Spring Cloud Stream, que resulta muy útil cuando, por necesidad técnica y/o funcional, necesitamos consumir o producir algún tipo de mensaje.

Lo que más destaca es la sencillez en el momento de la configuración con el middleware deseado, la implementación del código y esa capa de abstracción que, con muy pocos cambios, nos permite modificar el sistema de mensajería teniendo un mínimo impacto en la aplicación.

Esto nos puede ser de ayuda cuando queremos consumir o producir mensajes, en arquitecturas que implementen patrones como Event-driven, Saga, Publish/Subscriber. Y sobre eso es de lo que vamos a hablar hoy en el blog.

Spring Cloud Stream nace de la unión de dos componentes importantes dentro del stack de Spring:

Spring Cloud Stream ha ido añadiendo poco a poco componentes para la fácil integración con los principales proveedores de mensajería como RabbitMq, Apache Kafka, Amazon Kinesis, y otros.

Configuración

El primer paso que debemos dar para empezar la configuración es entrar en la interfaz a través de este link y generar un proyecto con las dependencias necesarias para nuestra necesidad.

Una vez dentro, debemos agregar los siguientes Starters* o dependencias:

* Starter: es un conjunto de librerías agrupadas en una sola dependencia y permite que se gestione centralizadamente todo lo necesario para iniciar lo requerido.

Si tienes Docker instalado, puedes ejecutar el comando “docker-compose up” con el siguiente yml y poder iniciar RabbitMQ en docker:

Si quieres tener todo el código, puedes obtenerlo en este repositorio.

Conceptos

Vemos a continuación los conceptos necesarios para entender cualquier implementación con Spring Cloud Stream:

Destination Binders

Son componentes internos de Spring Cloud Stream que realizan la configuración e integración necesaria con el sistema de mensajería elegida.

Estos componentes son responsables de la conectividad, delegación, enrutamiento, conversión del mensaje así como la invocación al código del usuario, ya sea de entrada, procesamiento o salida del mensaje.

Destination Binding

Básicamente es el encargado de identificar los tipos de Channel (Input | Output) de forma declarativa. Con esto realiza la tarea de “bridge” entre el sistema de mensajería elegido y la aplicación.

Para habilitar la configuración y definir nuestro servicio necesitamos la anotación @EnableBinding*,* que no es nada más que un @Configuration con metadata adicional, seguido de la o las clases que son de tipo Destination Binding.

Existen 3 interfaces que vienen predefinidas por Spring Cloud Stream y cada una cumple una función específica:

Message

Es el conjunto de una estructura datos que va a viajar para que estos datos puedan comunicarse. El mensaje es un objeto wrapper compuesto por un header y un payload.

Channel

Es el medio por el cual viajan mensajes conteniendo información entre un Productor a un Consumidor. Puedes encontrar más información en esta documentación adicional.

Configuración de la Aplicación

Una vez comprendidos los conceptos anteriores, vamos a configurar la aplicación para poder iniciar nuestra aplicación Spring Boot.

1 Crear el aplication.yml

Podemos ver que se especifica los “bindings” con nombre “processorInput”, “processorOutput”, “sinkInput” y “sourceOuput” y también se especifica el “binder” con nombre “rabbit_local_paymentcard”.

2 Crear las interfaces

Se procede a crear la interfaz dependiendo del tipo que se desea, en este caso vamos a crear BasicPaymentProcessor.

3 Crear la clase config

Se crea la clase config con el nombre AppConfig.java, posterior a nivel de la clase anotamos @EnableBinding con los bindings deseados que se utilizarán en la aplicación.

4 Creación del handler

Creamos el servicio, que será de tipo Processor con el nombre BasicEventPaymentCardHandler.java, que agregará un interés al monto y posteriormente ejecutará el evento de salida.

5 Iniciar la aplicación

Llega el momento de iniciar la aplicación. Por default no se necesita crear previamente las colas, ya que si se realiza correctamente el yaml Spring Cloud Stream lo crea él mismo.

Implementación

En estos momentos, Spring Cloud Stream está soportando las siguientes plataformas:

Para nuestro ejemplo, vamos a utilizar RabbitMQ. Enviaremos un mensaje en formato JSON, ya que el content-type por defecto es Json en las nuevas versiones.

En nuestra aplicación vamos a tener un model que se llame “PaymentCard” y tenga las siguientes propiedades: id, description y mount.

{
  "id" : 1,
  "description" : "Pago de ordenador",
  "mount" : 600.50
}

Como se puede apreciar, se consumió el mensaje, pintó el log y nuevamente lo envió a un channel de salida.

Gestión de errores

Siempre existirán errores. Pero para solucionar los máximos posibles, Spring Cloud Stream, por defecto, utiliza una librería Spring Retry con el valor 3.

Eso quiere decir que realiza el re-queue 3 veces. Spring Cloud Stream diferencia dos tipos de errores:

Integración

Es de fácil integración con otros componentes Spring, nombro algunos para que los puedas tener el cuenta:

1 Trazabilidad distribuida

La trazabilidad es algo muy importante en nuestros sistemas actuales. Al ser un proyecto que está dentro de stack de Spring Cloud, puede interactuar con la mayoría de proyectos.

Spring Cloud Sleuth es el componente que realiza la trazabilidad para el ecosistema Spring Cloud, al agregar dicho componente en un futuro proyecto nos va a poder brindar de todas sus funcionalidades de trazabilidad.

Si quieres saber más sobre trazabilidad, echa un vistazo a este post de nuestro blog en el que hablamos detalladamente de la implementación de Sleuth con Stream.

2 Actuator

Agregando el componente “Actuator” podemos visualizar diferentes métricas asociadas, como por ejemplo el estado de los bindings, etc., solo agregando la dependencia.

Desde la versión 2.0, Spring Cloud Stream, reduce el acoplamiento con los starters Web como Actuator integrandolos como opcionales

3 Indicador de Health

El indicador health para los bindings está siempre habilitado y se expone bajo el recurso de actuator /actuator/health, si queremos deshabilitar debemos detallar explícitamente con el valor a false.

Para que nos muestre información detallada de los binders debemos agregar show-details: ALWAYS.

No debemos olvidar que para habilitar el health son requisitos obligatorios agregar el starter Web y Actuator de spring.

Podemos probarlo ingresando en http://localhost:8080/actuator/

Métricas

De la misma forma que soporta la integración con las anteriores librerías y/o componentes, también proporciona la administración de las métricas para los bindings interactuando con la fachada de Micrometer que implementa por debajo a los principales sistemas de monitorización como Prometheus, Influx, etc.

Testing

De la misma forma que Spring Cloud Stream da soporte a diferentes binders, gestión de errores o la integración con otros componentes y/o librerías, también da soporte a los test para la aplicación creada sin conectarse a ningún sistema de mensajería.

Se debería agregar una dependencia adicional.

Comentaremos el test que hemos realizado de ejemplo.

Agregamos un Message Collector, que se utiliza para almacenar el mensaje de salida cuando el Basic Payment Processor ejecute el evento de salida:

Para obtener el payload del evento de salida solo necesitamos apoyarnos en la variable messageQueue.

Necesitamos convertir la respuesta que es de tipo Object a Payment Card para lo cual realizamos una función que realizará la conversión de String a Payment Card con Jackson.

Como paso final voy a comparar los datos de respuesta y verificar si el monto ha sido procesado correctamente.

Y sí, procesó correctamente el evento y respondió los datos esperados.

Conclusiones

Con este artículo se pretende dar una breve introducción a los principales conceptos y cómo es la estructura actual de este componente de Spring.

Adicionalmente quisiera hacer hincapié en la fácil implementación, utilicé RabbitMq porque es la mensajería con la cual había trabajado anteriormente, pero disponemos de Kafka o Kinesis de Amazon.

También quiero resaltar la ventaja de utilizarlo ya que al ser una capa de abstracción hacia algún sistema de mensajería el impacto de cambiar a otro sistema es relativamente pequeño, es de bajo acoplamiento.

Referencias

Cuéntanos qué te parece.

Enviar.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.