Las APIs de Apache Kafka están siendo tan ampliamente utilizadas que hay quienes dicen que se han convertido en un estándar de facto para Event Streaming de igual manera que las APIs de S3 lo son para el almacenamiento de objetos.

Estas APIs son un punto de entrada a través del cual podemos empezar a comunicarnos y utilizar Kafka. Y no sólo existe un API, sino que hay 5 que nos hacen más sencillo escribir aplicaciones que interactúen con Kakfa. Aunque existen múltiples clientes open source para trabajar con Kafka en diferentes lenguajes, sólo el cliente Java está mantenido y soportado dentro del proyecto core de Kafka.

Las 5 APIs son las siguientes:

Para profundizar un poco más en el uso de estas APIs vamos a ver cómo podemos utilizar las de Producer y Consumer para implementar un productor y un consumidor de ejemplo.

Producer

Comenzaremos por el productor. Lo primero que vamos a hacer es crear un topic de prueba con dos particiones:

kafka-topics --bootstrap-server localhost:9092 --create --topic producer_test --partitions 2 --replication-factor 1

En nuestro proyecto Java incluiremos la dependencia correspondiente. Para ello, nosotros utilizamos gradle. El paquete kafka-clients engloba tanto la API Producer como la API Consumer.

dependencies {
    implementation 'org.apache.kafka:kafka-clients:2.8.0'
    testImplementation 'junit:junit:4.12'
}

A continuación, escribimos el código de nuestro productor dentro de una simple clase Main. En el inicio establecemos las propiedades de configuración del productor: la primera propiedad hace referencia a la url del servidor bootstrap. Como en este ejemplo vamos a ejecutar este código sobre el propio broker simplemente utilizaremos la referencia a localhost con el puerto 9092. Las dos siguientes propiedades son los serializadores por defecto para la clave y el valor de los registros que vamos a producir, ambos serán de tipo string. La última configuración que incluímos es la configuración de los ACKs. Esta confirmación marca que un registro ha sido entregado desde el productor al clúster de Kafka. Cuando lo configuramos como aquí, a “all”, significa que esa confirmación se produce cuando el registro ha sido recibido y procesado por el broker líder y, además, las réplicas sincronizadas confirman que lo han recibido. Esta configuración será la que nos ofrezca un mayor nivel de integridad de los datos. Pero, ojo, porque esto no significa que sea la mejor para todos los casos de uso.

Ahora lo que vamos a hacer es producir dentro de un bucle 100 registros con valores del 0 al 99. Utilizamos una variable para establecer el valor de la partición a la que queremos enviar el registro. Esto es algo que podremos hacer si queremos tener el control sobre dónde se va a escribir la partición. Si no, los registros que creemos se repartirán mediante round robin en las particiones disponibles. Nosotros escribiremos los primeros 50 registros en la partición 0 y los siguientes en la 1.

Invocamos al método producer.send para enviar el registro al clúster de Kafka. Podíamos hacer la invocación sin más, pero hemos querido incluir un callback con una expresión Lambda que será invocada cuando nuestro registro sea confirmado. Esta función recibe un parámetro con metadatos y una excepción. Si ocurre algo que haga saltar la excepción podremos gestionarla, si no simplemente imprimimos los valores del registro y los metadatos por la salida estándar. Evidentemente, en un caso de uso real, podríamos procesar y trazar esta información de una manera más completa que nos sirviera para resolver cualquier problema que pudiéramos tener más adelante.

La última instrucción de nuestro código es producer.close para cerrar el productor.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerMain {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("acks", "all");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            int partition = 0;
            if (i > 49) {
                partition = 1;
            }
            ProducerRecord record = new ProducerRecord<>("producer_test", partition, "count", Integer.toString(i));
            producer.send(record, (RecordMetadata metadata, Exception e) -> {
                if (e != null) {
                    System.out.println("Error publishing message: " + e.getMessage());
                } else {
                    System.out.println("Published message: key=" + record.key() +
                            ", value=" + record.value() +
                            ", topic=" + metadata.topic() +
                            ", partition=" + metadata.partition() +
                            ", offset=" + metadata.offset());
                }
            });
        }

        producer.close();
    }

}

Para probar que nuestro productor funciona bien podemos ejecutar el código anterior y levantar un consumidor de Kafka por consola mediante el siguiente comando:

kafka-console-consumer --bootstrap-server localhost:9092 --topic producer_test --from-beginning

Consumer

Un consumidor lee datos de topics de Kafka y al igual que hemos hecho antes con el productor podemos crearlo en Java utilizando la API Consumer. Vamos a ver, mediante un ejemplo muy simple, cómo podemos consumir datos de varios topics y acceder a algunos metadatos de los mensajes que recibimos.

Comenzamos creando un par de topics de prueba, consumer_test1 y consumer_test2, con dos particiones cada uno:

kafka-topics --bootstrap-server localhost:9092 --create --topic consumer_test1 --partitions 2 --replication-factor 1
kafka-topics --bootstrap-server localhost:9092 --create --topic consumer_test2 --partitions 2 --replication-factor 1

Al inicio, establecemos las propiedades de configuración. Como hacíamos en el productor, configuramos el servidor bootstrap y el group ID. La novedad aquí es el uso del parámetro auto commit a false. Básicamente, la manera que tiene un consumidor de indicar al clúster de Kafka que ha terminado de procesar un registro o un conjunto de ellos es haciendo commit del offset de esos registros. Si establecemos el auto commit a true lo que hace el consumidor es enviar actualizaciones de forma automática cada cierto tiempo. Desactivando el auto commit lo que estamos haciendo es evitar que el consumidor notifique de manera automática al clúster los registros que ha consumido. Así vamos a poder ver en este ejemplo cómo se hace el commit manual.

Después instanciamos el consumidor con esas propiedades y nos suscribimos a la lista de topics que queremos leer. Así hacemos que el consumidor escuche continuamente si hay nuevos mensajes en los topics consumer_test1 y consumer_test2.

Para leer los mensajes creamos un bucle infinito y dentro hacemos polling en busca de nuevos mensajes. El parámetro que va dentro de la instrucción de polling es el timeout. Nuestro consumidor esperará 100ms y luego volverá a sondear. Hay que tener en cuenta que esta instrucción no nos devuelve necesariamente un registro cada vez, sino que nos puede devolver cualquier número de registros. Podrían ser todos los registros nuevos desde el último sondeo. Por eso, para procesar todos esos registros que nos lleguen, utilizamos otro bucle. Y como esto es simplemente un ejemplo, hacemos algo tan “tonto” como imprimir los datos por la salida estándar. Además de los datos del registro también imprimiremos algunos metadatos, como el topic, la partición o el offset para saber de dónde viene.

Como vimos anteriormente, habíamos deshabilitado el commit automático; por tanto, ahora nos tocará hacer el commit manual del offset. Tras hacer polling, obtener todos los registros y procesarlos, eso es lo que hacemos con consumer.commitSync en cada iteración. Si hiciéramos commit antes de procesar los registros, en caso de error no tendríamos la forma de saber qué registros se han procesado correctamente y cuáles no. Pero de esta manera, sí podemos controlar eso y reprocesar únicamente los registros necesarios.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerMain {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "group1");
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("consumer_test1", "consumer_test2"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("key=" + record.key() + ", value=" + record.value() + ", topic=" + record.topic() + ", partition=" + record.partition() + ", offset=" + record.offset());
            }
            consumer.commitSync();
        }  
    }

}

Para verificar que nuestro consumidor es capaz de leer los mensajes que se escriban en esos dos topics podemos abrir un par de productores a través de la consola con los siguientes comandos:

kafka-console-producer --broker-list localhost:9092 --topic consumer_test1
kafka-console-producer --broker-list localhost:9092 --topic consumer_test2

En esta introducción a las APIs de Kafka hemos conocido cuáles son y para qué sirven cada una de ellas. Y creando un productor y un consumidor muy sencillos hemos visto algunas de las funcionalidades y características más importantes que nos ofrecen las APIs Producer y Consumer.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.