Este artículo forma parte de una serie dedicada a mostrar distintas alternativas para la construcción de aplicaciones basadas en el procesamiento de flujos de datos en Kafka. En este primer artículo podéis ver cómo trabajar con las soluciones de streaming en Kafka.

Para continuar, hablaremos sobre Kafka Streams DSL, una de las alternativas que vamos a mostrar para la creación de este tipo de aplicaciones. Partiremos de un breve enfoque teórico que nos ayude a comprender cómo funciona esta tecnología para, posteriormente, aplicar los conocimientos implementando un caso de uso.

Contextualizando Kafka Streams DSL

Es importante destacar que Kafka Streams es una tecnología pensada y diseñada para simplificar la construcción de soluciones para el procesamiento continuo de flujos de datos.

Se introdujo dentro del ecosistema Kafka en el 2016, concretamente en su versión 0.10, con el nombre de Streams API y, desde su inicio, ya ofrecía dos maneras de implementar dichas soluciones: procesamiento a bajo nivel (Processor API) o procesamiento a alto nivel (DSL).

En este artículo vamos a centrarnos en el procesamiento a alto nivel (DSL), y en el próximo estudiaremos el procesamiento a bajo nivel (Processor API).

¿Qué es y cómo funciona Kafka Streams DSL?

Kafka Streams es una librería Java muy ligera con la que podemos trabajar con flujos de datos (como los streams de eventos) en tiempo real, desarrollando servicios o componentes en los que la entrada y la salida está en Kafka.

De forma muy sencilla, al igual que con otras librerías Java, tenemos a nuestra disposición todas las capacidades de Kafka Streams, por lo que desarrollar una aplicación con esta librería no es diferente a desarrollar cualquier otra aplicación Java.

No vamos a entrar en detalles de todas las posibilidades que ofrece la librería porque ya puedes encontrar mucha información sobre ella en la red, incluidas fuentes como el blog de Paradigma, aunque sí vamos a profundizar en algunos aspectos interesantes que nos ayuden a comprender cómo funciona esta tecnología:

Identificador de aplicación

Cuando implementamos una aplicación usando Kafka Streams, veremos que dentro de los diferentes parámetros de configuración solo hay dos obligatorios: bootstrap.servers y application.id.

El primer parámetro resulta obvio, ya que se necesita indicar dónde está Kafka. Pero el segundo, que en principio parece un mero formalismo, tiene más importancia de la que parece. El identificador de aplicación se utiliza, entre otras cosas, para establecer el nombre del grupo de consumidores asociado a la aplicación. Pero, ¿por qué esto es tan importante?

Si echamos un vistazo a las características de Kafka, un grupo de consumidores es un conjunto de consumidores que consume datos de forma coordinada de las diferentes particiones de los topics de Kafka.

Cuando decimos “de forma coordinada” quiere decir que a cada consumidor del grupo se le asigna una o varias particiones y, esta asignación, se mantiene a lo largo del tiempo mientras el consumidor exista.

De esta forma, un consumidor siempre recibirá datos de las mismas particiones sin que otros consumidores del mismo grupo accedan a ellas.

Este concepto es muy importante de cara al siguiente punto: la escalabilidad.

Escalabilidad, un detalle a tener en cuenta

Como hemos comentado anteriormente, Kafka Streams no deja de ser una aplicación Java más que desplegamos en una plataforma. Si necesitamos más capacidad, es habitual escalar de forma horizontal, añadiendo más instancias de nuestra aplicación.

Entonces, ¿sería igual para aplicaciones implementadas con Kafka Streams?

En el ecosistema Kafka (y, por tanto, en Kafka Streams), hay que tener en cuenta un detalle muy importante para realizar la escalabilidad horizontal: el diseño de topics y particiones.

Kafka Streams, al ser 100% Kafka native, funciona de forma similar a cómo funcionan los consumidores y grupos de consumidores en Kafka. De forma muy resumida: una aplicación implementada con Kafka Streams crea un grupo de consumidores y asigna unas particiones (siempre las mismas) a cada consumidor. Es decir, a cada instancia de la aplicación Kafka Streams.

En el escenario inicial, tenemos un topic (topic A) que contiene dos particiones. Una única instancia maneja los mensajes de ambas particiones del topic. En un momento dado, se necesita más capacidad y se añade una instancia más. En este momento, cada partición se asigna a una instancia.

En este punto es donde hay que tener en cuenta el diseño realizado respecto a las particiones, ya que como solo hay dos, se ha llegado al máximo nivel de escalado. Entonces, ¿qué ocurre si añadimos más instancias? Nada.

Si añadimos más instancias, no habrá ningún efecto en el sistema porque el grupo de consumidores asociado a la aplicación no asignará ninguna partición a la nueva instancia.

Tolerancia a fallos

Kafka Stream se apoya directamente en las capacidades de tolerancia a fallos provistos por la propia plataforma Kafka, por lo que se beneficia de las capacidades de replicación de los datos en los topics.

Una vez que un mensaje llega un topic, puede ser replicado dependiendo de la configuración establecida (replication factor). En un entorno productivo, lo habitual es que el factor de replicación de un topic sea mayor que 1, para que los mensajes del topic y sus particiones estén replicados en los diferentes broker.

Si el broker líder de la partición no está disponible, otro broker en el que se hayan replicado los datos se asigna como líder y, por tanto, no habría una pérdida de datos ni de servicio.

Además, Kafka Streams crea una serie de topics internos para manejar la información que necesita almacenar (change logs topics). Como esos topics están replicados, mantiene también una buena tolerancia a fallos.

Implementando un caso de uso con Kafka Streams DSL

Una vez definidos los conceptos más relevantes de Kafka Streams DSL, no hay mejor forma de entender su funcionamiento que a través de implementar un caso de uso con esta tecnología.

Todo el código correspondiente al artículo lo puedes encontrar en el siguiente repositorio.

Recordando el caso de uso

En el artículo inicial de la serie se explica en detalle el caso de uso de control de fraude en tiempo real que queremos implementar con Kafka Streams DSL.

A modo de resumen: tenemos un escenario en el que una organización necesita detectar en tiempo real las operaciones de intento de fraude en las transacciones con tarjeta, para evitar problemas derivados de esta práctica.

Esta organización maneja gran cantidad de movimientos debido a las operaciones que los clientes realizan diariamente con las tarjetas desde distintas fuentes, y el proceso batch actual implementado para la detección de fraude ha quedado obsoleto, impidiendo reaccionar a tiempo para controlar las operaciones fraudulentas.

¿Cómo podemos reducir el tiempo de detección de fraude, tendiendo a tiempo real, con Kafka Streams DSL? Bien, para ello vamos a seguir el diseño definido durante toda la serie de artículos, donde lo único que va a cambiar será la forma en la que se implementa el nuevo proceso de detección de fraude, en esta ocasión con Kafka Streams DSL.

De esta forma, vamos a ser capaces de detectar el fraude en tiempo real para, a posteriori, tomar medidas al respecto con esa información.

Implementando la solución

Gracias a la abstracción que nos proporciona la librería, implementar el proceso de detección de fraude con Kafka Streams DSL resulta relativamente sencillo, siempre que tengamos algunos conceptos claros.

Uno de estos conceptos a tener en cuenta cuando trabajamos con Kafka Streams, y en general con flujos de datos, es el de topología.

Una topología está formada por un conjunto de nodos o procesadores y las relaciones entre sí, formando un gráfico acíclico, el cual resuelve un problema mediante flujos de información, compuestos por nodos y sus relaciones. Los nodos (o procesadores) tienen la función de realizar acciones sobre la información que reciben, y las relaciones muestran el flujo de información entre nodos.

Ahora, vamos a definir una topología acorde al caso de uso que tratamos en este artículo, teniendo en cuenta los casos de fraude que queremos detectar.

Como podemos apreciar en la imagen anterior, el proceso de fraude va a leer los movimientos de las tarjetas que se están produciendo, procesará estos movimientos teniendo en cuenta las reglas de negocio para detectar movimientos fraudulentos e informará sobre los posibles casos de fraude que se hayan podido dar.

Teniendo esto claro, vamos a ver cómo se puede implementar cada bloque con Kafka Streams DSL.

Recogida de información

Siguiendo la topología definida, este bloque se centra en los nodos que van a realizar las acciones necesarias para preparar la información. El siguiente paso será el procesamiento y análisi

La manera con la que conseguimos implementar la lógica para definir estos nodos resulta relativamente sencilla, gracias a la abstracción que proporciona la librería de Kafka Streams DSL.

Map<String, KStream<String, Movement>> movementTypes = streamsBuilder
    .stream(config.getMovementsTopic(), Consumed.with(Serdes.String(), movementSerde).withTimestampExtractor(new MovementTimestampExtractor()))
    .map((k,v) -> KeyValue.pair(v.getCard(), v))
    .split(Named.as("type-"))
    .branch((k,v) -> v.getOrigin() == ONLINE_MOVEMENT, Branched.as("online"))
    .defaultBranch(Branched.as("physical"));

Es un código sencillo, pero vamos a destacar algunos detalles para entender bien el proceso.

Extractor de tiempo

Cuando trabajamos con Kafka, topics y mensajes, tenemos que tener en cuenta el proceso que siguen, desde su creación hasta que se consumen. De ahí la importancia de las diferentes marcas de tiempo asociadas al evento.

En sistemas orientados a eventos, sobre todo si el número de eventos es elevado, puede ocurrir que el evento A se cree antes que el evento B, pero B se publique antes que A en un topic. Incluso puede ocurrir que, a pesar de haberse publicado antes B que A, se procese A antes que B.

En la imagen anterior se muestran algunas de estas situaciones que podrían pasar en nuestro caso de uso.

A la izquierda, se produce un movimiento en el cajero pero, por algún motivo, no se llega a enviar al servicio que va a producir el evento y, por lo tanto, este no se produce.

Sin embargo, sí se generan otros dos eventos, que corresponden a movimientos posteriores. Más tarde, el cajero sí puede enviar la información del primer movimiento (movimiento 1) y el productor puede generar el evento.

En este caso, si usamos como marca de tiempo el momento de creación del evento, el tratamiento de fraude no sería correcto.

A la derecha, el cajero sí logra enviar la información a los productores pero, el productor que va a publicar el evento correspondiente al primer movimiento, no puede hacerlo.

Los otros productores sí logran publicar sus eventos y, posteriormente, el primer productor consigue publicar el suyo. Al igual que en el caso anterior, si tomamos como marca de tiempo el momento de ingesta del evento, el tratamiento de fraude no sería correcto.

Debido a esto, y en este caso en concreto, tenemos que tomar como marca de tiempo el momento en el que se generó realmente el movimiento de la tarjeta. Para ello, en el objeto que define ese movimiento, definimos un campo createdAt que será el que usemos en nuestro proceso para analizar el fraude.

De esta manera, aunque falle la generación o la publicación del evento, el tratamiento de fraude sí es correcto porque se usa ese campo que permite ordenar los mensajes dentro de una ventana temporal.

Para implementar este extractor personalizado en el proceso, simplemente hay que crear una clase que implemente la interfaz TimestampExtractor.

public class MovementTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long l) {
        Movement movement = (Movement) consumerRecord.value();
        return iso8601ToEpoch(movement.getCreatedAt());
    }

    …

}

Este método será llamado a la hora de consumir los eventos del origen, tal como hemos visto anteriormente.

Mapear a objeto interno

Dentro del flujo del proceso, y con carácter divulgativo, hemos querido añadir un paso en el que realizamos una transformación del objeto que se recibe directamente del topic origen a un objeto interno.

…
.map((k,v) -> KeyValue.pair(v.getCard(), v))
…

El objetivo de esta transformación es modificar la clave que viene de origen para asegurarnos que corresponde al identificador de tarjeta que posteriormente usaremos a la hora de aplicar las reglas de fraude.

Este paso nos serviría como ACL para no propagar la estructura del objeto asociado al topic y depender de su versionado completamente sino que, ante un cambio de versión del objeto asociado al topic origen, solo tendríamos que modificar esta transformación.

Separar canales de origen

Para la topología definida, se necesita separar los movimientos recibidos por los distintos canales de origen, ya que cada uno de ellos va a tener unas reglas para el control de fraude diferentes.

De nuevo, con carácter divulgativo, nos sirve para mostrar la característica de Kafka Streams DSL de separar un stream en varios substreams dependiendo de la condición que se defina.

.split(Named.as("type-"))
.branch((k,v) -> v.getOrigin() == FraudCheckerConfig.ONLINE_MOVEMENT, Branched.as("online"))
.defaultBranch(Branched.as("physical"));

  console.log("Hola!");

De esta forma, a partir de un flujo de datos generamos dos, bifurcando los movimientos según el origen de ellos.

Para no añadir mayor complejidad a la lectura, a partir de este punto se van a mostrar únicamente la parte de código necesaria para el procesamiento de los movimientos de origen ONLINE, ya que son similares a los otros orígenes.

Procesado de la información y aplicación de reglas de negocio

Una vez se tienen los datos ya preparados, se implementan las tareas necesarias para aplicar las reglas de negocio que detectan si existe la posibilidad de movimientos fraudulentos o no.

Para implementar la lógica para definir estos nodos nos ayudamos de nuevo en la abstracción que proporciona la librería de Kafka Streams DSL.

KStream<String, FraudCase> onlineFraudMovements = movementTypes.get("type-online")
                .peek((k,v) -> log.debug("[ONLINE BRANCH] Processing movement: " + v))
                .groupByKey(Grouped.with(Serdes.String(), movementSerde))
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(config.getSessionInactivityGap())))
                .aggregate(
                        MovementsAggregation::new,
                        (k, movement, movementsAggregation) -> FraudCheckerUtils.aggMovement(movement, movementsAggregation),
                        (k, i1, i2) -> FraudCheckerUtils.aggMerge(i1, i2),
                        Materialized.with(Serdes.String(), movementsAggregationSerde)
                )
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().shutDownWhenFull()))
                .toStream()
                .filter((k,v) -> FraudCheckerUtils.isOnlineFraud(v))
                .map((k,v) -> KeyValue.pair(k.key(), FraudCheckerUtils.movementsAggregationToFraudCase(v)))
                .peek((k,v) -> log.debug("Online fraud case detected associated to card: {}", v.getCard()));

Al igual que hemos hecho antes, vamos a analizar los detalles del código:

Agrupar por clave

Como podemos ver, el primer paso es agrupar por clave (groupByKey), que si recordamos la transformación en el apartado anterior, es el identificador de tarjeta.

Esto es necesario porque vamos a crear una agregación, de modo que podamos tener todos los movimientos de una tarjeta durante un rango de tiempo definido y analizar si puede haber fraude.

Agregación de operaciones

Antes de hablar de la agregación como tal, tenemos que hablar de la ventana de sesión. Para manejar los movimientos que se hacen durante un periodo de tiempo, vamos a trabajar con una ventana temporal en la que se producen los movimientos.

En este post no vamos a entrar en los detalles de las diferentes ventanas que podemos usar porque daría para un post completo. Vamos a decir que optamos por una ventana de sesión. Las ventanas de sesión son ventanas de duración variable, determinadas por periodos de actividad (sesión), seguidos de periodos de inactividad. Cuando la duración del periodo de inactividad supera un valor marcado, la sesión termina.

Este comportamiento encaja con el comportamiento asociado a las operaciones con tarjetas. En el momento que se envía el primer mensaje asociado a un movimiento de una tarjeta, se inicia sesión y estará activa mientras haya otras operaciones siempre y cuando no se supere el periodo de inactividad:

Es similar a una sesión web, en la que mientras estés haciendo acciones en la web la sesión sigue activa. Si estás un tiempo sin hacer nada, la sesión se cierra (inactivity gap).

Además, en este caso hemos optado por consumir toda la sesión (untilWindowCloses) antes de poder disponer de los datos de la agregación. Aunque en un escenario real se podría optar por bloquear la tarjeta con el primer movimiento sospechoso, en este post (y volviendo al carácter didáctico del mismo) optamos por consumir la sesión. De esta forma, si hay fraude, tendremos todos los movimientos de una tarjeta durante esa sesión, para que puedan ser analizados posteriormente:

…
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(config.getSessionInactivityGap())))
…               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded().shutDownWhenFull()))
…

En Kafka Streams, una vez que hemos establecido la ventana que vamos a usar, las agregaciones se realizan durante este periodo y por clave. Es decir, vamos a tener agregados los movimientos por tarjeta.

Filtrado de casos

Cuando se cierra la sesión, se procesan los objetos para filtrar solo aquellos que realmente estén asociados a casos sospechosos. Para ello, dentro de la operación filter, se llama al método isFraud comentando en el punto anterior. De este modo solo nos quedamos con los sospechosos.

…
.filter((k,v) -> FraudCheckerUtils.isOnlineFraud(v))
…

Definidas e implementadas estas etapas, simplemente quedaría unir los casos de fraude de los distintos orígenes y publicarlo en el topic destino.

Salida de información

Una vez que se ha cerrado la ventana de sesión y se han filtrado solo los casos potencialmente fraudulentos, se preparan los datos para publicarlos en el topic de salida.

No sería necesario hacerlo de forma conjunta, pero, en el caso del artículo, vamos a unir los diferentes casos para publicarlos en un mismo topic.

onlinePotentialFraudCases.merge(physicalPotentialFraudCases)
                         .map(this::potentialFraudCase2FraudCase)
                         .to("fraud-cases", Produced.with(Serdes.String(), fraudCaseSerde));

Hacemos merge del flujos de datos de casos online con los casos físicos, transformamos al objeto de salida con la estructura final y lo publicamos en el topic de salida.

Al igual que hicimos en la recogida de datos, establecemos una transformación antes de publicar en el topic de salida para actuar como ACL y evitar un acoplamiento fuerte con la estructura de los objetos del topic.

Testing con Kafka Streams DSL

Cuando trabajamos como casos de uso de naturaleza asíncrona y orientados a un flujo de información que se debe analizar de forma continua para detectar los patrones exigidos por el caso de uso, probar las soluciones puede ser complejo o tedioso.

No os preocupéis porque en este post vamos a ofrecer dos formas para implementar casos de prueba que permitan asegurar la calidad de nuestro desarrollo. Una está orientada a tests unitarios y otra a test de integración o end-to-end.

Test unitarios: TopologyTestDriver

Dentro del ecosistema de Kafka Streams disponemos de una librería de utilidades de tests que nos facilita bastante la tarea:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams-test-utils</artifactId>
  <scope>test</scope>
</dependency>

Esta librería nos da acceso a TopologyTestDriver. Mediante TopologyTestDriver podemos escribir casos de prueba de forma fácil, ya que, sin necesidad de provisionar una instancia de Kafka, nos proporciona capacidades para simular el entorno en el que se debe ejecutar nuestra topología.

Además, simula los topics de entrada y salida que maneja la topología y proporciona métodos tanto para mandar a los topics el flujo de datos que necesitemos para nuestros tests, como métodos para poder leer de los topics de salida y comprobar los resultados. Además, dispone de métodos para manejar el tiempo de reloj pudiendo, de esta forma, simular escenarios en los que se manejan espacios temporales para el procesado de información.

La ejecución de estos tests es muy ligera y rápida porque, como hemos indicado, no necesita una instancia de Kafka levantada. La siguiente imagen muestra, a modo resumen, qué es TopologyTestDriver:

No vamos a entrar en detalle sobre TopologyTestDriver en este post ya que se alargaría demasiado pero, si tienes interés, puedes echar un vistazo a los casos de prueba que hemos definido en el repositorio.

Test de integración: Testcontainers

TopologyTestDriver es una herramienta muy buena para generar casos de tests sencillos, pero tiene una limitación importante: a día de hoy solo permite simular topics de una única partición. Además, no simula latencias, cacheos, etc. que sí se producen en una instancia real de Kafka.

Para solventar estas carencias, necesitamos implementar otra serie de tests más pesados, que sí corran sobre una instancia de Kafka. Tenemos diferentes alternativas como usar un Kafka embebido o utilizar Testcontainers.

En este post vamos a optar por esta última, ya que nos parece la más interesante porque nos apoyamos en toda la potencia del uso de contenedores y aproximamos todo lo posible el caso de prueba al entorno real.

El uso de TestContainers es muy sencillo, simplemente añadimos las siguientes dependencias a nuestro proyecto:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.17.3</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.17.3</version>
    <scope>test</scope>
</dependency>

Después, indicamos en nuestro test que vamos a usar TestContainers y el “contenedor” que queremos usar para los tests:

@Testcontainers
@SpringBootTest
@DirtiesContext
public class FraudCheckerTestContainers {

    @Container
    private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.2.1"));
    —
    …

    @Test
    public void shouldNotDetectFraudCases() {
    ….

Una vez que hemos configurado el contenedor que nos proporcionará la instancia de Kafka, a diferencia con TopologyTestDriver que nos facilita métodos para publicar y consumir eventos, con Testcontainers tendremos que crear nuestros consumidores y productores y, además, gestionar el envío de los mensajes de forma manual.

Como hemos hecho para TopologyTestDriver, no vamos a entrar en detalle de cómo usar TestContainers para desarrollar casos de prueba orientados a streaming, pero si tienes más interés, puedes echar un vistazo a los casos de prueba que hemos definido en el repositorio.

Conclusiones

Hemos visto cómo implementar un caso de detección de fraude utilizando una solución como Kafka Streams. Kafka Streams es una librería muy fácil de incorporar y que nos proporciona un montón de posibilidades dentro del mundo Kafka a la hora de resolver casos de uso que necesitan procesar streams de eventos en tiempo real.

En posteriores artículos, implementaremos este mismo caso utilizando otras soluciones como Processor API o ksqlDB y haremos una comparativa de todas estas tecnologías.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.