La necesidad de integración entre distintos sistemas ha originado la existencia de un amplio abanico de posibilidades que nos ayudan a solventar dicha necesidad de una forma adecuada, adaptándose a los distintos contextos que nos podemos encontrar.

Aprovechando que en el blog de Paradigma ya hemos hablado sobre Apache Kafka como mecanismo de procesamiento e integración en tiempo real de alto rendimiento y también sobre distintos servicios de Amazon Web Services, en este artículo vamos a ver cómo podemos integrar un clúster de Apache Kafka con funciones Amazon Lambda de manera directa. Para ello, hemos preparado un par de casos de uso en los que vamos a mostrar cómo realizar dicha integración desde distintos puntos de vista que pueden resultar interesantes.

El primer caso de uso va a consistir en una alerta de fraude en tiempo real, en el que desde una función Lambda consumimos eventos de un Apache Kafka externo.

Desde una función Lambda consumimos eventos de un Apache Kafka externo.

En el segundo caso, veremos cómo notificar en tiempo real la existencia de un nuevo fichero de auditoría, en el que a partir de la creación de un fichero en Amazon S3 llegamos a publicar un mensaje en un Apache Kafka externo.

Creación de un fichero en Amazon S3 llegamos a publicar un mensaje en un Apache Kafka externo.

Pero antes de empezar, vamos a ver cuál es la motivación que nos ha llevado a querer utilizar Apache Kafka junto a Amazon Lambda para realizar la integración de ambos mundos.

Un poco de contexto

¿Por qué puede resultar interesante la integración de Apache Kafka con Amazon Lambda?

Integración de Apache Kafka con Amazon Lambda.

Uno de los aspectos más atractivos de esta integración es el de poder compartir información de forma rápida y eficaz entre dos mundos, un clúster externo de Apache Kafka y los servicios en la nube de AWS.

Si bien es verdad que podemos administrar de manera cómoda un clúster de Apache Kafka en la nube de AWS (por ejemplo con Amazon MSK), lo más normal es encontrarse con sistemas heterogéneos debido a la gran cantidad de variables existentes a la hora de desarrollar los sistemas de una compañía. Por ello, vamos a enfocarnos en un clúster de Apache Kafka externo a la nube de AWS, independientemente si es on-Premise o si está alojado en otros proveedores cloud.

Clúster de Apache Kafka externo a la nube de AWS.

Por otro lado, la forma que tienen de integrarse ambos mundos es de manera directa, sin ningún tipo de intermediario como puede ser Kafka Connect, Amazon Kinesis, o cualquier otro servicio que pueda usarse como componente entre Apache Kafka y AWS Lambda, lo cual simplifica mucho la integración y el mantenimiento de dicha integración.

Por todo esto, vamos a aprovechar la capacidad de Amazon Lambda como función serverless y de Apache Kafka como mecanismo de integración en tiempo real para poder acometer casos de uso interesantes.

1 Caso de uso: alerta por correo electrónico de fraude en tarjetas de crédito en tiempo real

El caso de uso que proponemos consiste en enviar una alerta por correo electrónico al equipo directivo de una entidad bancaria en el mismo momento que se detecte fraude en una tarjeta de crédito.

Este caso de uso va muy ligado al caso de uso visto en la serie de soluciones de stream processing en Kafka, donde se detectaban en tiempo real los movimientos fraudulentos de las tarjetas de crédito y se almacenaban en un topic de Apache Kafka para su posterior explotación.

La idea detrás de este caso de uso es el de poder integrar la información existente en el clúster de Apache Kafka con nuestra nube de AWS para, así, poder explotarla mediante los servicios facilitados por el proveedor y cumplir con los objetivos del caso.

Integrar la información existente en el clúster de Apache Kafka con nuestra nube de AWS.

Como hemos ido avanzando, la integración del clúster de Apache Kafka con la nube lo haremos con Amazon Lambda, que es la pieza que se encargará de detectar la llegada de nuevos eventos al topic que informa sobre los movimientos fraudulentos, y utilizará el servicio Amazon Simple Email Service para envíar la alerta al equipo directivo con la información requerida.

¿Cómo lo hacemos?

Teniendo en cuenta que partimos de un clúster de Apache Kafka ya en funcionamiento, en primer lugar vamos a ver cómo realizar la integración con la nube a través de Amazon Lambda. Para ello, lo primero que tendremos que hacer es crear la función Lambda en nuestra cuenta de AWS.

Creamos la función Lambda en nuestra cuenta de AWS.

De momento solo vamos a definir el nombre y el lenguaje que deseamos utilizar para escribir el código de la función.

Cuando se crea una función Lambda se le asigna un rol, ya sea uno existente si lo hemos indicado previamente o uno nuevo para la nueva función.

Cuando se crea una función Lambda se le asigna un rol.

Este rol contiene, entre otras cosas, los permisos de la función. Es tarea indispensable dar los permisos necesarios para que nuestra función pueda realizar su cometido, como son los permisos sobre Amazon SES para permitir enviar correos electrónicos y los permisos sobre AWS Secrets Manager donde se guardan las credenciales para acceso al clúster externo de Apache Kafka. Para ello, simplemente hay que añadir a las políticas asignadas al rol los permisos necesarios para poder usar dichos servicios en nuestra función.

Permisos necesarios para poder usar dichos servicios.

Ya que en este artículo nos centramos exclusivamente en la creación de la propia función Lambda, sí puede resultar de interés ver cómo se crea un secret en AWS Secret Manager o cómo crear una identidad para el uso de Amazon SES en la documentación oficial de AWS.

Una vez tenemos nuestra función creada y los permisos establecidos, ya nos podemos centrar en la lógica de la propia función.

Teniendo en cuenta el caso de uso que queremos representar, lo primero que nos deberíamos plantear es cómo podemos monitorizar lo que ocurre en un topic de Apache Kafka y cómo podemos integrar los datos en nuestra nube. Pues bien, Amazon Lambda te permite agregar un trigger para desencadenar alguna acción cuando ocurre un nuevo evento en el topic que se esté observando.

Amazon Lambda te permite agregar un trigger para desencadenar alguna acción.

Para configurar el trigger simplemente debemos añadir los datos del clúster de Apache Kafka requeridos para realizar la conexión, el topic que se requiere monitorizar y algunos datos relevantes respecto a cómo realizar el consumo de su información.

Para configurar el trigger simplemente debemos añadir los datos del clúster de Apache Kafka requerido.

De esta manera tan simple, ya tendríamos disponible tanto una integración entre Apache Kafka y la nube de AWS, como la posibilidad de explotar los datos del topic.

A partir de aquí, ya sólo hay que implementar el código que permita manejar los eventos que vienen de Apache Kafka y enviar el correo electrónico a través de Amazon SES con la información requerida.

Para manejar los eventos que vienen de Apache Kafka, simplemente hay que iterar sobre el evento que genera el trigger que hemos configurado anteriormente.

// manejo de eventos que vienen de Apache Kafka
export const handler = async(event) => {
    const notifications = event.records["fraud-notifications-0"];
    for (let notification of notifications) {
    // llamada a método para el envío de alerta por correo electrónico
       (...)
    }
};

Por otro lado, para enviar un correo electrónico con Amazon SES simplemente tendremos que importar las librerías necesarias que proporciona el SDK de Amazon y generar el correo electrónico para su posterior envío.

// librerías para la construcción y envío del mensaje del cuerpo y asunto del mensaje
import { SendEmailCommand } from "@aws-sdk/client-ses";
import { SESClient } from "@aws-sdk/client-ses";

(...)

// construcción del cuerpo y asunto del mensaje
const buildEmail = (to, from, subject, …) => {
  return new SendEmailCommand({
    Destination: {
      CcAddresses: [],
      ToAddresses: [
        to,
      ],
    },
    Message: {
      Body: {
        Html: {
          Charset: "UTF-8",
          Data: (...=,
        },
        Text: {
          Charset: "UTF-8",
          Data: (...),
        },
      },
      Subject: {
        Charset: "UTF-8",
        Data: subject,
      },
    },
    Source: from,
    ReplyToAddresses: [],
  });
};

(...)

//envío del mensaje
const sendAlert = async (to, from, subject, …) => {
  const sesClient = new SESClient({ region: REGION });
  const sendEmailCommand = createSendEmailCommand(to, from, subject, …);
  try {
    return await sesClient.send(sendEmailCommand);
  } catch (e) {
    return e;
  }
};

Una vez hecho esto, solamente habría que desplegar la función Lambda y ya tendríamos realizado el caso de uso.

Para comprobar el funcionamiento de la integración, simplemente deberíamos escribir un evento de prueba en el topic de Apache Kafka asociado a la función y ver que la alerta ha sido enviada.

Mail con una notificación de posible fraude.

Todo el código implementado para la función está accesible en el siguiente repositorio. En esta ocasión, el código implementado está realizado en Node.js, pero las funciones Lambda permiten otro tipo de lenguajes, por lo que es posible adaptarlo al lenguaje en el que nos sintamos más cómodos.

2 Caso de uso: notificación de disponibilidad de autoría externa en tiempo real

En esta ocasión, el caso de uso que proponemos consiste en notificar la disponibilidad de un fichero de auditoría en el momento justo en el que se produce.

La idea detrás de este caso de uso es la de integrar la información existente en nuestra nube de AWS con un clúster externo de Apache Kafka para poder realizar alguna operación dentro de un flujo de negocio.

Integrar la información existente en nuestra nube de AWS con un clúster externo de Apache Kafka.

Si bien antes hemos visto un caso de uso de que la información fluye de Apache Kafka a la nube de AWS, en este caso de uso vamos a ver cómo fluye en sentido contrario.

Teniendo en cuenta que los ficheros de auditoría se almacenan dentro de un bucket de Amazon S3, vamos a usar Amazon Lambda para que monitorice dicho bucket y, en el mismo momento que se cree un nuevo objeto, notificarlo a un topic de Apache Kafka.

¿Cómo lo hacemos?

En este caso, partimos de un clúster de Apache Kafka ya en funcionamiento y, además, de un bucket de Amazon S3 ya creado de forma similar a lo que describe la documentación oficial sobre cómo crear un bucket de Amazon S3.

Lo primero que tendremos que hacer es crear la función Lambda en nuestra cuenta de AWS y agregar los permisos necesarios al rol de la función de forma similar a como lo hemos hecho en el caso anterior, por lo que no vamos a entrar en más detalle. Una vez tenemos nuestra función creada y los permisos establecidos, ya nos podemos centrar en la lógica de la propia función.

Para acometer el caso de uso, lo primero que nos deberíamos plantear es cómo podemos monitorizar lo que ocurre en un bucket de Amazon S3 y ver cómo, de una forma reactiva, desencadenar acciones una vez se ha creado un nuevo objeto. Amazon Lambda también nos ayuda en este caso, ya que nos permite agregar un trigger que lanza un evento en el momento en el que se crea un nuevo objeto en el bucket que se esté observando.

Agregamos un trigger que lanza un evento en el momento en el que se crea un nuevo objeto en el bucket

Para configurar el trigger simplemente debemos añadir el nombre del bucket y el tipo de evento que queremos que monitorice (creación, actualización, eliminación). Además, de manera opcional, podemos configurar otro tipo de cosas en relación a los eventos generados.

Configuramos el trigger simplemente debemos añadir el nombre del bucket y el tipo de evento.

De esta manera tan simple ya somos capaces de reaccionar a los eventos de creación que se produzcan en el bucket.

A partir de aquí, ya solo hay que implementar el código de la función Lambda que permita manejar los eventos que vienen del bucket de Amazon S3 y notificar al topic de Apache Kafka sobre la creación de un nuevo fichero de auditoría.

Para manejar los eventos que vienen del bucket es realmente sencillo, ya que solo hay que tener en cuenta el formato del evento generado por el trigger.

// manejo de eventos que vienen del bucket de Amazon S3
export const handler = async(event, context) => {
    // procesar evento de creación de S3
    const fileName = event.Records[0].s3.object.key;
    const eventTime = event.Records[0].eventTime;
    (...)
};

Por otro lado, la integración con Apache Kafka no es tan elegante como en el caso anterior, pero resulta igual de sencilla. En este caso, haremos uso del módulo kafkajs de Node.js para notificar sobre la creación del nuevo fichero.

// librería para integrarse con Apache Kafka
import { Kafka } from "kafkajs";

(...)

// conexión a Kafka
const kafka = new Kafka({
    clientId: 'audit-notification-client',
    brokers: ['kafka-server:9092'],
    ssl: true,
    sasl: {
        username: <key>,
        password: <value>,
        mechanism: 'plain'
    }
});

(...)

// productor de Kafka
const producer = kafka.producer();
const topic = 'audit-notifications';

try {
    await producer.connect();
    await producer.send({
        topic: topic,
        messages: [{key: <key>, value: <value>}]
    });
} catch (error) {
    console.log(error);
}

await producer.disconnect();

(...)

Una vez hecho esto, solamente habría que desplegar la función Lambda y ya tendríamos realizado el caso de uso.

Para comprobar el funcionamiento de la integración que hemos realizado, simplemente deberíamos escribir un fichero de prueba en el bucket asociado a la función.

Escribimos un fichero en el bucket.

Si todo funciona correctamente, veremos que en el topic de Apache Kafka se notifica el nuevo evento de auditoría.

Si funciona, se notifica el nuevo evento de auditoría.

A partir de aquí ya podemos implementar cualquier acción para explotar esta información y potenciar las líneas de negocio. Como en el caso anterior, todo el código implementado para la función está accesible en el siguiente repositorio.

¿El camino a seguir?

Teniendo en cuenta todas la posibilidades que existen para integrar sistemas y los distintos contextos en los que nos podemos encontrar, definir cuál es la mejor herramienta para poder integrar sistemas sería bastante arriesgado.

Respecto al artículo que nos atañe, la solución de integración entre Apache Kafka y AWS Lambda resulta muy sencilla y suele resultar bastante económica. Sin embargo, hay que tener en cuenta que en determinados contextos puede que esta solución no pueda cubrir todas las necesidades requeridas.

Por ello, lo más importante es tener la capacidad de análisis y sentido común para ver qué herramienta es la más adecuada para nuestra situación.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.