La necesidad de construir aplicaciones que gestionen datos en tiempo real crece cada día. Y para desarrollar esos sistemas el uso de Apache Kafka o de Confluent, su versión con soporte de pago, cada vez es más habitual.

Anteriormente hemos hablado en nuestro blog acerca de las características fundamentales de Kafka y de sus principales componentes. Hoy queremos dar otro paso más allá dentro este ecosistema y ver qué nos ofrece Kafka Streams.

¿Qué es Kafka Streams?

Kafka Streams es una biblioteca open source, que facilita la construcción de aplicaciones para procesar flujos de datos utilizando Apache Kafka como sistema de almacenamiento de datos de entrada y de salida.

Las aplicaciones que desarrollemos con Kafka Streams podrán realizar procesamiento en streaming, es decir, podrán procesar los datos de forma continua, tan pronto como estén disponibles para su análisis. De esta forma, se procesa de manera secuencial sobre flujos de datos sin límites temporales.

Kafka Streams se basa en la mensajería de Kafka para permitir procesar datos en tiempo real. Pero mientras un productor Kafka sólo publica datos en un topic, y un consumidor únicamente consume datos de topics, las aplicaciones Kafka Streams pueden utilizar uno o varios topics como entrada, realizar algún tipo de transformación o procesado de esos datos y dejar el resultado como salida en otro u otros topics.

KStreams Vs. KTables

Los streams y las tablas son las dos maneras de modelar datos que tenemos en Kafka Streams.

Los KStreams se utilizan para modelar datos, donde cada registro es una pieza autocontenida de los datos dentro de un conjunto de datos sin consolidar. Esto debe entenderse como un flujo de registros, donde los nuevos registros no reemplazarán una parte de los datos existentes con un nuevo valor. Los streams contienen una historia o una secuencia de los datos.

Las KTables contienen registros que representan un estado actual y que pueden ser sobrescritos o actualizados. No tienen la historia de los cambios en los datos, sino que representan un estado.

El mejor símil para explicar la diferencia entre un stream y una tabla lo encontramos en el ajedrez. Un listado de las jugadas que se van produciendo durante la partida es similar a un stream, mientras que el propio tablero sería muy parecido a una tabla.

Transformaciones

Las transformaciones que se pueden realizar mediante Kafka Streams se subdividen en dos grandes grupos:

Transformaciones Stateless

Branch: Permite dividir un stream en múltiples streams utilizando los predicados que le proporcionemos. Es importante tener en cuenta que los predicados se evaluarán en orden, así que el registro irá a uno y sólo a uno de los streams de salida, en concreto al primero que haga match.

A continuación, vamos a ver un ejemplo donde el primer stream resultante tendría todos los registros que comiencen por “A”, el segundo todos los que comiencen por “B” y el tercero, todos los demás.

El código para hace esto sería:

KStream<String, Long> stream = ...;
KStream<String, Long>[] branches = stream.branch(
    (key, value) -> key.startsWith("A"), /* first predicate  */
    (key, value) -> key.startsWith("B"), /* second predicate */
    (key, value) -> true                 /* third predicate  */
  );

Filter: Elimina registros de un stream. Todo aquel registro para el que la expresión lambda no se evalúe como verdadera, será eliminado del stream.

También existe la transformación Inverse Filter, que hace lo opuesto: mantiene solamente los registros para los que la expresión se evalúe como falsa.

Por ejemplo, imaginemos que queremos eliminar del stream todos los registros negativos. Se haría de la siguiente manera:

KStream<String, Long> stream = ...;
KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);

Flat Map: Lo que hace flat map es transformar un registro en un conjunto diferente de registros. A partir de un registro, puede devolver 0 registros o N registros.

Veamos cómo utilizarlo con un ejemplo. Generaremos 2 registros a partir de cada registro de entrada. En los registros de salida, incluirá como clave el valor que tenía la entrada (en uno en mayúsculas y en el otro en minúsculas) y le dará los valores fijos de 1000 y 9000 respectivamente: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000).

KStream<Long, String> stream = ...;
KStream<String, Integer> transformed = stream.flatMap(
    (key, value) -> {
      List<KeyValue<String, Integer>> result = new LinkedList<>();
      result.add(KeyValue.pair(value.toUpperCase(), 1000));
      result.add(KeyValue.pair(value.toLowerCase(), 9000));
      return result;
    }
  );

Foreach: Realiza cualquier operación stateless que le indiquemos sobre cada registro de un stream. Es una operación terminal, lo que significa que no podremos realizar ningún procesamiento adicional sobre el stream después de utilizar foreach.

KStream<String, Long> stream = ...;
stream.foreach((key, value) -> System.out.println(key + " => " + value));

En este caso, sólo se imprime el contenido del stream por consola.

Group by y Group by key: Como indica su propio nombre, agrupan registros por su clave. El primero, mediante una nueva clave y el segundo, mediante la actual. Estas dos transformaciones se suelen utilizar como primer paso en transformaciones stateful, pero ellas mismas son stateless.

KStream<byte[], String> stream = ...;
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    Serialized.with(
      Serdes.ByteArray(), /* key */
      Serdes.String())     /* value */
  );

Map: Similar a Flat Map pero en este caso sirve para procesar un registro, y siempre devolverá exactamente un registro.

En el siguiente ejemplo vamos a ver cómo cambiar la clave y el tipo de la clave, así como el valor y el tipo del valor.

KStream<byte[], String> stream = ...;
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));

Merge: La operación opuesta a Branch. Coge 2 streams y los fusiona en uno.

KStream<byte[], String> stream1 = ...;
KStream<byte[], String> stream2 = ...;
KStream<byte[], String> merged = stream1.merge(stream2);

Peek: Realiza una acción sin estado sobre cada registro del stream.

KStream<byte[], String> stream = ...;
KStream<byte[], String> unmodifiedStream = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value));

En este caso, la acción que realiza es una impresión de los datos por la salida estándar.

Como hemos visto hasta ahora, las transformaciones stateless nos permiten procesar registros de manera individual. Pero, ¿qué pasa si necesitamos información de múltiples registros a la vez? Para eso tenemos las transformaciones stateful, veamos qué podemos hacer con ellas.

Transformaciones Stateful

Dentro de estas transformaciones veremos los siguientes grupos:

De forma general, las agregaciones utilizan como paso previo las transformaciones stateless groupByKey y groupBy. Existen 3 tipos de agregaciones: Aggregate, Count y Reduce.

Aggregate: Es la forma general de realizar una agregación. Se puede utilizar para múltiples propósitos, ya que aplica sobre un grupo de registros la función de agregación con la lógica que nosotros le especifiquemos. Cuando agregamos un stream agrupado, debemos pasarle un iniciador (establece el valor inicial para el valor agregado) y una función de agregación (la lógica de nuestra agregación).

En este ejemplo iremos sumando las longitudes de los valores de todos los registros que comparten la misma clave.

KGroupedStream<byte[], String> groupedStream = ...;
// Agregando un KGroupedStream (los tipos de los valores cambian de String a Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, /* iniciador */
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* suma */
    Materialized.as("aggregated-stream-store") /* nombre del almacén */
        .withValueSerde(Serdes.Long()); /* Serdes (Serializador Deserializador) para el valor agregado*/

Count: Cuenta el número de registros que existen para cada clave. El valor de esta cuenta es de tipo Long. Y el resultado siempre será un KTable por lo que, si quisiéramos escribirlo en un topic de salida, deberíamos convertirlo antes en un stream.

KGroupedStream<String, Long> groupedStream = ...;
// Contando una KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.count();

Reduce: Sirve para combinar todos los registros que comparten la misma clave en un solo registro.

KGroupedStream<String, Long> groupedStream = ...;
// Reducing a KGroupedStream
KTable<String, Long> aggregatedStream = groupedStream.reduce(
    (aggValue, newValue) -> aggValue + newValue /* adder */);

El siguiente tipo de transformación stateful son los Joins. Los joins permiten combinar streams en un nuevo stream de diferentes maneras. Si estás habituado a trabajar con bases de datos relacionales y SQL, muchos de los términos y conceptos que usaremos relacionados con los joins te resultarán muy familiares.

Los joins combinan los registros de dos streams teniendo en cuenta las claves comunes entre ellos. Los topics que utilicemos en los joins deben tener exactamente el mismo número de particiones y deben seguir la misma estrategia de partición. Es decir, deben estar coparticionados. De esa manera se puede hacer el join entre ellos fácilmente.

Podemos evitar las necesidades de coparticionado utilizando una KTable global. Esto nos permitirá que múltiples instancias de una aplicación de Streams tengan una copia de todos los datos de todas las particiones en lugar de tener los de solo una partición.

Existen tres tipos diferentes de joins: Inner Join, Left Join y Outer Join. Veamos cada uno de ellos:

Inner Join: Mezcla los registros que tienen una clave común, y su valor lo determinará la función que le indiquemos. Un inner join contendrá sólo aquellos registros que están en las 2 entidades que estamos mezclando. Si una clave sólo existe en uno de los topics de entrada, pero no en el otro, entonces no estará en el resultado del Inner Join. Se invoca mediante la función join sobre un stream, pasándole otro como argumento. Y el segundo parámetro (ValueJoiner) será una función lambda que se usará para determinar el valor de los registros combinados.

En este ejemplo, el valor resultante será la concatenación de los valores de ambos registros. Como estamos haciendo un join de 2 KStreams, tenemos que hacer windowing (más adelante explicamos este concepto). Por eso incluimos la sentencia JoinWindows.of(Duration.ofMinutes95).

KStream<String, Long> left = ...;
KStream<String, Double> right = ...;

KStream<String, String> joined = left.join(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Left Join: Contendrá todos los registros que estén presentes en el topic sobre el que invocamos la función leftJoin y combinará los valores que estén presentes en ambos topics (que compartan clave común). Es similar al Inner Join, pero aquí se incluyen todos los registros del topic sobre el que se invoca aunque no estén en el topic que le pasamos como argumento.

KStream<String, Long> left = ...;
KStream<String, Double> right = ...;
KStream<String, String> joined = left.leftJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Outer Join: Contendrá todos los registros de ambos topics y combinará sus valores si ambas claves existen en los dos topics. Pero si alguna clave está solo en uno de los topics, ese registro se incluirá igualmente en el resultado del join. Se invoca mediante la función outerJoin.

KStream<String, Long> left = ...;
KStream<String, Double> right = ...;
KStream<String, String> joined = left.outerJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(Duration.ofMinutes(5)),
    Joined.with(
      Serdes.String(), /* key */
      Serdes.Long(),   /* left value */
      Serdes.Double())  /* right value */
  );

Comentemos ahora en qué consiste el Windowing. Podría traducirse como “utilizar un sistema de ventanas”. El windowing nos permite subdividir grupos de registros utilizando sus timestamps.

Dependiendo de la configuración de tiempo que se utilice, podemos tener 4 tipos diferentes de ventanas:

Ejemplo de ventanas Tumbling time de 5 minutos.
Ejemplo de ventanas Tumbling time de 5 minutos.
Ejemplo de ventanas Hopping time de 5 minutos con saltos de 1 minuto.
Ejemplo de ventanas Hopping time de 5 minutos con saltos de 1 minuto.

Otro concepto importante es el de los registros tardíos (late arriving records). Es posible recibir los datos desordenados y, a causa de eso, un registro que por su timestamp debería estar dentro de una ventana, por problemas de latencia o porque algo va mal, no se procesa hasta pasados unos minutos después de que la ventana se haya cerrado. A este registro se le denomina late arriving record. Para garantizar que esos registros se procesan, Kafka tiene el período de retención de una ventana. Tras el cierre de una ventana, Kafka Streams mantendrá los buckets de la ventana durante un tiempo definido. Si durante ese tiempo llegan tarde algunos registros, se procesarán e incluirán en esa ventana. Cuando el período de retención acaba, Kafka Streams se deshace de todos los datos de la ventana y, si llega algún registro más, no se procesará ni se incluirá en la ventana.

Ejemplo de ventanas de Sesión con intervalos de inactividad de 5 minutos y late arriving records.
Ejemplo de ventanas de Sesión con intervalos de inactividad de 5 minutos y late arriving records.

Casos de uso

El streaming de datos nos abre un abanico muy amplio de posibilidades:

Ventajas y desventajas

Entre las ventajas podemos citar las siguientes

Y como desventajas:

Como hemos visto, Kafka Streams nos ofrece una solución escalable y con un gran rendimiento que puede utilizarse prácticamente en cualquier aplicación actual. Ha sido desarrollada pensando en que su uso resultara muy sencillo. Proporciona multitud de herramientas para construir aplicaciones que procesen datos en tiempo real. Nos ofrece aquellas funcionalidades que pueden resultar más habituales y nos permite implementar nuestras propias funcionalidades.

En Kafka, sin utilizar Kafka Streams, también podríamos consumir datos en tiempo real, procesarlos y volverlos a escribir de nuevo en el clúster. Pero con Kafka Streams resulta mucho más fácil y rápido, no tenemos que preocuparnos de utilizar las APIs de Consumer y Producer, únicamente tenemos que dedicar nuestro tiempo a lo que de verdad nos importa, la lógica de nuestra aplicación.

Cuéntanos qué te parece.

Enviar.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.