Recientemente tuve la oportunidad de interactuar muy de cerca con la API RESTful de Elasticsearch. La verdad es que ha sido una experiencia bastante agradable e incluso sorprendente, ya que no me esperaba contar con tal variedad de funcionalidades y características para la ingesta y gestión de documentos.

Pero es cierto que mi mayor sorpresa llegó cuando vi funcionar a esta API en combinación con la tecnología RxJava, las mejoras a nivel de rendimiento fueron enormes y lo mejor de todo es que las preocupaciones en cuanto a la capacidad para manejar grandes cargas de trabajo pasaron a la historia.

En el siguiente post hablaremos sobre cómo trabajar con el API REST de Elasticsearch en conjunto con RxJava.

El gran reto

Cuando comenzamos a trabajar con la API REST de Elasticsearch, el objetivo principal era el de interactuar de forma directa con el repositorio de datos, haciendo uso de las operaciones básicas para la gestión de documentos (por decirlo de otra manera, un CRUD de toda la vida).

Alguno se preguntará, ¿por qué hacerlo de esta manera y no mediante el uso de los repositorios que nos provee Spring Data?

La respuesta es sencilla, se debe simplemente a un problema de compatibilidad en la combinación de versiones con las que trabajamos en el proyecto (Elasticsearch + Drivers + Spring Boot + Spring Data).

Elasticsearch - API REST

Elasticsearch nos provee una serie de recursos y herramientas para la gestión índices, estructuras y documentos, permitiendo la creación, actualización, borrado, consulta... Recordemos que los documentos representan los registros dentro de una base de datos relacional, dichos documentos son expresados en formato JSON.

Para fines de implementación basta con configurar el RestTemplate de Spring para gestionar las peticiones HTTP a través de código Java.

Por ejemplo, para crear un nuevo documento en la base de datos es necesario componer una petición de la siguiente con la siguiente estructura:

curl -X POST "localhost:9200///" -H 'Content-Type: application/json' -d' { "topic": "Meetup Elasticsearch + RxJava", "post_date": "2018-10-20T10:01:59", "message": "Creating document in Elasticsearch via RxJava" }'

Desde el código Java la invocación a este servicio sería algo como:

Parece bastante sencillo, ¿no? Ciertamente lo es, así que la solución a nuestro problema pasó por diseñar e implementar una nueva pieza de nuestra arquitectura, la cual sería la encargada de gestionar la ingesta de datos.

En otras palabras, se creó un microservicio cuya única responsabilidad era la de insertar documentos en la base de datos de Elasticsearch.

Arquitectura

Nuestro objetivo principal pasa por lograr un diseño que sea capaz de gestionar grandes volúmenes de eventos, donde la ingesta se realice en forma masiva y el procesamiento sea asíncrono, para así poder alcanzar el máximo rendimiento posible. A continuación os muestro un resumen de la arquitectura propuesta:

Obsérvese que cada vez que un microservicio requiere insertar un nuevo documento, deja un mensaje con la petición en una cola y ésta es tratada por el microservicio DAO que finalmente la traduce en una petición HTTP al API REST de Elasticsearch.

Todo muy bonito hasta aquí, ¿verdad?

Lo cierto es que nuestra primera solución, lejos de acabar con nuestras preocupaciones, se convirtió en un dolor de cabeza, ya que cuando el volumen de datos no es demasiado grande, funciona a las mil maravillas,pero cuando el volumen y flujo de datos aumenta de forma progresiva, nos encontramos con que la solución no es lo suficientemente buena y encontramos lo siguiente:

  1. Se genera un cuello de botella importante, ya que las instancias del microservicio DAO no son capaces de resolver las peticiones a un ritmo lo suficientemente alto como para consumir los mensajes a la par de cómo son producidos (back pressure).
  2. La ingesta de datos se produce con retraso. A pesar de lograrse la inserción de los documentos, no se realiza en el tiempo efectivo en el que se realizó la petición, pudiendo causar inconsistencia.
  3. El tráfico de peticiones hacia Elasticsearch se dispara, esto podría comprometer la estabilidad del repositorio.
  4. Se puede producir una degradación del servicio DAO, cosa que podría repercutir al resto de la aplicación.

En resumen, la solución no nos vale del todo, ya que no fuimos capaces de resolver el problema de ingesta de datos de forma eficaz y que el rendimiento del sistema se vio comprometido.

Sin embargo, no todo son malas noticias, ya que la propuesta a nivel de arquitectura podría considerarse lo suficientemente buena a falta de ajustar un par de detalles. En el siguiente apartado conoceremos una funcionalidad que nos servirá de gran ayuda para resolver el problema planteado anteriormente, el bulk de Elasitcsearch.

Bulk (Elasticsearch)

¿En qué consiste el bulk de Elasticsearch? Se trata de una funcionalidad en la que es posible agrupar varias operaciones sobre el repositorio de datos en una única petición HTTP, en el siguiente ejemplo vemos cómo sería una llamada agrupando varias acciones:

curl -X POST "localhost:9200/_bulk" -H 'Content-Type: application/json' -d' { "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } } { "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } } { "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } } { "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} } { "doc" : {"field2" : "value2"} } '

Para hacer uso de la funcionalidad bulk desde nuestro código Java será necesario importar la librería de Elasticsearch. Con maven sería algo como:

Lo siguiente sería inicializar el objeto BulkProcessor, que es donde se irán acumulando las peticiones en formato JSON.

Por cada elemento en la lista, realizamos la conversión al tipo requerido (en este caso JSON), para incluir el documento dentro de la petición.

Finalmente, hacemos la petición llamando al método flush del objeto BulkProcessor.

A continuación os presentamos el operador Buffer de ReactiveX, una pieza de software fundamental para la confeccionar la solución final a nuestro problema.

Buffer (RxJava)

El buffer es un operador dentro del ámbito de la programación reactiva que se encarga de acumular periódicamente los ítems emitidos por un objeto Observable, para conformar una petición agrupando todos los ítems en un solo paquete, en lugar de generar peticiones individuales por cada ítem recibido.

Existe una variante del operador en la que se considera el tiempo el número de ítems acumulados. De esta forma se garantiza que se emitirá una petición bien sea después de acumular un número N de elementos, o bien porque haya transcurrido el tiempo t indicado en la implementación.

Ahora bien, una vez que hemos conocido el operador buffer de la programación reactiva, nos replanteamos la solución al problema original, donde la misma se basa en combinar el dicho operador con la funcionalidad bulk que nos provee el API de Elasticsearch.

Un ejemplo de cómo podría implementarse el operador buffer en código Java.

En nuestro caso lo que hemos hecho ha sido crear una clase (BufferService) que implemente el método accept de la interfaz Consumer, además de definir un objeto PublishProcessor del que invocamos el método onNext cada vez que se agregue un nuevo elemento al buffer.

En el constructor de la clase BufferService lo que hacemos es simplemente inicializar el buffer mediante la invocación a la función fillBufferAndSubscribe**.** Y es que es en esta función donde se concentra toda la chicha y donde se encuentra el verdadero meollo del asunto.

Y es que es aquí donde se configura cómo va a ser comportamiento de nuestro buffer, profundidad, tiempo de espera y la función a la que se invocará cada vez que se libere un bloque (buffer y subscribe).

Un punto importante a resaltar es la llamada al método subscribeOn, el mismo sirve para indicar al planificador de hilos qué tipo usar, y es que en función del tipo de hilos, el rendimiento puede ser mejor o peor, además que dependiendo de la casuística en la que estemos nos podrá venir mejor el uso de uno u otro. Para más información sobre este punto podéis la documentación en el enlace RxJava Schedulers.

La solución

Optamos por incluir un buffer dentro del microservicio DAO donde se fuesen acumulando las peticiones por un volumen o tiempo dado para luego conformar una única petición al recurso bulk que nos provee Elasticsearch.

De esta manera hemos conseguido poner fin a nuestro problema.

  1. Hubo una reducción de peticiones de manera bestial, ya que ahora el número de operaciones por llamadas al DAO depende directamente del tamaño del buffer que hemos definido, con lo cual ya podéis imaginaros cómo mejora el rendimiento definiendo un buffer para 10K de operaciones. Aquí es importante hacer un pequeño paréntesis, pues hay que ser cuidadoso al definir el tamaño del buffer, ya que mientras este sea más grande, necesitaremos más memoria y posiblemente más tiempo para rellenarlo, tiempo que al final se traduce en latencia.
  2. Se disminuye la carga hacia el repositorio de Elasticsearch.
  3. Aunque se añade latencia entre peticiones, lo cierto es que se elimina por completo el cuello de botella en el microservicio DAO.
  4. No se sobrecargan las instancias del microservicio.
  5. La arquitectura propuesta inicialmente se mantuvo intacta.

Por otro lado, sabemos que no todas las soluciones son perfectas y que aunque nos puedan ofrecer muchos beneficios, siempre hay algo que tendremos que sacrificar.

  1. En este caso tenemos que mencionar que el trabajar con buffers tiene un gap importante en lo que se refiere a consistencia, ya que al tratarse de almacenamiento en memoria, si ocurre algún error no controlado, se perderían en su totalidad los mensajes acumulados en el buffer para el momento del fallo.
  2. Si bien es cierto que con el uso de buffers reducimos el número de peticiones en gran medida, también lo es que con el uso de los mismos añadimos un porcentaje de latencia a las peticiones.
  3. También hay que tener en cuenta que la gestión de memoria puede volverse un dolor de cabeza si no estimamos bien la asignación de recursos para los microservicios, ya que podríamos presentar problemas de desbordamiento de memoria.

¡Importante! Cuando trabajamos con buffers y peticiones bulk cobra mayor importancia el configurar un sistema de reintentos dentro de nuestra arquitectura, ya que si por algún motivo ocurre algún fallo en la invocación, el número de mensajes que se perderían sería mucho mayor.

Conclusión

Hemos podido conocer la potencia que tiene el combinar herramientas como el RxJava y el API REST Elasticsearch (Bulk) para dar solución a lo que originalmente era un problema complejo de resolver, además nos ha ayudado a optimizar el rendimiento del sistema sin que esto tuviera un gran impacto en nuestra arquitectura.

Referencias

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.