Hoy en el blog os presentamos un artículo que os puede servir de guía para un primer acercamiento a la programación reactiva con el marco de trabajo Spring.

Este post los hemos dividido en dos partes: primero, nos centramos en la parte más teórica y luego un ejemplo práctico. A continuación, encontraréis unas breves notas de nuestra experiencia trabajando con el paradigma reactivo y el código de una aplicación CRUD.

Algo de documentación

¿Qué es Webflux y Data R2DBC?

Como respuesta a la necesidad de un stack web no bloqueante, Spring ha creado estos módulos:

¿Qué debo tener en cuenta antes de decidirme por RX de Spring?

Antes de lanzarte, te recomiendo que reflexiones sobre:

Casos de uso

Algunos de los escenarios en los cuales se pueden implementar RX de forma incremental que no requieran de maratónicas migraciones de servicios legados, pueden ser:

Ejemplo práctico

¿Qué hace el proyecto poc-reactive?

Es un CRUD con un requerimiento particular de almacenar en una colección los registros eliminados y evitar que sean incluidos nuevamente. Tenéis acceso a la base de código en el repositorio.

Modelos

IdNameDTO

{
 "id": "string",
 "name": "string"
}

ErrorResponse

{
 "code": "string",
 "message": "string",
 "details": [
   "string"
 ],
 "level": "String"
}

Destripando el código

Controller

public Flux<IdNameDTO> getAll() {
   return idNameService.getAll()
           .map(IdNameDTO::from);
}

La clase Flux es similar a stream, retorna de 0 a n elementos.

El método map(), para la transformación de datos, como en stream, es ejecutado valor a valor.

public Mono<IdNameDTO> getById(@PathVariable Long id) {
   return idNameService.getById(id)
           .switchIfEmpty(Mono.error(ServiceException.notFound(id)))
           .map(IdNameDTO::from);
}

La clase Mono, similar a Optional, retorna de 0 o 1 elemento.

El método switchIfEmpty(), si la respuesta está vacía, en el ejemplo se emite un evento de error que también es posible retornar una respuesta por defecto o llamar un flujo alternativo.

El método estático error() de la clase Mono para generar eventos de error.

public Mono<IdNameDTO> insertOne(@Valid @RequestBody
Mono<IdNameDTO> payload) {
   return payload
           .flatMap(it -> {
               if (previous.contains(it.getName()))
                   return Mono.error(ServiceException.newInstance4XX("No puede volver a ingresar un registro previamente eliminado"));

               return Mono.just(it);
           }).map(IdNameDTO::toEntity)
           .flatMap(idNameService::insert)
           .map(IdNameDTO::from);
}

El body es aceptado como un objeto simple de java o encapsulado como stream de datos (Mono o Flux).

El método flatMap() permite hacer llamadas asíncronas, en este caso transmite una excepción si fue previamente eliminado.

public Mono<Void> deleteById(Long id) {
   return Mono.just(id)
           .doOnNext(unused ->
                   idNameService.getById(id)
                           .map(IdName::getName)
                           .doOnNext(previous::add)
                           .subscribe(s -> log.info("added {}", s),
                                   e -> log.error("error {}", e.getMessage()))
           ).flatMap(idNameService::deleteById);
}

doOnNext(), método de mutación a los datos, en este caso como bloque de control.

subscribe(), método para ejecutar flujos asíncronos desatendidos, este caso consulta en BBDD por el id a eliminar y respalda su valor en una lista.

default Flux<T> getAllAsStream() {
   return getAll()
           .delayElements(Duration.ofMillis(20));
}

Spring puede transmitir los datos como una pila de eventos, si el cliente soporta esta funcionalidad puede recibir una respuesta incremental.

delayElements(), añade una espera de 20 milisegundos entre elemento y elemento de la respuesta

default Flux<String> getPrevious() {
   return Flux.fromIterable(previous);
}

Genera un objeto flux desde una lista.

Service

private final Retry catchingQueryTimeoutException = Retry.fixedDelay(3, Duration.ofSeconds(2))
       .filter(it -> it instanceof PrematureCloseException || it instanceof QueryTimeoutException)
       .doAfterRetry(retrySignal ->
           log.info("iteration: {}, cause: {} {}",
retrySignal.totalRetriesInARow(),
retrySignal.failure().getClass().toString(),
retrySignal.failure().getMessage())
       ).onRetryExhaustedThrow((retrySpec, retrySignal) -> retrySignal.failure());

Objeto para el failSave, define el número de reintentos y un delay entre ellos, aplica solo cuando la excepción sea una instancia de QueryTimeoutException o PrematureCloseException, doAfterRetry añade una traza después de lanzar un reintento y transmite a la capa superior la excepción cuando se agota la cantidad de reintentos con onRetryExhaustedThrow().

public Mono<IdName> insert(IdName payload) {
   return idNameRepository.save(payload)
           .retryWhen(catchingQueryTimeoutException);
}

retryWhen, se ejecuta si falla el método save y vuelve a ejecutarlo cumpliendo lo descrito en el objeto Retry.

@Table
public class IdName implements Persistable<Long> {

   @Id
   @GeneratedValue
   private Long id;
   @Column
   private String name;

   @Override
   @Transient
   public boolean isNew() {
       return id == null;
   }
}

La declaración de entidades con r2dbc requiere de algo de ayuda para diferenciar un registro nuevo de uno existente, en caso de no implementar isNew() el método save del repository siempre construirá sentencias de insert.

Las anotaciones @Table y @Column por las limitaciones actuales de r2dbc no poseen muchos atributos de configuración.

Para la construcción de las estructuras de BBDD (tablas, secuencias, vistas, etc.), puedes hacer uso de queries de inicialización (schema.sql y data.sql), puesto que la característica de construcción aún no está disponible.

Conclusión

La decisión de Spring de alejarse de un paradigma procedimental y ceñirse a uno funcional, para los que están acostumbrados a la asincronía con JavaScript, puede parece que es un desacierto. Sin embargo, viendo la reciente evolución de Java, considero que es una decisión enfocada en reducir su verbosidad.

La comunicación asíncrona de alto rendimiento, resiliente y de tiempo real tiene una gran influencia en nuestro día a día. Con su adopción se pueden tener mejoras en costos de servicios y experiencia de usuario.

¿Te interesa todo lo relacionado con el ecosistema Sping? No te pierdas entonces este post en el que te contamos las novedades que se presentaron en el Sping IO 22.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.