Secuencial, secuencial. Yo también era así, viviendo tranquilo entre mis bucles, mis listas, mis collections; si quería transformar una lista for; si quería filtrar for, todo era predecible. Hasta que un día me soltaron: “eso se puede hacer con RxJava” y es aquí donde empezó mi travesía entre lambdas, subscribe y una forma de componer lógica limpia, fluida y potente.

No hace falta ser un gurú para empezar, solo saber “adaptarte” y abrir la mente a un nuevo paradigma. En el siguiente post, podremos ver unos primeros pasos para dar dicho salto, pasando por los puntos generales de la programación reactiva hasta sumergirnos entre suscribe, control de errores y las operaciones más importantes como pueden ser el map, flatMap, etc.

Por qué cambiar: motivaciones reales

1 Limitación del pensamiento secuencial

La programación secuencial siempre funciona, hasta que llega un momento que no. Al enfrentarnos al procesamiento masivo de datos, llamadas concurrentes a diferentes servicios externos o controlar errores de forma masiva, puede convertir una lógica, aparentemente sencilla, en un arco de iglesia o en un gran laberinto de estructuras anidadas.

Lo que empieza con un simple for dentro de otro for, se acaba convirtiendo conforme avanzamos en nuestras integraciones, en una cadena de if, try-catch y verificaciones de estado que complican nuestro clean code. De pronto te ves con tres servicios distintos que consultar (por ejemplo: usuarios, pedidos e inventario) y lo haces uno detrás de otro:

try {
    final Usuario usuario = getUsuario(id);
    try {
        final List<Pedido> pedidos = getPedidos(usuario);
        try {
            final Inventario inv = getInventario(pedidos);
        } catch (final InventarioException e) {
            log.error("Error en inventario", e);
        }
    } catch (final PedidoException e) {
        log.error("Error en pedidos", e);
    }
} catch (final UsuarioException e) {
    log.error("Error en usuario", e);
}

Acabamos agregando complejidad. Lo que iba a ser una simple consulta, acaba siendo un bloque de try-catch anidados, pudiendo caer en la duplicación de lógica, complicaciones para testear… eso sin contar con nuestro queridísimo NullPointerException. Intentamos forzar un modelo de paso a paso en un mundo lleno de eventos, flujos y condiciones variables.

2 Pensar en flujos, no en pasos

RxJava nos propone una nueva forma de pensar y de razonar para solucionar nuestros problemas: trata los datos como flujos. Esto significa que tenemos que transformarlos, filtrarlos y gestionarlos de forma declarativa. Es decir, no pienso en lo que viene ahora o después (o lo que viene primero) sino en ¿qué hago y cómo reacciono cuando llegue el momento?”.

3 Cuándo usar RxJava

RxJava tiene diferentes contextos en los que es recomendable utilizarlo:

¿Por qué RxJava?

4 Características importantes

RxJava y la programación reactiva en general se basan en los siguientes principios:

map(x=>10*x)

Primeros pasos con RxJava

Como venimos de un mundo 100% imperativo, la clave, como casi en toda la programación, es empezar poco a poco.

Creación de flujos

Observable saludo = Observable.just("Hola", "desde", "RxJava"); saludo.subscribe(System.out::println);

Operadores esenciales

Observable.just("Java", "Kotlin", "Scala")
    .filter(lang -> lang.length() > 4)
    .map(String::toUpperCase)
    .subscribe(System.out::println);

Tenemos un ejemplo con algo más cercano a nuestro día a día:

Observable.just(
        new Usuario("Ana", true, "ana@email.com"),
        new Usuario("Luis", false, "luis@email.com"),
        new Usuario("Marta", true, "marta@email.com")
    )
    .filter(Usuario::isActivo)
    .map(usuario -> new ResumenUsuario(usuario.getNombre(), usuario.getEmail()))
    .subscribe(this::enviarResumen);

class ResumenUsuario {
    private String nombre;
    private String email;

    public ResumenUsuario(String nombre, String email) {
        this.nombre = nombre;
        this.email = email;
    }

    // getters, toString, etc.
}

void enviarResumen(ResumenUsuario resumen) {
//Imaginemos una llamada externa
    clienteAuditoria.enviar(resumen);
}

Al igual que en la programación imperativa, las lambdas seguirán siendo nuestras amigas, así como los principales operadores: map, filter y flatMap. Siendo estos partes de una laaaaaarga lista, como podemos observar en la documentación oficial de RxJava.

operadores esenciales map

De hecho, flatMap nos ayuda a transformar flujos anidados o asincrónicos, siendo uno de los pilares principales de la programación reactiva:

Observable.just("123", "456")
    .flatMap(this::buscarUsuarioPorId)
    .subscribe(System.out::println);

Observable<Usuario> buscarUsuarioPorId(String id) {
    return Observable.fromCallable(() -> {
        return new Usuario(id, "Nombre" + id);
    });
}
Operadores esenciales: flatmap

Control de hilos

Con las operaciones que nos ofrece la librería de RxJava Scheduler podemos hacer un control de los hilos.

observable
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(...);

Utilizamos subscribeOn para elegir el hilo de la suscripción y observeOn para elegir el hilo donde se observan los resultados.

Control de errores

Como hemos indicado, el control de errores también se simplifica pero es esencial (y ya veréis cuando tengáis que debuggear) para controlar bien nuestros hilos, así como reaccionar ante errores.

Observable.just("http://api")
    .flatMap(this::llamadaHttp)
    .retry(3)
    .onErrorReturnItem("valor por defecto")
    .subscribe(System.out::println);

De esta forma se acaban los try-catch eternos. El retry nos indica las veces que tiene que reintentarlo en caso de fallo, mientras que el onErroReturnItem actúa como un fallback para emitir un valor en caso de que los retry acaben fallando. Esto es muy útil para evitar que el flujo se interrumpa de manera directa y poder manejar situaciones donde el sistema pueda continuar con algún valor por defecto o conocido, por ejemplo:

Observable.just(new Usuario("Juan", "juan@email.com"))
    .flatMap(this::crearUsuario)
    .retry(3)
    .onErrorReturnItem(new Usuario("usuario_desconocido", "no-email@dominio.com"))
    .subscribe(usuario -> guardarEnCache(usuario));


// Simulación de llamada a un servicio que crea un usuario
Observable<Usuario> crearUsuario(Usuario usuario) {
    return apiClient.crearUsuario(usuario); }

void guardarEnCache(Usuario usuario) {
    cacheRepository.save(usuario);
}

Para que veamos la diferencia entre programación reactiva y secuencial en este ejemplo, ese mismo código con el enfoque secuencial quedaría así:

public void createUserSequential() {
   final Usuario usuario = new Usuario("Juan", "juan@email.com");
  final Usuario usuarioCreado = intentarCrearUsuarioConReintentos(usuario, 3).orElseGet(() -> new Usuario("usuario_desconocido", "no-email@dominio.com"));
    guardarEnCache(usuarioCreado);
}

private Optional<Usuario> createUserWithRetry(Usuario usuario, int intentosMax) {
    return IntStream.range(0, intentosMax)
        .mapToObj(i -> {
            try {
                return Optional.of(apiClient.crearUsuario(usuario));
            } catch (Exception e) {
                return Optional.<Usuario>empty();
            }
        })
        .filter(Optional::isPresent)
        .map(Optional::get)
        .findFirst();
}


void guardarEnCache(Usuario usuario) {
    cacheRepository.save(usuario);
}

El control de errores nos podría dar para otro post, así que si quieres indagar aún más te dejo algunas pinceladas en este artículo: Basic RxJava error handling.

Todo esto tenemos que condensarlo y tenerlo en mente teniendo cuenta el siguiente pensamiento: en vez de ejecutar pasos, definimos un flujo. En vez de controlar todo el estado, reaccionamos a los datos.

Cómo evitar algunos errores comunes en RxJava

Ya que yo me he tropezado en mis pequeños comienzos en RxJava, unos pequeños consejos para evitarlos (o que si os pasa, reaccionéis a tiempo!).

Usar subscribe() para “hacer cosas importantes”

Debemos evitar poner lógica de negocio en los subscribe(). Esto huele a programación imperativa. Lo ideal es que toda la lógica vaya en el pipeline y dejar el subscribe() solo para el consumo final del mismo.

Observable.just(1, 2, 3)
    .map(x -> x * 2)
    .subscribe(this::guardarEnBaseDeDatos);

Mezclar flujos y estados compartidos

Nuestro pilar es que la programación reactiva se basa en la inmutabilidad, por lo tanto, si vemos que necesitamos algo mutable, habrá que darle una vuelta ya que hay otra forma de expresarlo.

List<String> resultados = new ArrayList<>();

observable
    .map(this::transformar)
    .subscribe(resultados::add);

Elegir RxJava sin motivo

Como todo en esta vida de la ingeniería de Software, no elegimos una solución simplemente por ser novedosa. RxJava no es el martillo para clavar todos los clavos. Si tu app no necesita asincronía, no utilices RxJava. Utiliza RxJava únicamente cuando la reactividad nos simplifique la vida y no si no las complica.

Conclusión

Este salto desde la programación secuencial a la reactiva con RxJava puede parecer abrumador al principio, pero con esfuerzo y dedicación nos dará una nueva mentalidad y forma de afrontar algunos problemas en nuestro trabajo. Conseguiremos un pensamiento claro, más expresividad y mayor capacidad para manejar flujos más complejos de datos.

Como dice mi madre: “quien mucho abarca, poco aprieta”, así que no intentemos dominar todo de golpe: experimenta, falla y aprende. RxJava dejará de ser alguien “extraño” a convertirse en tu mejor amigo en muchas arquitecturas que utilices. No trates de escribir código reactivo, piensa de forma reactiva.

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete