Seguimos con la serie de artículos en el que se muestran distintas alternativas para la construcción de aplicaciones basadas en el procesamiento de flujos de datos en Kafka.

Esta vez, vamos a centrarnos en ksqlDB, otra alternativa que facilita la creación de este tipo de aplicaciones. Para ello, nos introduciremos en esta tecnología e implementaremos un caso de uso que nos permita entender su naturaleza y qué puede aportar respecto a otras tecnologías.

Contextualizando ksqlDB

ksqlDB es una tecnología que fue lanzada por Confluent en el año 2017, alrededor de un año después de que Kafka Streams se introdujera en el ecosistema Kafka. En un principio, el objetivo con el que fue diseñado ksqlDB fue el mismo con el que fue diseñado Kafka Streams, el de simplificar el proceso de creación de soluciones para el procesamiento de flujos de datos. No sorprende, ya que ksqlDB está basado en Kafka Streams.

Esto nos puede hacer dudar sobre la necesidad de ksqlDB, debido a la ya existencia de una tecnología como Kafka Streams que ya cumple, y de forma eficaz, con ese mismo objetivo. Entonces, ¿por qué ksqlDB?

Bien, más adelante hablaremos más profundamente de las distintas características que le hace tan especial. Sin embargo, es importante destacar que a medida que ha ido evolucionando ksqlDB, quedó patente que sus objetivos eran bastante más ambiciosos que los de Kafka Streams.

No solo se centra en simplificar la creación de soluciones para el procesamiento de flujos de datos, sino también en la forma en la que estas soluciones se integran con otros sistemas, ya sean internos o externos a Kafka, y todo ello a través de una interfaz muy amigable. Esto facilita que todo tipo de usuarios puedan aprovechar las características del ecosistema Kafka, independientemente de su nivel de experiencia.

¿Esto quiere decir que Kafka Streams y ksqlDB son tecnologías incompatibles? Para nada. Ambas son excelentes herramientas para el procesamiento de flujos de datos y se complementan entre sí bastante bien, dadas las posibilidades que ofrecen tanto una como otra.

En próximos artículos publicaremos una comparativa entre las distintas posibilidades que ofrece el ecosistema Kafka para la creación de soluciones para el procesamiento de flujos de datos, donde hablaremos de forma más extendida sobre esto.

¿Qué es y cómo funciona ksqlDB?

Una vez contextualizado el origen, vamos a ver en qué consiste esta tecnología.

ksqlDB es una base de datos para streaming de eventos de código abierto, basada en Kafka Streams y que, como hemos comentado anteriormente, simplifica la forma en la que se construyen, implementan y se mantienen las aplicaciones para el procesamiento de flujos de datos.

Para conseguirlo, es capaz de integrar dos componentes del ecosistema Kafka (Kafka Streams y Kafka Connect) en un único sistema y ofrecer una interfaz SQL de alto nivel para interactuar con dichos componentes.

Es interesante señalar que ksqlDB se beneficia tanto de la estabilidad como de la madurez de ambos componentes, ya que son productos muy consolidados y con unas prestaciones muy interesantes. Pero no solo eso, también proporciona todos los beneficios de una interfaz más amigable.

Arquitectura

Teniendo en cuenta que ksqlDB se basa en Kafka Streams, os invitamos a revisar los artículos mencionados anteriormente para poder tener una visión, con un grado de abstracción menor, de cómo funciona dicha tecnología. Aquí podéis ver Kafka Streams DSL y en este post Kafka Streams Processor API.

En nuestro caso, vamos a centrarnos de una forma resumida en los componentes específicos de ksqlDB, de manera que quede claro el objetivo de cada componente y su funcionamiento.

Como hemos adelantado, estos componentes son los encargados de ofrecer esa mayor abstracción respecto a la tecnología de Kafka Streams y una interfaz más amigable para la construcción de este tipo de aplicaciones.

El componente ksqlDB Server es el componente principal de la tecnología, ya que contiene el motor (engine) y la API Rest.

Las distintas instancias de ksqlDB Server son las que nos van a permitir comunicarnos con el clúster de Kafka. Al igual que otros componentes del ecosistema, podemos escalar dichas instancias sin la necesidad de parar el sistema.

El otro componente que completa ksqlDB es ksqlDB CLI, que proporciona una interfaz por línea de comandos que permite a los usuarios interactuar con el motor de ksqlDB Server a través de la API Rest de ksqlDB Server. De esta forma, puede desarrollar las distintas aplicaciones a través de declaraciones SQL.

La CLI de ksqlDB está diseñada para que sea fácil e intuitiva, por lo que resulta familiar a los usuarios de aplicaciones de base de datos, como PostgreSQL, MySQL, etc.

Existe la posibilidad de tener otro tipo de clientes que permitan hacer uso de ksqlDB a través de su API, como la interfaz web que facilita Confluent en su Control Center, o incluso una desarrollada por usuarios.

Implementando un caso de uso con ksqlDB

Una vez presentados los distintos componentes de ksqlDB y lo que aporta cada uno, no hay mejor forma de entender su funcionamiento que a través de implementar un caso de uso con esta tecnología.

Como hemos comentado anteriormente, este artículo es parte de una serie de artículos sobre soluciones para el procesamiento de flujos de datos, por lo que solamente nos vamos a centrar en esta parte en concreto de ksqlDB, obviando la parte de integración con otros sistemas. Por ello, vamos a ver cómo se implementa con ksqlDB el caso de uso visto en los anteriores artículos de la serie: la detección de fraude en tiempo real.

Recordando el caso de uso

En el artículo inicial de la serie se explica al detalle el caso de uso de control de fraude en tiempo real que queremos implementar con ksqlDB.

En resumen, nos encontramos en un escenario en el que una organización necesita detectar en tiempo real las operaciones de intento de fraude en las transacciones con tarjeta para evitar problemas derivados de esta práctica.

Esta organización maneja una gran cantidad de movimientos debido a las operaciones que los clientes realizan diariamente con sus tarjetas desde distintas fuentes, y el proceso batch actual implementado para la detección de fraude ha quedado obsoleto, impidiendo reaccionar a tiempo para controlar las operaciones fraudulentas.

¿Cómo podemos reducir el tiempo de detección de fraude, tendiendo a tiempo real, con ksqlDB? Bien, para ello vamos a seguir el diseño definido durante toda la serie de artículos, donde lo único que va a cambiar será la forma en la que se implementa el nuevo proceso de detección de fraude, esta vez con ksqlDB.

De esta forma, vamos a ser capaces de detectar el fraude en tiempo real para, a posteriori, tomar medidas al respecto con esa información.

Implementando la solución

¿Cómo hacemos este proceso de detección de fraude con ksqlDB? La realidad es que es bastante más sencillo de lo que parece, ya que la capacidad de abstracción que nos proporciona ksqlDB es muy útil para poder implementar procesos de este tipo.

Como se ha definido en el diseño de la solución, el proceso de fraude va a leer los movimientos que se están produciendo en las tarjetas, va a procesar estos movimientos teniendo en cuenta las reglas de negocio para detectar movimientos fraudulentos e informará sobre los posibles casos de fraude que se hayan podido dar.

Teniendo esto claro, vamos a ver cómo se pueden implementar estas soluciones con ksqlDB.

Recogida de información

El proceso de fraude comienza por recuperar todos los movimientos que se están produciendo a través de las tarjetas de crédito. Como muestra el diseño definido, la información de estos movimientos llegan en tiempo real a un topic de Kafka llamado movements.

Por consiguiente, lo primero que tenemos que hacer con ksqlDB es generar una sentencia SQL en ksqlDB CLI (o en el cliente que más nos guste) para crear un stream que lea los movimientos.

Para ello, usamos la sintaxis de ksqlDB para construir dicha sentencia teniendo en cuenta la información del topic que necesitamos obtener.

CREATE STREAM stream_movements (id STRING, card STRING, amount DECIMAL(5,2), origin INTEGER, site STRING, device STRING, createdAt STRING) WITH (KAFKA_TOPIC = 'movements', VALUE_FORMAT = 'JSON');

De esta forma, ya tendríamos un proceso stream_movements que se encarga de leer constantemente el topic anterior.

A partir de aquí ya estamos preparados para realizar el siguiente paso, que es la detección de fraude aplicando las reglas de negocio.

Procesado de la información y aplicación de reglas de negocio

Para aplicar las reglas de fraude definidas, va a ser necesario discernir entre tipos de movimientos, ya que cada uno de ellos va a tener unas reglas para el control de fraude diferentes.

Con ksqlDB es muy sencillo diferenciar entre ambos tipos con las sentencias SQL, ya que simplemente hay que crear un nuevo stream por cada tipo a partir del stream_movements generado anteriormente, diferenciando cada movimiento a partir de su información y enviándolo a su topic correspondiente.

CREATE STREAM stream_movements_online WITH (KAFKA_TOPIC = 'movements_online',TIMESTAMP = 'createdAt', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss z') AS SELECT * FROM stream_movements WHERE ORIGIN = 3 PARTITION BY CARD;
CREATE STREAM stream_movements_physical WITH (KAFKA_TOPIC = 'movements_physical',TIMESTAMP = 'createdAt', TIMESTAMP_FORMAT='yyyy-MM-dd HH:mm:ss z') AS SELECT * FROM stream_movements WHERE ORIGIN != 3 PARTITION BY CARD

En estas sentencias conviene destacar y detallar algunas de sus partes, ya que son muy importantes para el devenir del proceso.

De esta forma, nuestro proceso no solo será capaz de leer los movimientos que vayan llegando, sino que también será capaz de separar los movimientos de distinto tipo.

Una vez tenemos los eventos separados y preparados en los distintos topics, vamos a agrupar dichos eventos por la clave con el objetivo de verificar los movimientos fraudulentos. Además, vamos a definir una ventana de sesión para que la agrupación realizada tenga en cuenta los movimientos en un determinado período de tiempo, para ajustar el caso de uso a los problemas de fraude reales.

A priori puede parecer una tarea compleja, pero con ksqlDB vuelve a resultar sencillo implementar una sentencia SQL para conseguir realizar este proceso. Esta vez vamos a crear una tabla que nos permita, en ese período de ventana definido, obtener la última actualización de la agrupación.

CREATE TABLE table_movements_online AS SELECT CARD, AS_VALUE(CARD) AS CARD_NUMBER, COUNT(ID) as num_movements, SUM(AMOUNT) as total_amount, 'ONLINE' as description, COLLECT_SET("ID") as movements, COLLECT_SET("DEVICE") as devices, COLLECT_SET("SITE") as sites FROM stream_movements_online WINDOW SESSION (60 seconds) GROUP BY CARD EMIT FINAL;

Para no añadir mayor complejidad a la lectura, a partir de este punto se van a mostrar únicamente las sentencias realizadas para los movimientos de origen ONLINE, ya que son similares a los otros orígenes.

Conviene destacar algunas de las partes de la sentencia, ya que es donde está la mayor complejidad de este proceso.

Si se necesita entender el concepto de extractor de tiempo de un evento y las ventanas de sesión en profundidad, o incluso de los conceptos de stream y tabla, podéis ir al artículo donde se implementa este mismo caso de uso con Kafka Streams. Allí está explicado de forma mucho más detallada.

En este punto ya hemos realizado las operaciones más difíciles del proceso, ya que disponemos de la información agrupada dada una ventana de tiempo y con los cálculos relevantes para la operación de control de fraude.

Para finalizar con esta etapa del proceso, simplemente nos faltaría convertir la tabla en stream para poder operar sobre esa información.

CREATE STREAM stream_possible_fraud_movements_online (CARD_NUMBER STRING, TOTAL_AMOUNT DECIMAL(5,2), NUM_MOVEMENTS INTEGER, MOVEMENTS ARRAY<String>, DEVICES ARRAY<STRING>, SITES ARRAY<STRING>, DESCRIPTION STRING) WITH (KAFKA_TOPIC = 'TABLE_MOVEMENTS_ONLINE', VALUE_FORMAT = 'JSON');

Quemando estas etapas, simplemente quedaría filtrar la información dependiendo de si consideramos que la agrupación de movimientos realizada tiene riesgo de fraude o no.

Salida de información

Vamos a aprovechar para realizar dos operaciones en una: filtrar los movimientos agrupados con riesgo de fraude y enviarlos a un topic destino, donde quedarán registrados y la organización tomará las medidas al respecto.

Las reglas definidas para saber si una agrupación de movimientos son potencialmente fraudulentas son las mismas que hemos visto para los ejemplos de Kafka Streams y Processor API:

ONLINE: Total de dinero gastado en 60sg > 200 && Número de movimientos > 3

PHYSICAL: Número de dispositivos distintos > 1 || Número de movimientos > 4

Con ksqlDB simplemente tenemos que crear un nuevo stream a partir del generado en la etapa anterior y ver si se cumplen estas condiciones.

CREATE STREAM stream_fraud_movements_online WITH (KAFKA_TOPIC = 'fraud-cases') AS SELECT * FROM stream_possible_fraud_movements_online WHERE total_amount > 200 AND num_movements > 3

Las agrupaciones que cumplan el caso total_amount > 200 AND num_movements > 3, se enviarán al topic fraud-cases.

En este punto ya tenemos todo el proceso de control de fraude implementado, por lo que a partir de aquí se pueden capturar los movimientos con índices de fraude.

Al final, como en las anteriores maneras de crear soluciones para el procesamiento de flujos de datos, ksqlDB nos genera una tipología que construye nuestro proceso de detección de fraude.

Para verificar cómo está respondiendo ksqlDB, la consola de ksqlDB Server y el propio Kafka pueden resultar de gran ayuda, ya que los distintos problemas que pueden ocurrir debido a un mal funcionamiento pueden verse notificados en su propio log.

Todo el código del caso de uso está accesible en el siguiente repositorio.

Conclusión

Como hemos ido viendo durante el artículo, ksqlDB ofrece una abstracción de alto nivel, con una interfaz más amigable y una arquitectura más simple que otras tecnologías.

Esto proporciona muchos beneficios en su uso, desde la incorporación de nuevos desarrolladores al mundo del procesamiento del dato (ya que la barrera de entrada es más baja), hasta una mayor productividad debido a su nivel de abstracción, que nos permite evitar otro tipo de complejidades.

Entonces, ¿siempre hay que utilizar ksqlDB?

Depende. Si vemos que nuestras aplicaciones para procesar flujos de datos pueden aprovechar los beneficios de ksqlDB, y dichas aplicaciones pueden expresarse de forma natural y simple en SQL, naturalmente es una muy buena elección.

En cambio, hay ocasiones en que necesitamos ir a un nivel más bajo de abstracción para tener más flexibilidad en nuestros procesos, o incluso realizar tareas más complejas que ksqlDB no puede abarcar. En esos casos, Kafka Streams o incluso processor API son una mejor elección.

Si comparamos las distintas soluciones implementadas en la serie, podemos ver varias diferencias relacionadas con la forma de plantear la solución. No solo las evidentes, sino también las relacionadas con la forma de crear las tipologías, las acciones realizadas para cumplir con los objetivos, etc.

En el próximo capítulo compararemos todas estas tecnologías para poder tener una visión global que nos permita decidir cuál se adapta mejor a nuestras necesidades.

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.