Seguimos con la serie de artículos en el que se muestran distintas alternativas para la construcción de aplicaciones basadas en el procesamiento de flujos de datos en Kafka.

En esta ocasión, vamos a ver Kafka Streams Processor API, una alternativa a Kafka Streams DSL de bajo nivel. Para ello, partiremos de un breve enfoque teórico para, posteriormente, ver cómo aplicar dicha tecnología a un caso de uso.

Contextualizando Kafka Streams Processor API

Como comentamos en el artículo anterior, Kafka Streams se introdujo dentro del ecosistema Kafka en el 2016, concretamente en su versión 0.10, con el nombre de Streams API, Desde su inicio, ya ofrecía dos maneras de implementar dichas soluciones: procesamiento a bajo nivel (Processor API) o procesamiento a alto nivel (DSL).

En este artículo vamos a completar esta tecnología centrándonos en el estudio del procesamiento a bajo nivel (Processor API).

¿Qué es y cómo funciona Kafka Streams Processor API?

Ya vimos que Kafka Streams es una librería Java muy ligera con la que podemos trabajar con flujos de datos en tiempo real, desarrollando servicios o componentes en los que la entrada y la salida está en Kafka.

Kafka Streams Processor API mantiene la misma filosofía de concepto y uso que hemos visto con Kafka Streams DSL, pero dada su menor abstracción al DSL, ofrece una mayor flexibilidad para poder realizar acciones a bajo nivel.

Estas características nos van a permitir implementar casuísticas que con Kafka Streams DSL no podemos o que se volverían muy complejas.

Realizar programación a bajo nivel no quiere decir necesariamente que sea más complejo, sino que no vamos a disponer de ciertas abstracciones que nos proporcionan librerías de más alto nivel. Aunque hay que tener en cuenta algunas consideraciones.

Es un caso similar a los ORMs y las querys nativas SQL. Hay veces en que las abstracciones del ORM no son suficientes y tenemos que incluir algunas querys directamente.

Kafka Streams Processor API comparte las mismas características de escalabilidad, resiliencia y tolerancia a fallos que Kafka Streams DSL, ya que ambos trabajan directamente sobre Kafka. Por tanto, también tenemos que tener muy en cuenta el diseño de topics y particiones.

Como en el caso anterior, no nos vamos a centrar en todos los detalles que ofrece la librería, aunque sí que haremos foco en los puntos más importantes a tener en cuenta para desarrollar aplicaciones de este tipo.

Implementando un caso de uso con Kafka Streams Processor API

Vamos a aplicar el conocimiento adquirido sobre Kafka Streams Processor API implementando el caso de uso presentado en los artículos anteriores.

Puedes encontrar todo el código correspondiente al artículo en el siguiente repositorio.

Recordando el caso de uso

En el artículo inicial de la serie se explica en detalle el caso de uso de control de fraude en tiempo real que queremos implementar utilizando diferentes tecnologías. En esta ocasión, lo haremos con Kafka Streams Processor API.

A modo de resumen: nos encontramos en un escenario en el que una organización necesita detectar en tiempo real las operaciones de intento de fraude en las transacciones con tarjeta para evitar problemas derivados de esta práctica. Existen diferentes canales de entrada asociados a las operaciones con tarjeta y, actualmente, este caso de uso se resuelve mediante procesos batch. Este proceso batch no permite mejorar el tiempo que transcurre desde que un posible intento de fraude se detecta hasta que se notifica y gestiona.

¿Cómo podemos reducir el tiempo de detección de fraude, tendiendo a tiempo real, con Kafka Streams Processor API? Bien, al igual que con Kafka Streams DSL, vamos a seguir el diseño definido durante toda la serie de artículos, donde lo único que va a cambiar será la forma en la que se implementa el nuevo proceso de detección de fraude, en esta ocasión con Kafka Streams Processor API.

De esta forma, vamos a ser capaces de detectar el fraude en tiempo real para, a posteriori, tomar medidas al respecto con esa información.

Implementando la solución

En entornos reales, se suele optar por un enfoque híbrido Kafka Streams DSL combinado con Kafka Streams Processor API. En este artículo (que tiene fines didácticos) mantendremos el diseño que hicimos en el artículo anterior, pero no vamos a entrar tan en detalle respecto a la definición de la topología y los distintos nodos a desarrollar como en el artículo de Kafka Streams DSL, ya que todo lo visto en ese artículo es aplicable a Kafka Streams Processor API.

Vamos a recordar la topología definida para este caso de uso, acorde al diseño definido previamente y teniendo en cuenta los casos de fraude que se quieren detectar:

Como podemos apreciar en la imagen anterior, el proceso de fraude va a leer los movimientos de las tarjetas que se están produciendo, va a procesar estos movimientos teniendo en cuenta las reglas de negocio para detectar movimientos fraudulentos e informará sobre los posibles casos de fraude que se hayan podido dar.

A continuación, vamos a ver cada uno de los bloques de acciones representadas en los nodos y los detalles de su implementación con Kafka Streams Processor API.

Recogida de información

Siguiendo la topología definida, como en el caso de Kafka Streams DSL, este bloque se centra en los nodos que van a realizar las acciones necesarias para preparar la información para su posterior procesamiento y análisis.

Obviamente, la manera de implementar el código es diferente entre tecnologías. Kafka Streams Processor API va a requerir más código por regla general, pero a su vez tendremos más control.

La idea es que partimos de una topología (topology) a la que vamos a ir agregando operaciones de distintos tipos (source, processor, sink) que nos permitan implementar la lógica que necesitemos.

De esta forma, vamos a poder ir a un grano más fino en cada una de las operaciones si lo vemos necesario.

Lectura de movimientos y extractor de tiempo

Para llevar a cabo la lectura de los movimientos del topic correspondiente, asignamos un procesador de tipo source a la topología.

topology.addSource(Topology.AutoOffsetReset.EARLIEST, 
                   "Movement events source", 
                   new MovementTimestampExtractor(), 
                   Serdes.String().deserializer(), 
                   movementSerde.deserializer(), "movements");

De esta forma, vamos a ser capaces de leer los movimientos que lleguen al topic movements.

Al igual que con el Kafka Streams DSL, tenemos que asignar un extractor de tiempo para que se tenga en cuenta la marca de tiempo real del movimiento en vez de la marca de tiempo de inserción del mensaje en el topic.

public class MovementTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
        Movement movement = (Movement) consumerRecord.value();
        return iso8601ToEpoch(movement.getCreatedAt());
    }

    …
}

Si se requiere de más detalle sobre este punto, lo puedes encontrar en el artículo sobre Kafka Streams DSL, donde queda detallada la importancia de este concepto.

Mapear a objeto interno

Aquí encontramos una diferencia significativa respecto a Kafka Streams DSL, ya que no tenemos la capa de abstracción para realizar transformaciones sobre el flujo de datos.

.map((k,v) -> KeyValue.pair(v.getCard(), v))  // Kafka Streams DSL

Con Kafka Streams Processor API tenemos que crear nuestro propio processor para realizar la transformación necesaria. Un processor es una operación que puede aplicarse a una topología (como un source o un sink), que implementa la interfaz processor y también la lógica necesaria para, posteriormente, aplicarlo a una topología.

En nuestro caso, se usa para implementar la transformación que queremos acometer.

public class MapKeyProcessor implements Processor<String, Movement, String, Movement> {

    private ProcessorContext<String,Movement> context;

    public MapKeyProcessor() {}

    @Override
    public void init(ProcessorContext<String, Movement> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, Movement> record) {
        context.forward(new Record<>(record.value().getCard(), record.value(), record.timestamp()));
    }

    @Override
    public void close() {}
}

La parte más destacable está en el método process, donde se implementa la lógica de la transformación de cada evento “new Record<>(record.value().getCard(), record.value(), record.timestamp())“ y se envía a la próxima operación de la topología “context.forward()“.

Por último, para aplicar dicha transformación, solo hay que añadir el processor a la topología.

// stateless (generates new key and splits movements by origin)
topology.addProcessor("Map", MapProcessor::new,"Movement Source");

A partir de aquí, todas las operaciones que queramos implementar en la topología se harán de la misma manera (exceptuando las operaciones de source y sink, que tienen su propia abstracción).

La transformación podría servir como ACL para no propagar la estructura del objeto asociado al topic, como en el caso de Kafka Streams DSL. Sin embargo, ante un cambio de versión del objeto asociado al topic de movimientos, solo tendríamos que modificar esta transformación.

Separar canales de origen

Como hemos visto en el caso de Kafka Streams DSL, para la topología definida se necesita separar los movimientos recibidos por los distintos canales de origen, ya que cada uno de ellos va a tener unas reglas diferentes para el control de fraude.

Se puede pensar que podríamos haber incluido esta lógica en el processor anterior, y es cierto. Pero como el artículo tiene fin didáctico, vamos a mantener el diseño original.

Del mismo modo que la operación anterior, Kafka Streams DSL posee una operación en su capa de abstracción que permite hacer lo que necesitamos.

.split(Named.as("type-"))
.branch((k,v) -> v.getOrigin() == FraudCheckerConfig.ONLINE_MOVEMENT, Branched.as("online"))
.defaultBranch(Branched.as("physical"));

Ya hemos visto que en Kafka Streams Processor API tenemos que implementar un processor que nos permita implementar la lógica deseada.

public class SplitProcessor implements Processor<String, Movement, String, Movement> {

    private static final int ONLINE_MOVEMENT = 3;
    private ProcessorContext<String,Movement> context;

    @Override
    public void init(ProcessorContext<String, Movement> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, Movement> record) {
        if (record.value().getOrigin() == ONLINE_MOVEMENT){
            context.forward(record, "Online movements aggregator");
        } else {
            context.forward(record, "Physical movements aggregator");
        }
    }

    @Override
    public void close() {}
}

Es una clase muy sencilla, al igual que el procesador de mapeo anterior, pero incluimos un detalle: hacemos forward especificando el siguiente procesador al que va dirigido. Es decir, si el registro tiene como origen “online”, solo será dirigido al procesador asignado para movimientos online. En otro caso, irá al de movimientos físicos.

En principio, parece la misma estructura que el processor visto anteriormente, pero tiene una peculiaridad. A la hora de enviar el resultado a la próxima operación de la topología, se especifica la operación a la que va dirigido, permitiendo que los datos correspondientes a los distintos orígenes sean tratados por los processors encargados de cada uno de ellos.

Para finalizar, se aplica dicha operación a la topología.

// Split: physical and online movements processed independently
topology.addProcessor("Split", SplitProcessor::new,"Map");

Al igual que con Kafka Streams DSL, a partir de un flujo de datos generamos dos, bifurcando los movimientos según su origen.

Para no añadir mayor complejidad a la lectura, a partir de este punto se va a mostrar únicamente la parte de código necesaria para el procesamiento de los movimientos de origen ONLINE, ya que son similares a los otros orígenes.

Procesado de la información y aplicación de reglas de negocio

No hemos querido complicar la lógica de detección de fraude, por lo que los pasos que se realizan para detectarlo son similares en los distintos tipos de movimientos.

En este aspecto, sí que existe una mayor diferencia entre tecnologías, ya que la forma de implementar la agregación de movimientos mediante una ventana de sesión es muy diferente.

Agrupación, agregación de operaciones y ventana de sesión

Como es habitual, en Kafka Streams DSL tenemos disponibles las abstracciones para poder hacer esas transformaciones de forma directa.

.groupByKey(Grouped.with(Serdes.String(), movementSerde))                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(config.getSessionInactivityGap())))
.aggregate(MovementsAggregation::new,
          (k, movement, movementsAggregation) -> FraudCheckerUtils.aggMovement(movement, movementsAggregation),
          (k, i1, i2) -> FraudCheckerUtils.aggMerge(i1, i2),
           Materialized.with(Serdes.String(), movementsAggregationSerde))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().shutDownWhenFull()))
.toStream()

Con Kafka Streams Processor API no tenemos esas abstracciones y tenemos que construir estas operaciones con nuestras propias manos. Para ello, la agrupación y la agregación de movimientos en una ventana de tiempo concreta se van a implementar en el mismo processor.

En primer lugar, tenemos que tener un almacén (store) donde vayamos almacenando los movimientos asociados a una misma tarjeta, manteniendo un estado. Es decir, necesitamos un store de clave-valor, donde la clave es el identificador de la tarjeta y el valor es la lista de movimientos asociados a esa tarjeta.

StoreBuilder<KeyValueStore<String, MovementsAggregation>> onlineFraudStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("online-aggregator-store"),
                Serdes.String(), movementsAggregationSerde);

Teniendo en cuenta que no podemos almacenar movimientos de forma indefinida debido a las reglas de fraude que se van a aplicar, ¿cómo implementamos una sesión?

Esta es, sin duda, la parte más compleja de implementar esta solución con Kafka Streams Processor API. Para ello, vamos a recordar el concepto de ventana de sesión asociado a nuestro caso de uso que vimos en el artículo de Kafka Streams DSL.

Una sesión se inicia en el momento que se envía el primer mensaje correspondiente a un movimiento de una tarjeta, y estará activa mientras haya más movimientos asociados a ella y no se supere un tiempo definido entre movimientos (inactivity gap). En caso de superarse, se inicia una nueva sesión.

Este concepto es similar a una sesión web: mientras estés haciendo acciones en la web, la sesión sigue activa. Si estás un tiempo sin hacer nada, la sesión se cierra.

Para definir la sesión con Kafka Streams Processor API vamos a tener en cuenta este comportamiento y vamos a diseñar cómo va a actuar una sesión.

Una vez definido este algoritmo, no resulta difícil implementar la lógica necesaria para construirlo.

public void process(Record<String, Movement> currentOnlineMovement) {
        String key = currentOnlineMovement.key();

        log.info("Processing incoming movement with id <{}> and key <{}>", currentOnlineMovement.value().getId(), key);

        MovementsAggregation lastMovementStored = kvStore.get(key);

        if (kvStore.get(key) == null) {
            openNewSessionWindowWithTheNewMovement(currentOnlineMovement);
        } else {
            if (isSessionEndedForTheKey(key, currentOnlineMovement.timestamp())) {
                log.info("Detected session expired for movements associated to card <{}>. Closing session and emitting result", key);
                closeSessionAndEmitResult(key, lastMovementStored);
                openNewSessionWindowWithTheNewMovement(currentOnlineMovement);
            } else {
                updateAggregationStored(currentOnlineMovement, lastMovementStored);
            }
        }
    }

Este algoritmo basa su funcionamiento en el flujo continuo de movimientos pero, si no llegan más movimientos asociados a la tarjeta, ¿qué pasa con la sesión?

En el caso de Kafka Streams DSL, vimos que existe la necesidad de que haya tráfico de movimientos para manejar las sesiones, ya que si se abre una sesión asociada a una tarjeta y no llega después un movimiento que supere el inactivity gap, no se lanzará la lógica asociada al cierre de sesión y propagación del objeto.

Para solventar esto, una ventaja de Kafka Streams Processor API es que podemos construir funciones que se ejecuten de manera periódica. En nuestro caso, construiremos una función que se ejecute cada cierto tiempo, y que cierre las sesiones abiertas si no se han registrado movimientos nuevos.

Por lo tanto, estamos encontrando una característica de Kafka Streams Processor API que lo hace más potente que el Kafka Streams DSL para este tipo de casos.

Además, la implementación de esta función periódica (punctuator) dista de lo que estábamos viendo normalmente, ya que su declaración está en método init del processor.

@Override
    public void init(ProcessorContext<String, MovementsAggregation> context) {
        …
        …
        punctuator = this.context.schedule(Duration.ofSeconds(this.checkInactiveSessionsInterval), PunctuationType.WALL_CLOCK_TIME, this::closeInactiveSessions);

Cada vez que desencadena el trigger (intervalo) se llama al método closeInactiveSessions.

private void closeInactiveSessions(Long currentTimestamp) {
        log.info("[{}] Scheduled action: looking for inactive sessions....", currentTimestamp);
        try (KeyValueIterator<String, MovementsAggregation> iterator = this.kvStore.all()) {
            while(iterator.hasNext()) {
                KeyValue<String, MovementsAggregation> entry = iterator.next();
                if (isSessionEndedForTheKey(entry.key, currentTimestamp)) {
                    log.info("\t -> Inactive session found for card: <{}>. Closing session and emitting result", entry.key);
                    closeSessionAndEmitResult(entry.key, entry.value);
                }
            }
        }
        log.info("[{}] Scheduled action: looking for inactive sessions finished", currentTimestamp);
    }

Como podemos apreciar, el tipo del objeto punctuator es WALL_CLOCK_TIME. Esto quiere decir que se va a lanzar según el intervalo definido, independientemente de que lleguen mensajes al stream o no. Existe otro tipo, STREAM_TIME, en el que el reloj avanza solo si entran mensajes en el stream.

Como en los casos anteriores, añadimos el processor a la topología.

// Aggregation of movements during a session
topology.addProcessor("Online movements aggregator", this::initializeOnlineAggregation,"Split")
                .addStateStore(onlineFraudStoreBuilder,"Online movements aggregator");

De esta manera, ya tenemos implementada una forma de agrupar y realizar operaciones de agregación dada una ventana de tiempo con Kafka Streams Processor API.

Filtrado de casos

Cuando se cierra la sesión y se propaga el objeto que agrega los movimientos, tenemos que comprobar si cada una de las agregaciones constituye un caso de fraude. En Kafka Streams DSL, disponemos de la abstracción necesaria para realizar la operación.

.filter((k,v) -> FraudCheckerUtils.isOnlineFraud(v))

Con Kafka Streams Processor API, y de forma similar al resto, tenemos que crear nuestro propio processor para realizar el filtrado.

@Override
public void init(ProcessorContext<String, MovementsAggregation> context) {
    this.context = context;
}

@Override
public void process(Record<String, MovementsAggregation> record) {
    MovementsAggregation movementsAggregation = record.value();
    if (isFraud(movementsAggregation)){
        context.forward(record);
    }
}

@Override
public void close() {}

private boolean isFraud(MovementsAggregation movementsAggregation) {
    if (movementsAggregation.getMovements().size() > MULTIPLE_ONLINE_MOVEMENTS_IN_SHORT_PERIOD) {
        final Set<Movement> movements = movementsAggregation.getMovements();
        double totalAmount = movements.stream().mapToDouble((movement) -> movement.getAmount()).sum();
        return totalAmount > ALLOWED_ONLINE_AMOUNT_IN_SHORT_PERIOD;
    }

    return false;
}

Y para finalizar con la implementación del nodo, se aplica dicha operación a la topología.

// Filter: detect online fraud cases
topology.addProcessor("Online fraud cases filter", OnlineFilterProcessor::new, "Online movements aggregator");

De esta forma, ya tendríamos completa la implementación que nos permite aplicar las reglas de fraude para la detección de movimientos fraudulentos.

Salida de información

Una vez que se ha cerrado la ventana de sesión y se han filtrado solo los casos potencialmente fraudulentos, se preparan los datos para publicarlos en el topic de salida.

En Kafka Streams DSL, al igual que en todos los casos anteriores, tenemos disponibles las abstracciones para realizar estas operaciones.

onlinePotentialFraudCases.merge(physicalPotentialFraudCases)
                         .map(this::potentialFraudCase2FraudCase)
                         .to("fraud-cases", Produced.with(Serdes.String(), fraudCaseSerde));

Sin embargo, con Kafka Streams Processor API tenemos que crearlos. En este caso, vamos a aprovechar un único processor para unir los dos flujos de datos y para hacer la transformación al objeto que se enviará al topic.

@Override
public void init(ProcessorContext<String, FraudCase> context) {
    this.context = context;
}

@Override
public void process(Record<String, MovementsAggregation> record) {
    final FraudCase fraudCase = FraudCheckerUtils.movementsAggregationToFraudCase(record.value());
    context.forward(new Record<>(fraudCase.getCard(), fraudCase, record.timestamp()));
    context.commit();
}

@Override
public void close() { }

A continuación, se aplica dicha operación a la topología.

// Merge and map
topology.addProcessor("Merge fraud cases", MergeProcessor::new,"Online fraud cases filter", "Physical fraud cases filter");

Finalmente, añadimos una operación de sink para publicar el objeto de caso de fraude generado en el topic fraud-cases.

topology.addSink("Fraud cases sink",
                config.getFraudTopic(),
                Serdes.String().serializer(),
                fraudSerde.serializer(),
                "Merge fraud cases");

Y de esta forma, ya tenemos el proceso de fraude completo realizado con Kafka Streams Processor API.

Testing con Kafka Streams Processor API

La forma de probar los desarrollos con Processor API no difiere de Kafka Streams DSL. Además, las soluciones utilizando Kafka Streams suelen adoptar un enfoque híbrido (DSL + Processor API).

Por tanto, en este artículo no vamos a entrar en detalle sobre este punto, ya que está comentado en el artículo anterior de la serie.

Por otro lado, si quieres ver en la práctica cómo probar aplicaciones realizadas con Kafka Streams Processor API te animamos a que eches un vistazo al código del proyecto.

Conclusiones

Hemos visto cómo implementar un caso de detección de fraude utilizando una solución como Kafka Streams en su variante de Processor API. Durante el artículo hemos comprobado que trabajar a bajo nivel nos proporciona un abanico más amplio de posibilidades que utilizar las abstracciones que proporciona el DSL, pero también hemos comentado que tenemos que tener cuidado y no complicar demasiado los desarrollos haciendo “sobre-ingeniería” a bajo nivel.

En posteriores artículos, implementaremos este mismo caso de uso, utilizando otras soluciones como ksqlDB y, finalementee, haremos una comparativa de todas ellas.

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.