Crear una tabla en Amazon Athena desde un archivo CSV con Python

Athena AWS crear integraciones

<Hola>, con la finalidad de automatizar algunas funciones del trabajo, me di a la tarea de investigar cómo realizar la lectura de un archivo CSV y de ahí identificar las columnas para generar la consulta final que se ejecutará en Athena.

¿Qué es Athena?

Athena es uno de todos los servicio que ofrece Amazon, enfocado a obtener un análisis rápida de datos mediante SQL estándar, es bastante rápido, no lo he utilizado con una gran cantidad de datos, pero si he hecho una comparación de una consulta con el contenido en una tabla de Aurora(también de Amazon) de al menos unos 800,000 registros contra una consulta de Athena con la misma información, fue muy rápido el trabajo que hizo Athena.

¿Qué necesitamos?

  • Una cuenta en Amazon
  • Una terminal con python o un entorno de trabajo como Jupyter
    • Instalar la librería awswrangler(Lo veremos más adelante)
    • Instalar la librería awscli
  • Bajar el siguiente conjunto de datos para pruebas. Descargar CSV Pokémon

Instalación y configuración de librerías

Tenemos que instalar la librería awswrangler, nos permitirá identificar el tipo de variables en cada columna del CSV en automático, para la instalación en Jupyter, es necesario colocar los comandos siguientes.

import sys
!{sys.executable} -m pip install awswrangler
Instalación de la librería awswragler

Si estamos trabajando desde la terminal solamente necesitamos el siguiente comando

pip install awswrangler

También vamos a realizar lo mismo con la librería awscli

import sys
!{sys.executable} -m pip install awscli

y en terminal

pip install awscli

En la terminal necesitamos ejecutar el siguiente comando para configurar las credenciales AWS

aws configure
Configurar el AWS cli

Ahora podemos continuar con el código en Python

Iniciar el código

Primero declaramos las librerías

import os
import csv
import pandas as pd
import awswrangler as wr
import boto3

#from dotenv import load_dotenv
#load_dotenv()

#Varibles AWS
AWS_REGION = 'us-east-1'
#Necesitamos especificar donde deseamos que salgan los resultados de la consulta
AWS_RESULT_OUTPUT_LOCATION = "s3://news-test-s3/output-athena/"
#Tambien necesitamos una carpeta de lectura de la información
AWS_DATA_INPUT_LOCATION = "s3://news-test-s3/news-test-s3"
#Archivo CSV
CSV_PATH = 'pokemon.csv'

#nombre de la base de datos y el nombre de la tabla en Athena
NAME_DB_ATHENA = '`default`.`temporal`'

#leer el archivo CSV, solamente las 10 primeras filas
df = pd.read_csv(CSV_PATH, nrows=10)

#imprimir la lectura
print(df.to_string())

Aquí declaramos la configuración de la región de nuestros elementos en la nube, de igual forma declaramos la entrada y salida vía S3 del procesamiento de la consulta de Athena, este fragmento de código nos generara la siguiente salida

Primera salida de información

En la segunda parte del código haremos uso de la librería awswrangler, que nos facilitara la identificación de datos de manera fácil, también aprovechamos a que nos muestre esta información

#Procesamiento del CSV para la obteción de variables
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df,
    file_format="csv",
    index=True
)

print( columns_types, type(columns_types) )
print( partitions_types, type( partitions_types ) )
Segunda salida en información

En el tercer fragmento de código haremos la consulta que se ejecutara en Athena, prácticamente es pura unión de cadenas de los elementos anteriores que se encuentra en una variable tipo diccionario.

#creando la consulta para crear la tabla en athena
sql_create_athena = "CREATE EXTERNAL TABLE IF NOT EXISTS " + NAME_DB_ATHENA + "("

add_create_athena = ""

#Eliminar le primer elemento
del columns_types['__index_level_0__']

#Visualizar la columna con su tipo de variables
for k,v in columns_types.items():
    add_create_athena += "`{}` {},".format(k, v)

sql_create_athena += add_create_athena[:-1]

sql_create_athena += ")" \
                        " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " \
                        " WITH SERDEPROPERTIES ('field.delim' = ',') " \
                        " STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " \
                        " LOCATION '" + AWS_DATA_INPUT_LOCATION + "'" \
                        " TBLPROPERTIES ('classification' = 'csv', 'skip.header.line.count'='1'); "

#imprimimos la consulta para verificar manualmente
print( sql_create_athena )
Tercer fragmento de código, consulta creada para Athena

El ultimo fragmento de código, es para declarar la configuración del boto3 que nos ayuda a facilitar el acceso a AWS, para luego ejecutar la consulta, una vez realizada se nos entregara un código para identificar la consulta y saber si se realizo la consulta.

#Crear la sesión de Athena con awscli
session = boto3.Session(profile_name='default', region_name = AWS_REGION)
CLIENT_ATHENA = session.client('athena')

#consulta en athena
response = CLIENT_ATHENA.start_query_execution(
    QueryString = sql_create_athena,
    ResultConfiguration={"OutputLocation": AWS_RESULT_OUTPUT_LOCATION}
)

#imprime el id de la consulta para obtener más información
print( response["QueryExecutionId"] )
Impresión de identificador de consulta de Athena

Con este pequeño tutorial podremos automatizar la identificación de datos de un archivo CSV y así crear la tabla en Athena, en unas entradas futuras podremos ver la integración con la herramienta Quicksight para tener una buena integración de datos, dejo todo el código completo.

import os
import csv
import pandas as pd
import awswrangler as wr
import boto3

#from dotenv import load_dotenv
#load_dotenv()

#Varibles AWS
AWS_REGION = 'us-east-1'
#Necesitamos especificar donde deseamos que salgan los resultados de la consulta
AWS_RESULT_OUTPUT_LOCATION = "s3://news-test-s3/output-athena/"
#Tambien necesitamos una carpeta de lectura de la información
AWS_DATA_INPUT_LOCATION = "s3://news-test-s3/news-test-s3"
#Archivo CSV
CSV_PATH = 'pokemon.csv'

#nombre de la base de datos y el nombre de la tabla en Athena
NAME_DB_ATHENA = '`default`.`temporal`'

#leer el archivo CSV, solamente las 10 primeras filas
df = pd.read_csv(CSV_PATH, nrows=10)

#imprimir la lectura
print(df.to_string())

#for name, dtype in df.dtypes.iteritems():
#            print(name, dtype)

#Procesamiento del CSV para la obtención de variables
columns_types, partitions_types = wr.catalog.extract_athena_types(
    df=df,
    file_format="csv",
    index=True
)

print( columns_types, type(columns_types) )
print( partitions_types, type( partitions_types ) )

#creando la consulta para crear la tabla en athena
sql_create_athena = "CREATE EXTERNAL TABLE IF NOT EXISTS " + NAME_DB_ATHENA + "("

add_create_athena = ""

#Eliminar le primer elemento
del columns_types['__index_level_0__']

#Visualizar la columna con su tipo de variables
for k,v in columns_types.items():
    add_create_athena += "`{}` {},".format(k, v)

sql_create_athena += add_create_athena[:-1]

sql_create_athena += ")" \
                        " ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " \
                        " WITH SERDEPROPERTIES ('field.delim' = ',') " \
                        " STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' " \
                        " LOCATION '" + AWS_DATA_INPUT_LOCATION + "'" \
                        " TBLPROPERTIES ('classification' = 'csv', 'skip.header.line.count'='1'); "

#imprimimos la consulta para verificar manualmente
print( sql_create_athena )

#Crear la sesión de Athena con awscli
session = boto3.Session(profile_name='default', region_name = AWS_REGION)
CLIENT_ATHENA = session.client('athena')

#consulta en athena
response = CLIENT_ATHENA.start_query_execution(
    QueryString = sql_create_athena,
    ResultConfiguration={"OutputLocation": AWS_RESULT_OUTPUT_LOCATION}
)

#imprime el id de la consulta para obtener más información
print( response["QueryExecutionId"] )

</Saludos>

Deja un comentario