Snowflake proporciona una serie de conectores para poder desarrollar y facilitar un poco las labores de automatización. Ahora mismo existen conectores para python, spark y kafka así como drivers para nodejs, go, .NET, JDBC y ODBC. En este caso, y como hemos comentado, vamos a hacer uso del conector de python, ya que es uno de los lenguajes más utilizados en análisis y tratamiento de datos junto con sus librerías.

¿Cómo implementar un flujo de subida de datos a Snowflake desde local en Python? En este post te lo contamos.

Instalación

Lo primero que tendríamos que hacer es instalar la librería dentro del entorno virtual en el que vamos a trabajar:

pip install --upgrade snowflake-connector-python

Tenemos la posibilidad de trabajar solamente con el conector de snowflake y ejecutar las queries directamente mediante el uso de cursores, tanto para consultas de resultados como para subir y bajar datos, pero en este caso vamos a hacer uso de pandas, por ser una de las librerías de tratamiento de datos más extendidas (por no decir el estándar de la industria).

Subida de datos a Snowflake

Stages

En Snowflake existe lo que llaman stages, que son unos almacenes, temporales o no, para los ficheros de datos que queremos ingestar dentro del datawarehouse.

Cuando se trata de subir datos para generar una tabla, o incorporarlos a una ya existente, tenemos la posibilidad de subirlo a un stage definido (como si fuera una ruta) o al stage del usuario. Por defecto, si no se define un stage, se suben a este último, aunque lo recomendable si hay varios usuarios trabajando y subiendo ficheros, es utilizar un stage definido.

En este caso, y para tenerlo todo un poco más ordenado, tendríamos que utilizar la siguiente sentencia SQL para generar el stage antes de subir el fichero:

CREATE OR REPLACE STAGE nombre_del_stage

Además, en esa sentencia se puede definir no solo el nombre, sino también si se trata de un stage temporal, el tipo de fichero que vamos a subir, las opciones de copia en caso de que las necesitemos y ciertos comentarios que quedarán asociados al fichero que subamos.

Como veremos más adelante, esta sentencia la podemos integrar en un proceso automático de carga de ficheros, ya que las librerías de python para trabajar con Snowflake permiten lanzar consultas SQL directamente sobre las bases de datos.

También está la posibilidad de crear el stage de manera externa a Snowflake. Por ejemplo, en S3 debemos añadir el parámetro url a la sentencia, así como las credenciales y claves de cifrado si fuera necesario.


CREATE OR REPLACE STAGE nombre_del_stage url='s3://external/stage' credentials=(aws_key_id='abcde' aws_secret_key='12345')

También se podrían crear stages en GCP o Azure, pues están soportados por Snowflake.

Una vez hemos comenzado a trabajar con stages podemos revisar los ficheros que tenemos subidos, y también podemos consultar los que tenemos en un stage concreto mediante consultas SQL.

Si solamente queremos consultar los ficheros en el stage del propio usuario, haremos la siguiente consulta:

LIST @~;

Si por el contrario es en un stage definido:

LIST @nombre_del_stage;

Además, podemos consultar la información en los ficheros dentro del stage directamente sin cargarla en la base de datos, por ejemplo si tenemos un fichero csv con 2 columnas en la ruta nombre_del_stage/ruta/fichero.csv:

select t.$1, t.$2 from @nombre_del_stage (file_format => 'formato', pattern=>'.*ruta/fichero.csv') t;

Y con eso podemos ver las columnas 1 y 2 del fichero antes de cargarlo en la tabla en caso de que hiciera falta.

A continuación veremos cómo se cargan los ficheros directamente en las tablas desde python de manera que se pueda integrar en un proceso automatizado.

Write_pandas

Write_pandas es una función de la librería de Snowflake que nos permite escribir directamente en una base de datos de Snowflake desde una tabla en formato dataframe, solamente generando la conexión y pasando el dataframe como parámetro.

Lo primero será importar lo que vayamos a necesitar, que en este caso será:

import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

¿Cómo lo vamos a utilizar? Definimos las variables de conexión, en nuestro caso:

USER = 'USER'
PASSWORD = 'PASSWORD'
ACCOUNT = 'ACCOUNT.europe-west4.gcp'
WAREHOUSE = 'SMALL_WH'
DATABASE = 'SF_DB_POC'
SCHEMA = 'PUBLIC

En “USER” completamos el usuario que aparece en el webui de Snowflake, tal y como se muestra en el post de mi compañera Silvia. La “PASSWORD” es la que utilizamos para loguearnos y “ACCOUNT”, la parte de la url que está por delante de snowflakecomputing.com.

Para definir la variable “ACCOUNT” tendremos que seleccionar la cadena completa, ya que indica no solo el nombre de la cuenta, si no también la zona geográfica y el proveedor de cloud en el que está alojado. En este caso, podéis comprobar que yo he optado por trabajar con Google Cloud Platform tal como se indica en la tercera parte de la cuenta, gcp. En caso de optar por AWS o Azure, aparecerían estos datos en lugar de gcp.

En este caso, ya tenía creadas con anterioridad la bbdd y el esquema, pero en caso contrario se podría omitir y crearlo una vez que realicemos la conexión.

En este ejemplo utilizaremos un fichero de reserva de hoteles, y si necesitáis ficheros de ejemplo, podéis buscarlos en kaggle.com. En mi caso lo he llamado 'hotel_bookings.csv', está en formato csv y con la siguiente línea de cabecera:

hotel,is_canceled,lead_time,arrival_date_year,arrival_date_month,
arrival_date_week_number,arrival_date_day_of_month,stays_in_weekend_nights,
stays_in_week_nights,adults,children,babies,meal,country,market_segment,
distribution_channel,is_repeated_guest,previous_cancellations,
previous_bookings_not_canceled,reserved_room_type,assigned_room_type,
booking_changes,deposit_type,agent,company,days_in_waiting_list,
customer_type,adr,required_car_parking_spaces,total_of_special_requests,
reservation_status,reservation_status_date

Una vez tenemos el fichero, nos ponemos manos a la obra:

# generamos conexión con snowflake
con = snowflake.connector.connect(
  user=USER,
  password=PASSWORD,
  account=ACCOUNT,
  warehouse=WAREHOUSE,
  database=DATABASE,
  schema=SCHEMA
  )
df = pd.read_csv(FILE)<br>

De esta forma, ya tenemos la conexión abierta con Snowflake y el fichero cargado en memoria con pandas. Ahora es un buen momento para revisar que todos los datos son correctos.

Una vez en Pandas, utilizaremos la siguiente línea para subir los datos del .csv:

# Subiendo dataframe de Pandas con el conector de Python de Snowflake

success, nchunks, nrows, output = write_pandas(con,df,'TABLE_NAME')

Estas variables nos indicarán si la subida ha sido exitosa o no:

Ahora podemos hacer las comprobaciones necesarias en nuestro código para asegurarnos de que la información se ha subido correctamente: que la variable success sea True, que el número de filas coincida con el número de filas que tenemos en el dataframe, etc.

Método pd_writer con sqlalchemy

“Pd_writer” es un método de inserción de la librería de Snowflake que podemos utilizar junto con el método “to_sql” de pandas (pandas.Dataframe.to_sql) para escribir los datos en Snowflake.

En caso de que no nos guste utilizar la función write_pandas para la subida, podemos realizarla con el método pd_writer, aunque en ese caso es necesario hacer algún preparativo más:

Debemos importar también “create_engine” desde la librería de sqlalchemy. En mi caso, tuve que importar “registry” desde esa librería y registrar Snowflake para poder usarlo, ya que me daba errores y no era capaz de encontrarlo.

from sqlalchemy.dialects import registry
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')
from sqlalchemy import create_engine
from snowflake.connector.pandas_tools import pd_writer

Ahora que tenemos todo importado, realizaremos la conexión de la siguiente manera:

engine = create_engine(
    'snowflake://{user}:{password}@{account}/{database}/{schema}?warehouse={warehouse}'.format(
        user=USER,
        password=PASSWORD,
        account=ACCOUNT,
        warehouse=WAREHOUSE,
        database=DATABASE,
        schema=SCHEMA,
    )
)

Y para subir los datos, una vez que los tenemos dentro de un dataframe y correctamente revisados, solamente haría falta:

df.to_sql(TABLE_NAME, engine,index=False, method=pd_writer)

Es importante resaltar que en el caso de tener definidas ambas conexiones, podría dar problemas como nos pasó durante nuestras pruebas, suponemos que debido a que se quedó cacheado algún valor de la anterior conexión y por lo tanto no era capaz de abrir una nueva. Simplemente con cerrar la aplicación y volver a abrirla se soluciona, así que recomendamos utilizar uno de los métodos por aplicación, o ir cerrando las conexiones una vez que las terminemos de usar con:

con.close()

En este caso no teníamos generada la tabla de reservas ni una tabla para traducir los códigos de países a nombres, así que para poder generarla podéis utilizar esto:

query = 
'''
CREATE OR REPLACE TABLE paises_iso3166 (
pais VARCHAR(50), 
nombre_iso_oficial VARCHAR(75), 
cod_alfa2 VARCHAR(2),
cod_alfa3 VARCHAR(3),
cod_numerico INT,
observaciones STRING
);
'''

con.cursor().execute(query)

Esa última línea es la que tendríamos que usar para lanzar consultas SQL dentro de la base de datos de Snowflake.

Recuperación de datos

Ya sabemos cómo generar stages y cargar datos, tanto a los stages como directamente en las tablas. Ahora vamos a ver cómo se recuperan datos desde Snowflake utilizando Python para consultar una porción de los datos, realizar comprobaciones automáticas y, en definitiva, trabajar con conjuntos de datos existentes en Snowflake en un proceso automatizado.

Cabe destacar que podemos seguir trabajando con Pandas y también con los cursores para acceder a los datos.

En el caso de sentirnos más cómodos trabajando con cursores, la librería de Snowflake dispone de varios métodos, dependiendo de si queremos un resultado o varios y nos va a devolver objetos de Python diferentes.

Si solamente queremos un resultado, utilizaremos el método fetchone(), que nos devolverá una tupla de elementos:

con.cursor().execute("SELECT * FROM TABLE").fetchone() 

También tenemos la posibilidad de extraer un número concreto de elementos o incluso todos los elementos que devuelva la consulta y, en ambos casos, recibiremos una lista:

con.cursor().execute("SELECT * FROM TABLE").fetchmany(2)
con.cursor().execute("SELECT * FROM TABLE").fetchall()

Como hemos comentado, también está la posibilidad de trabajar con Pandas para consultar las bases de datos, y los dos métodos de los que dispone la librería de Snowflake son: fetch_pandas_all() y fetch_pandas_batches().

El primero se trae el set completo de datos de la consulta y el segundo devuelve un lote. Puede utilizarse en un bucle para descargar los datos en caso de que no sea posible realizarlo con fetch_pandas_all:

con.cursor().execute("SELECT * FROM TABLE").fetch_pandas_all()

for df in con.cursor().execute("SELECT * FROM TABLE").fetch_pandas_batches():
    función_para_procesar_los_datos(df)

Como contrapartida, estos dos métodos solo funcionan actualmente con sentencias SELECT. En caso de querer realizar alguna otra operación, sería necesario utilizar el método read_sql de la propia librería Pandas, de manera similar a como hemos utilizado el método to_sql anteriormente.

Conclusiones

La librería de Snowflake para Python facilita muchísimo la automatización de tareas y ejecuciones, y es muy sencilla de implementar dentro de los flujos ya existentes. Lo más complicado sería generar la conexión y después, simplemente utilizar cualquiera de las funciones para subir los datos o ejecutar las queries correspondientes.

Todo esto está alineado con la filosofía de Snowflake de simplificar el trabajo con almacenes de datos. Usar estas herramientas que nos proporciona Snowflake podría llegar a reducir la carga de trabajo de ciertos perfiles al ser capaces de reutilizar el trabajo de otros, como analistas de datos o BI, que muchas veces trabajan directamente con SQL.

En este ejemplo hemos optado por utilizar un fichero en local, pero usar datos ya en la nube sería tan sencillo como utilizar las librerías del lenguaje de preferencia, subir los datos al stage de datos de Snowflake y ejecutar las consultas SQL correspondientes para generar tablas, copiar, actualizar datos o la tarea que corresponda.

Cuéntanos qué te parece.

Los comentarios serán moderados. Serán visibles si aportan un argumento constructivo. Si no estás de acuerdo con algún punto, por favor, muestra tus opiniones de manera educada.

Suscríbete

Estamos comprometidos.

Tecnología, personas e impacto positivo.