Apache Airflow, el nuevo director de orquesta de Google Cloud

Apache Airflow es uno de los últimos proyectos open source que han despertado un gran interés de la comunidad. Hasta el punto de haber sido integrado dentro del stack de Google Cloud como la herramienta de facto para orquestar sus servicios.

¿Qué hace a este proyecto tan especial y por qué ha tenido tan buena acogida? Analizamos su evolución y desgranamos sus principales características.

Historia

Apache Airflow fue creado en octubre de 2014 por Maxime Beauchemin, dentro del equipo de ingeniería de datos de Airbnb, la famosa plataforma de alquiler vacacional. Desde sus comienzos fue concebido como software abierto y se publicó oficialmente en junio de 2015 estando disponible en GitHub.

Airflow fue acogido dentro del programa de incubación de la Apache Software Foundation en marzo de 2016, siguiendo así los pasos de otros grandes proyectos de software open source dentro del área de datos como Hadoop o Spark. Hoy en día es usado en producción por más de 200 compañías en todo el mundo, como Paypal, Spotify o Twitter.

El último gran empujón a Apache Airflow viene de la mano de Google. En mayo de 2018 anunció Google Cloud Composer, un servicio gestionado de Apache Airflow totalmente integrado en la plataforma de Google Cloud y que se convierte en una de las piedras angulares para orquestar servicios gestionados en Google Cloud.

DAGs y Operators

Pero realmente, ¿qué es Apache Airflow? Se trata de una plataforma a través de la cual podemos crear workflows de forma programática y, además, planificarlos y monitorizarlos de forma centralizada.

Un workflow dentro de Airflow podríamos definirlo como una secuencia de tareas, disparadas por un evento o planificación y que suelen usarse para manejar pipelines de datos.

Un ejemplo de un workflow muy sencillo sería el compuesto por las siguientes tareas:

  1. Descargar ciertos datos de un origen de datos (una base de datos por ejemplo).
  2. Enviar estos a un sistema de procesamiento (por ejemplo un cluster de Spark).
  3. Monitorizar el avance de dicho procesamiento.
  4. Generar un reporte con los resultados del procesamiento.
  5. Enviar el reporte por email.

Airflow modela estos workflows o conjuntos de tareas como grafos acíclicos dirigidos, en inglés DAG (Directed Acyclic Graphs). Estos grafos tienen la peculiaridad de que cumplen dos condiciones:

  • Son dirigidos: las uniones entre los diferentes nodos tienen un sentido
  • Son acíclicos: no podemos formar ciclos y por lo tanto volver a un nodo por el que ya hayamos pasado

Este tipo de grafo es usado también por otros proyectos de software muy relevantes hoy en día como Spark o Tensorflow para crear sus modelos de ejecución.

Cada una de estas tareas que componen un DAG de Airflow es un Operator en Airflow. Por lo tanto, para definir un DAG tendremos que definir todos los Operators necesarios y establecer las relaciones y dependencias entre ellos, esto lo haremos de forma sencilla con código Python.

Existen multitud de Operators predefinidos aunque podemos extender los nuestros si es necesario. Algunos de los Operators predefinidos más interesantes son los siguientes:

  • BashOperator: ejecuta un comando bash.
  • PythonOperator: invoca una función Python.
  • EmailOperator: envía un email.
  • SimpleHttpOperator: hace una petición HTTP.
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc.: ejecuta una query SQL.
  • Sensor: espera por un tiempo, fichero, fila de base de datos, objeto en S3…

En la documentación oficial podemos encontrar una documentaOperatorsción exhaustiva de todos los Operators “oficiales”.

Un ejemplo real

Un ejemplo de un DAG mínimo sería el siguiente:

En este workflow hemos definido tres tareas:

  • La primera print_date mostrará la fecha actual y nos referiremos a ella como t1 en el código, una vez se haya ejecutado se dispararán otras dos tareas en paralelo.
  • sleep, que se dormirá 5 segundos, y que en código llamaremos t2.
  • Y templated, que mostrará cierta información en base a una plantilla predefinida y que llamaremos t3.

A continuación describimos el código necesario para programar este workflow.

En primer lugar, realizamos las importaciones correspondientes, entre ellas la del BashOperator que es el tipo de Operator, que usaremos en este ejemplo:

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

En segundo lugar, definimos los argumentos por defecto que usaremos para instanciar el DAG, en este punto configuraremos aspectos importantes como la política de reintentos.

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'adhoc':False,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'trigger_rule': u'all_success'
}

Instanciamos el DAG pasándole los argumentos por defecto y la planificación de ejecución. En este caso, una vez al día.

Para planificar la ejecución podemos usar también una notación de tipo cron que suele ser la más conveniente:

dag = DAG(
    'tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

A continuación, definimos los tres Operators. En este caso como instancias de BashOperator, los asociaremos a nuestro DAG y en el parámetro bash_command definiremos el comando bash que deben ejecutar:

# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)

t2 = BashOperator(
    task_id='sleep',
    depends_on_past=False,
    bash_command='sleep 5',
    dag=dag,
)

templated_command = """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
    echo "{{ params.my_param }}"
{% endfor %}
"""

t3 = BashOperator(
    task_id='templated',
    depends_on_past=False,
    bash_command=templated_command,
    params={'my_param': 'Parameter I passed in'},
    dag=dag,
)

Por último, definiremos la dependencia o relación que existe entre nuestros operadores. En este caso, haciendo uso de los operadores bit shift (>>), que están sobrecargados desde la versión 1.8 para simplificar la definición de relaciones entre Operators:

t1 >> [t2, t3]

Características interesantes

Hasta aquí hemos descrito el funcionamiento general de Airflow y cómo programar un DAG. Pero, ¿por qué es especial Airflow y qué lo diferencia de otras herramientas?

A continuación, describimos algunas de las características más notables de Airflow:

  • Gestión de fallos en data sources or sinks: gestión integrada de errores en el acceso a datos pudiendo definir diferente comportamiento para cada caso.
  • Reprocesamiento jobs históricos: mandar reprocesar jobs del pasado directamente desde el GUI de forma muy sencilla.
  • Parámetros entre jobs: pasar parámetros entre diferentes jobs a través de un almacenamiento intermedio, esto nos permite dotar a los workflows de un estado.
  • Reintentos automáticos: gestión automática de reintentos en base a la configuración que hagamos para cada DAG.
  • Soporte CI y CD en workflows: integrar fácilmente Airflow con nuestros sistemas de integración o despliegue continuo, ya sea con herramientos como Jenkins o gestión de DAGs en Git.
  • Muchas integraciones: Airflow tiene integraciones con multitud de plataformas y software de terceros a través de los Operators, muchos de ellos son contribuciones de la comunidad: Hive, Presto, Druid, AWS, Google Cloud, Azure, Databricks, Jenkins, Kubernetes, Mongo, Oracle, SSH, etc.
  • Data sensors: son un tipo particular de operadores que nos permiten esperar hasta que se cumpla cierta condición, por ejemplo, que se escriba un fichero en HDFS o en S3.
  • Capacidades para testing de jobs: permite testear y validar nuestros tests antes de que sean desplegados.
  • Logs y metadatos desde el GUI web: un punto centralizado desde donde acceder a todos los logs y metadatos asociados a cada tarea desde el interfaz web.
  • Triggers que disparen tareas: tenemos diferentes opciones para disparar la ejecución de nuestros workflows de forma automática en base a una planificación temporal, o también podemos dispararlos de forma manual.
  • Monitorización en RT y alertas: podemos monitorizar el estado de ejecución de nuestros workflows en tiempo real desde el GUI.
  • Gran soporte de la comunidad: Airflow está licenciado con licencia Apache 2.0 y tiene un gran apoyo de la comunidad con más de 700 contribuidores y 6000 commits.

Google Cloud Composer

Como en ocasiones anteriores, Google se basa en proyectos open source con prestigio y una amplia comunidad y los integra en su plataforma en la nube como servicios gestionados.

Con esta estrategia Google no parte de cero a la hora de construir servicios en la nube. Por otro lado, refuerza y apoya estas comunidades, contribuyendo al desarrollo de los proyectos. Podemos ver otros casos parecidos con Apache Beam y Dataflow o con Kubernetes y GKE.

Cloud Composer no deja de ser una versión de Apache Airflow, pero tiene ciertas ventajas al tratarse un servicio gestionado (por supuesto también tiene un coste adicional). Algunas de los principales beneficios de Cloud Composer son los siguientes:

  • Simplicidad:
    • One-click para crear un nuevo entorno de Airflow.
    • Control integrado con Google Cloud SDK, Google Developer Console.
    • Acceso sencillo a Airflow Web UI.
  • Seguridad:
    • Identity access management (IAM): credenciales, permisos, policies.
  • Escalabilidad:
    • Fácilmente escalable sobre infraestructura de Google Cloud.
  • Monitorización:
    • Logging y monitorización integrada con Stackdriver.
    • Gestión de dependencias de paquetes Python (pip).
  • Total integración con GCP: Big Data, Machine Learning…

Otro aspecto interesante a destacar es que con Cloud Composer nuestros ficheros que describen los DAGs son almacenados en Cloud Storage.

Para desplegar un nuevo DAG simplemente tenemos que subirlo a Cloud Storage, ya sea por el interfaz web de la consola de Google Cloud o por las herramientas del SDK de Google Cloud (gcloud).

Conclusiones

Podemos concluir que Apache Airflow es un proyecto open source consolidado, con una gran y activa comunidad detrás y con el apoyo de empresas importantes como Airbnb y Google.

Muchas compañías están usando Airflow hoy en día en producción para orquestar sus workflows de datos e implementar sus políticas de gobierno y calidad del dato.

Airflow nos permite gobernar de forma centralizada nuestros pipelines de datos y nos ofrece un sistema moderno de logging y monitorización homogéneo.

Gracias a Cloud Composer podemos disponer de un entorno de Airflow productivo de forma muy rápida y sencilla con las comodidades de un servicio gestionado e integrado con toda la plataforma de Google Cloud.

Reduciremos nuestros esfuerzos en el despliegue y el mantenimiento de la infraestructura, pero tendremos los costes asociados al servicio en la nube.

Foto de mzaforas

Manuel Zaforas es Ingeniero Superior en Informática por la UPM. Actualmente trabaja en Paradigma como líder de la iniciativa de innovación en AI y Big Data ayudando a diferentes compañías a aplicar tecnologías innovadoras en sus negocios. Está interesado en Big Data, AI, Data Science, Machine Learning, Deep Learning, HPC, IoT, Cloud, NoSQL, NewSQL, Python y Agile.

Ver toda la actividad de Manuel Zaforas

Escribe un comentario