Saltar a contenido

HDFS

Apuntes sin actualizar

Estos apuntes pertenecen al curso 21/22 y, por lo tanto, ya no se actualizan.

Puedes acceder a la última versión de esta sesión en https://aitor-medrano.github.io/iabd2223/hadoop/04hdfs.html.

Funcionamiento de HDFS

En la sesión anterior hemos estudiado los diferentes componentes que forman parte de HDFS: namenode y datanodes. En esta sesión veremos los procesos de lectura y escritura, aprenderemos a interactuar con HDFS mediante comandos, el uso de instantáneas y practicaremos con los formatos de datos más empleados en Hadoop, como son Avro y Parquet.

Procesos de lectura

Vamos a entender como fluyen los datos en un proceso de lectura entre el cliente y HDFS a partir de la siguiente imagen:

Proceso de lectura
  1. El cliente abre el fichero que quiere leer mediante el método open() del sistema de archivos distribuido.
  2. Éste llama al namenode mediante una RPC (llamada a procedimiento remoto) el cual le indica la localización del primer bloque del fichero. Para cada bloque, el namenode devuelve la dirección de los datanodes que tienen una copia de ese bloque. Además, los datanodes se ordenan respecto a su proximidad con el cliente (depende de la topología de la red y despliegue en datacenter/rack/nodo). Si el cliente en sí es un datanode, la lectura la realizará desde su propio sistema local.
  3. El sistema de ficheros distribuido devuelve al cliente un FSDataInputStream (un flujo de entrada que soporta la búsqueda de ficheros), sobre el cual se invoca la lectura mediante el método read(). Este flujo, que contiene las direcciones de los datanodes para los primeros bloques del fichero, conecta con el datanode más cercano para la lectura del primer bloque.
  4. Los datos se leen desde el datanode con llamadas al método read(). Cuando se haya leído el bloque completo, el flujo de entrada cerrará la conexión con el datanode actual y buscará el mejor datanode para el siguiente bloque.
  5. Se repite el paso anterior (siempre de manera transparente para el cliente, el cual solo está leyendo datos desde un flujo de datos continuo).
  6. Cuando el cliente finaliza la lectura, cierra la conexión con el flujo de datos.

Durante la lectura, si el flujo encuentra un error al comunicarse con un datanode (o un error de checksum), intentará el proceso con el siguiente nodo más cercano (además, recordará los nodos que han fallado para no realizar reintentos en futuros bloques y/o informará de los bloque corruptos al namenode)

Namenode sin datos

Recordad que los datos nunca pasan por el namenode. El cliente que realiza la conexión con HDFS es el que hace las operaciones de lectura/escritura directamente con los datanodes. Este diseño permite que HDFS escale de manera adecuada, ya que el tráfico de los clientes se esparce por todos los datanodes de nuestro clúster.

Proceso de escritura

El proceso de escritura en HDFS sigue un planteamiento similar. Vamos a analizar la creación, escritura y cierre de un archivo con la siguiente imagen:

Proceso de escritura
  1. El cliente crea el fichero mediante la llamada al método create() del DistributedFileSystem.
  2. Este realiza una llamada RPC al namenode para crear el fichero en el sistema de ficheros del namenode, sin ningún bloque asociado a él. El namenode realiza varias comprobaciones para asegurar que el fichero no existe previamente y que el usuario tiene los permisos necesarios para su creación. Tras ello, el namenode determina la forma en que va a dividir los datos en bloques y qué datanodes utilizará para almacenar los bloques.
  3. El DistributedFileSystem devuelve un FSDataOutputStream el cual gestiona la comunicación con los datanodes y el namenode para que el cliente comience a escribir los datos de cada bloque en el namenode apropiado.
  4. Conforme el cliente escribe los datos, el flujo obtiene del namenode una lista de datanodes candidatos para almacenar las réplicas. La lista de nodos forman un pipeline, de manera que si el factor de replicación es 3, habrá 3 nodos en el pipeline. El flujo envía los paquete al primer datanode del pipeline, el cual almacena cada paquete y los reenvía al segundo datanode del pipeline. Y así sucesivamente con el resto de nodos del pipeline.
  5. Cuando todos los nodos han confirmado la recepción y almacenamiento de los paquetes, envía un paquete de confirmación al flujo.
  6. Cuando el cliente finaliza con la escritura de los datos, cierra el flujo mediante el método close() el cual libera los paquetes restantes al pipeline de datanodes y queda a la espera de recibir las confirmaciones. Una vez confirmado, le indica al namenode que la escritura se ha completado, informando de los bloques finales que conforman el fichero (puede que hayan cambiado respecto al paso 2 si ha habido algún error de escritura).

HDFS por dentro

HDFS utiliza de un conjunto de ficheros que gestionan los cambios que se producen en el clúster.

Primero entramos en $HADOOP_HOME/etc/hadoop y averiguamos la carpeta de datos que tenemos configurada en hdfs-site.xml para el namenode:

hdfs-site.xml
<property>
    <name>dfs.name.dir</name>
    <value>file:///opt/hadoop-data/hdfs/namenode</value>
</property>

Desde nuestro sistema de archivos, accedemos a dicha carpeta y vemos que existe una carpeta current que contendrá un conjunto de ficheros cuyos prefijos son:

  • edits_000NNN: histórico de cambios que se van produciendo.
  • edits_inprogress_NNN: cambios actuales en memoria que no se han persistido.
  • fsimagen_000NNN: snapshot en el tiempo del sistema de ficheros.
HDFS por dentro

Al arrancar HDFS se carga en memoria el último fichero fsimage disponible junto con los edits que no han sido procesados. Mediante el secondary namenode, cuando se llena un bloque, se irán sincronizando los cambios que se producen en edits_inprogress creando un nuevo fsimage y un nuevo edits.

Así pues, cada vez que se reinicie el namenode, se realizará el merge de los archivos fsimage y edits log.

Trabajando con HDFS

Para interactuar con el almacenamiento desde un terminal, se utiliza el comando hdfs. Este comando admite un segundo parámetro con diferentes opciones.

Antes la duda, es recomendable consultar la documentación oficial

hdfs comando

hadoop fs

HDFS DFS

hadoop fs se relaciona con un sistema de archivos genérico que puede apuntar a cualquier sistema de archivos como local, HDFS, FTP, S3, etc. En versiones anteriores se utilizaba el comando hadoop dfs para acceder a HDFS, pero ya quedado obsoleto en favor de hdfs dfs.

En el caso concreto de interactuar con el sistema de ficheros de Hadoop se utiliza el comando dfs, el cual requiere de otro argumento (empezando con un guión) el cual será uno de los comandos Linux para interactuar con el shell. Podéis consultar la lista de comandos en la documentación oficial.

hdfs dfs -comandosLinux

Por ejemplo, para mostrar todos los archivos que tenemos en el raíz haríamos:

hdfs dfs -ls

Los comandos más utilizados son:

  • put: Coloca un archivo dentro de HDFS
  • get: Recupera un archivo de HDFS y lo lleva a nuestro sistema host.
  • cat / text / head / tail: Visualiza el contenido de un archivo.
  • mkdir / rmdir: Crea / borra una carpeta.
  • count: Cuenta el número de elementos (número de carpetas, ficheros, tamaño y ruta).
  • cp / mv / rm: Copia / mueve-renombra / elimina un archivo.

Autoevaluación

¿Sabes qué realiza cada uno de los siguientes comandos?

hdfs dfs -mkdir /user/iabd/datos
hdfs dfs -put ejemplo.txt /user/iabd/datos/
hdfs dfs -put ejemplo.txt /user/iabd/datos/ejemploRenombrado.txt
hdfs dfs -ls datos
hdfs dfs -count datos
hdfs dfs -mv datos/ejemploRenombrado.txt /user/iabd/datos/otroNombre.json
hdfs dfs -get /datos/otroNombre.json /tmp

Bloques

A continuación vamos a ver cómo trabaja internamente HDFS con los bloques. Para el siguiente ejemplo, vamos a trabajar con un archivo que ocupe más de un bloque, como puede ser El registro de taxis amarillos de Nueva York - Enero 2020.

Comenzaremos creando un directorio dentro de HDFS llamado prueba-hdfs:

hdfs dfs -mkdir /user/iabd/prueba-hdfs

Una vez creado subimos el archivo con los taxis:

hdfs dfs -put yellow_tripdata_2020-01.csv  /user/iabd/prueba-hdfs

Con el fichero subido nos vamos al interfaz gráfico de Hadoop (http://iabd-virtualbox:9870/explorer.html#/), localizamos el archivo y obtenemos el Block Pool ID del block information:

Identificador de bloque

Si desplegamos el combo de block information, podremos ver cómo ha partido el archivo CSV en 5 bloques (566 MB que ocupa el fichero CSV / 128 del tamaño del bloque).

Así pues, con el código del Block Pool Id, podemos confirmar que debe existir el directorio current del datanode donde almacena la información nuestro servidor (en `/opt/hadoop-data/):

ls /opt/hadoop-data/hdfs/datanode/current/BP-481169443-127.0.1.1-1639217848073/current

Dentro de este subdirectorio existe otro finalized, donde Hadoop irá creando una estructura de subdirectorios subdir donde albergará los bloques de datos:

ls /opt/hadoop-data/hdfs/datanode/current/BP-481169443-127.0.1.1-1639217848073/current/finalized/subdir0

Una vez en este nivel, vamos a buscar el archivo que coincide con el block id poniéndole como prefijo blk_:

find -name blk_1073743451

En mi caso devuelve ./subdir6/blk_1073743451. De manera que ya podemos comprobar como el inicio del documento se encuentra en dicho archivo:

head /opt/hadoop-data/hdfs/datanode/current/BP-481169443-127.0.1.1-1639217848073/current/finalized/subdir0/subdir6/blk_1073743451

Administración

Algunas de las opciones más útiles para administrar HDFS son:

  • hdfs dfsadmin -report: Realiza un resumen del sistema HDFS, similar al que aparece en el interfaz web, donde podemos comprobar el estado de los diferentes nodos.
  • hdfs fsck: Comprueba el estado del sistema de ficheros. Si queremos comprobar el estado de un determinado directorio, lo indicamos mediante un segundo parámetro: hdfs fsck /datos/prueba
  • hdfs dfsadmin -printTopology: Muestra la topología, identificando los nodos que tenemos y al rack al que pertenece cada nodo.
  • hdfs dfsadmin -listOpenFiles: Comprueba si hay algún fichero abierto.
  • hdfs dfsadmin -safemode enter: Pone el sistema en modo seguro el cual evita la modificación de los recursos del sistema de archivos.

Snapshots

Mediante las snapshots podemos crear una instantánea que almacena cómo está en un determinado momento nuestro sistema de ficheros, a modo de copia de seguridad de los datos, para en un futuro poder realizar una recuperación.

El primer paso es activar el uso de snapshots, mediante el comando de administración indicando sobre qué carpeta vamos a habilitar su uso:

hdfs dfsadmin -allowSnapshot /user/iabd/datos

El siguiente paso es crear una snapshot, para ello se indica tanto la carpeta como un nombre para la captura (es un comando que se realiza sobre el sistema de archivos):

hdfs dfs -createSnapshot /user/iabd/datos snapshot1

Esta captura se creará dentro de una carpeta oculta dentro de la ruta indicada (en nuestro caso creará la carpeta /user/iabd/datos/.snapshot/snapshot1/ la cual contendrá la información de la instantánea).

A continuación, vamos a borrar uno de los archivo creados anteriormente y comprobar que ya no existe:

hdfs dfs -rm /user/iabd/datos/ejemplo.txt
hdfs dfs -ls /user/iabd/datos

Para comprobar el funcionamiento de los snapshots, vamos a recuperar el archivo desde la captura creada anteriormente.

hdfs dfs -cp \
    /user/iabd/datos/.snapshot/snapshot1/ejemplo.txt \
    /user/iabd/datos

Si queremos saber que carpetas soportan las instantáneas:

hdfs lsSnapshottableDir

Finalmente, si queremos deshabilitar las snapshots de una determinada carpeta, primero hemos de eliminarlas y luego deshabilitarlas:

hdfs dfs -deleteSnapshot /user/iabd/datos snapshot1
hdfs dfsadmin -disallowSnapshot /user/iabd/datos

HDFS UI

En la sesión anterior ya vimos que podíamos acceder al interfaz gráfico de Hadoop (http://iabd-virtualbox:9870/explorer.html#/) y navegar por las carpetas de HDFS.

Si intentamos crear una carpeta o eliminar algún archivo recibimos un mensaje del tipo Permission denied: user=dr.who, access=WRITE, inode="/":iabd:supergroup:drwxr-xr-x. Por defecto, los recursos via web los crea el usuario dr.who.

Error al crear un directorio mediante Hadoop UI

Si queremos habilitar los permisos para que desde este IU podamos crear/modificar/eliminar recursos, podemos cambiar permisos a la carpeta:

hdfs dfs -mkdir /user/iabd/pruebas
hdfs dfs -chmod 777 /user/iabd/pruebas 

Si ahora accedemos al interfaz, sí que podremos trabajar con la carpeta pruebas via web, teniendo en cuenta que las operaciones las realiza el usuario dr.who que pertenece al grupo supergroup.

Otra posibilidad es modificar el archivo de configuración core-site.xml y añadir una propiedad para modificar el usuario estático:

core-site.xml
<property>
    <name>hadoop.http.staticuser.user</name>
    <value>iabd</value>
</property>

Tras reiniciar Hadoop, ya podremos crear los recursos como el usuario iabd.

HDFS y Python

Para el acceso mediante Python a HDFS podemos utilizar la librería HdfsCLI (https://hdfscli.readthedocs.io/en/latest/).

Primero hemos de instalarla mediante pip:

pip install hdfs

Vamos a ver un sencillo ejemplo de lectura y escritura en HDFS:

from hdfs import InsecureClient

# Datos de conexión
HDFS_HOSTNAME = 'iabd-virtualbox'
HDFSCLI_PORT = 9870
HDFSCLI_CONNECTION_STRING = f'http://{HDFS_HOSTNAME}:{HDFSCLI_PORT}'

# En nuestro caso, al no usar Kerberos, creamos una conexión no segura
hdfs_client = InsecureClient(HDFSCLI_CONNECTION_STRING)

# Leemos el fichero de 'El quijote' que tenemos en HDFS
fichero = '/user/iabd/el_quijote.txt'
with hdfs_client.read(fichero) as reader:
    texto = reader.read()

print(texto)

# Creamos una cadena con formato CSV y la almacenamos en HDFS
datos="nombre,apellidos\nAitor,Medrano\nPedro,Casas"
hdfs_client.write("/user/iabd/datos.csv", datos)

En el mundo real, los formatos de los archivos normalmente serán Avro y/o Parquet, y el acceso lo realizaremos en gran medida mediante la librería de Pandas.

Formatos de datos

En el primer bloque ya vimos una pequeña introducción a los diferentes formatos de datos.

Las propiedades que ha de tener un formato de datos son:

  • independiente del lenguaje
  • expresivo, con soporte para estructuras complejas y anidadas
  • eficiente, rápido y reducido
  • dinámico, de manera que los programas puedan procesar y definir nuevos tipos de datos.
  • formato de fichero standalone y que permita dividirlo y comprimirlo.

Para que Hadoop pueda procesar documento, es imprescindible que el formato del fichero permita su división en fragmentos (splittable in chunks).

Si los clasificamos respecto al formato de almacenamiento tenemos:

  • texto (más lentos, ocupan más pero son más expresivos y permiten su interoperabilidad): CSV, XML, JSON, etc...
  • binarios (mejor rendimiento, ocupan menos, menos expresivos): Avro, Parquet, ORC, etc...

Si comparamos los formatos más empleados a partir de las propiedades descritas tenemos:

Característica CSV XML / JSON SequenceFile Avro
Independencia del lenguaje :thumbsup: :thumbsup: :fontawesome-regular-thumbs-down: :thumbsup:
Expresivo :fontawesome-regular-thumbs-down: :thumbsup: :thumbsup: :thumbsup:
Eficiente :fontawesome-regular-thumbs-down: :fontawesome-regular-thumbs-down: :thumbsup: :thumbsup:
Dinámico :thumbsup: :thumbsup: :fontawesome-regular-thumbs-down: :thumbsup:
Standalone :grey_question: :thumbsup: :fontawesome-regular-thumbs-down: :thumbsup:
Dividible :grey_question: :grey_question: :thumbsup: :thumbsup:

Las ventajas de elegir el formato correcto son:

  • Mayor rendimiento en la lectura y/o escritura
  • Ficheros trozeables (splittables)
  • Soporte para esquemas que evolucionan
  • Soporte para compresión de los datos (por ejemplo, mediante Snappy).

Filas vs Columnas

Los formatos con los que estamos más familiarizados, como son CSV o JSON, se basan en filas, donde cada registro se almacena en una fila o documento. Estos formatos son más lentos en ciertas consultas y su almacenamiento no es óptimo.

En un formato basado en columnas, cada fila almacena toda la información de una columna. Al basarse en columnas, ofrece mejor rendimiento para consultas de determinadas columnas y/o agregaciones, y el almacenamiento es más óptimo (como todos los datos de una columna son del mismo tipo, la compresión es mayor).

Supongamos que tenemos los siguientes datos:

Ejemplo de tabla

Dependiendo del almacenamiento en filas o columnas tendríamos la siguiente representación:

Comparación filas y columnas

En un formato columnas los datos del mismo tipo se agrupan, lo que mejor el rendimiento de acceso y reduce el tamaño:

Comparación filas y columnas

El artículo Apache Parquet: How to be a hero with the open-source columnar data format compara un formato basado en filas, como CSV, con uno basado en columnas como Parquet, en base al tiempo y el coste de su lectura en AWS (por ejemplo, AWS Athena cobra 5$ por cada TB escaneado):

Comparación CSV y Parquet

En la tabla podemos observar como 1TB de un fichero CSV en texto plano pasa a ocupar sólo 130GB mediante Parquet, lo que provoca que las posteriores consultas tarden menos y, en consecuencia, cuesten menos.

En la siguiente tabla comparamos un fichero CSV compuesto de cuatro columnas almacenado en S3 mediante tres formatos:

Comparación filas y columnas

Queda claro que la elección del formato de los datos y la posibilidad de elegir el formato dependiendo de sus futuros casos de uso puede conllevar un importante ahorro en tiempo y costes.

Avro

Logo de Apache Avro

Apache Avro es un formato de almacenamiento basado en filas para Hadoop, utilizado para la serialización de datos, ya que es más rápido y ocupa menos espacio que JSON, debido a que la serialización de los datos se realiza en un formato binario compacto.

Avro se basa en esquemas, los cuales se realizan mediante JSON para definir los tipos de datos y protocolos. Cuando los datos .avro son leídos siempre está presente el esquema con el que han sido escritos.

Schema

Los esquemas se componen de tipos primitivos (null, boolean, int, long, float, double, bytes, y string) y compuestos (record, enum, array, map, union, y fixed).

Un ejemplo de esquema podría ser:

empleado.avsc
{
   "type" : "record",
   "namespace" : "SeveroOchoa",
   "name" : "Empleado",
   "fields" : [
      { "name" : "Nombre" , "type" : "string" },
      { "name" : "Altura" , "type" : "float" }
      { "name" : "Edad" , "type" : "int" }
   ]
}

Python

Para poder serializar y deserializar documentos Avro mediante Python, previamente debemos instalar la librería avro:

pip install avro-python3
# o si utilizamos Anaconda
conda install -c conda-forge avro-python3

Vamos a realizar un ejemplo donde primero leemos un esquema de un archivo Avro, y con dicho esquema, escribiremos nuevos datos en un fichero. A continuación, abrimos el fichero escrito y leemos y mostramos los datos:

import avro
import copy
import json
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# abrimos el fichero en modo binario y leemos el esquema
schema = avro.schema.parse(open("empleado.avsc", "rb").read())

# escribimos un fichero a partir del esquema leído
with open('empleados.avro', 'wb') as f:
    writer = DataFileWriter(f, DatumWriter(), schema)
    writer.append({"nombre": "Carlos", "altura": 180, "edad": 44})
    writer.append({"nombre": "Juan", "altura": 175})
    writer.close()

# abrimos el archivo creado, lo leemos y mostramos línea a línea
with open("empleados.avro", "rb") as f:
    reader = DataFileReader(f, DatumReader())
    # copiamos los metadatos del fichero leído
    metadata = copy.deepcopy(reader.meta)
    # obtenemos el schema del fichero leído
    schemaFromFile = json.loads(metadata['avro.schema'])
    # recuperamos los empleados
    empleados = [empleado for empleado in reader]
    reader.close()

print(f'Schema de empleado.avsc:\n {schema}')
print(f'Schema del fichero empleados.avro:\n {schemaFromFile}')
print(f'Empleados:\n {empleados}')
Schema de empleado.avsc:
{"type": "record", "name": "empleado", "namespace": "SeveroOchoa", "fields": [{"type": "string", "name": "nombre"}, {"type": "int", "name": "altura"}, {"type": ["null", "int"], "name": "edad", "default": null}]}
Schema del fichero empleados.avro:
{'type': 'record', 'name': 'empleado', 'namespace': 'SeveroOchoa', 'fields': [{'type': 'string', 'name': 'nombre'}, {'type': 'int', 'name': 'altura'}, {'type': ['null', 'int'], 'name': 'edad', 'default': None}]}
Empleados:
[{'nombre': 'Carlos', 'altura': 180, 'edad': 44}, {'nombre': 'Juan', 'altura': 175, 'edad': None}]

Fastavro

Para trabajar con Avro y grandes volúmenes de datos, se utiliza la librería Fastavro (https://github.com/fastavro/fastavro) la cual ofrece un rendimiento mucho mejor (en vez de estar codificada en Python puro, tiene algunos fragmentos realizados mediante Cython).

Primero, hemos de instalar la librería:

pip install fastavro
# o si utilizamos Anaconda
conda install -c conda-forge fastavro

Como podéis observar a continuación, hemos repetido el ejemplo y el código es muy similar:

import fastavro
import copy
import json
from fastavro import reader

# abrimos el fichero en modo binario y leemos el esquema
with open("empleado.avsc", "rb") as f:
    schemaJSON = json.load(f)
schemaDict = fastavro.parse_schema(schemaJSON)

empleados = [{"nombre": "Carlos", "altura": 180, "edad": 44},
            {"nombre": "Juan", "altura": 175}]

# escribimos un fichero a partir del esquema leído
with open('empleadosf.avro', 'wb') as f:
    fastavro.writer(f, schemaDict, empleados)

# abrimos el archivo creado, lo leemos y mostramos línea a línea
with open("empleadosf.avro", "rb") as f:
    reader = fastavro.reader(f)
    # copiamos los metadatos del fichero leído
    metadata = copy.deepcopy(reader.metadata)
    # obtenemos el schema del fichero leído
    schemaReader = copy.deepcopy(reader.writer_schema)
    schemaFromFile = json.loads(metadata['avro.schema'])
    # recuperamos los empleados
    empleados = [empleado for empleado in reader]

print(f'Schema de empleado.avsc:\n {schemaDict}')
print(f'Schema del fichero empleadosf.avro:\n {schemaFromFile}')
print(f'Empleados:\n {empleados}')
Schema de empleado.avsc:
{'type': 'record', 'name': 'SeveroOchoa.empleado', 'fields': [{'name': 'nombre', 'type': 'string'}, {'name': 'altura', 'type': 'int'}, {'default': None, 'name': 'edad', 'type': ['null', 'int']}], '__fastavro_parsed': True, '__named_schemas': {'SeveroOchoa.empleado': {'type': 'record', 'name': 'SeveroOchoa.empleado', 'fields': [{'name': 'nombre', 'type': 'string'}, {'name': 'altura', 'type': 'int'}, {'default': None, 'name': 'edad', 'type': ['null', 'int']}]}}}
Schema del fichero empleadosf.avro:
{'type': 'record', 'name': 'SeveroOchoa.empleado', 'fields': [{'name': 'nombre', 'type': 'string'}, {'name': 'altura', 'type': 'int'}, {'default': None, 'name': 'edad', 'type': ['null', 'int']}]}
Empleados:
[{'nombre': 'Carlos', 'altura': 180, 'edad': 44}, {'nombre': 'Juan', 'altura': 175, 'edad': None}]

Fastavro y Pandas

Finalmente, vamos a realizar un último ejemplo con las dos librerías más utilizadas.

Vamos a leer un fichero CSV de ventas (que ya utilizamos en las sesiones de Pentaho) mediante Pandas, y tras limpiar los datos y quedarnos únicamente con las ventas de Alemania, almacenaremos el resultado del procesamiento en Avro.

import pandas as pd
from fastavro import writer, parse_schema

# Leemos el csv mediante pandas
df = pd.read_csv('pdi_sales.csv',sep=';')
# Limpiamos los datos (strip a los códigos postales) y nos quedamos con Alemania
df['Zip'] = df['Zip'].str.strip()
filtro = df.Country=="Germany"
df = df[filtro]

# 1. Definimos el esquema
schema = {
    'name': 'Sales',
    'namespace' : 'SeveroOchoa',
    'type': 'record',
    'fields': [
        {'name': 'ProductID', 'type': 'int'},
        {'name': 'Date', 'type': 'string'},
        {'name': 'Zip', 'type': 'string'},
        {'name': 'Units', 'type': 'int'},
        {'name': 'Revenue', 'type': 'float'},
        {'name': 'Country', 'type': 'string'}
    ]
}
schemaParseado = parse_schema(schema)

# 2. Convertimos el dataframe a una lista de diccionarios
records = df.to_dict('records')

# 3. Persistimos en un fichero avro
with open('sales.avro', 'wb') as f:
    writer(f, schemaParseado, records)
import pandas as pd
from fastavro import parse_schema
from hdfs import InsecureClient
from hdfs.ext.avro import AvroWriter
from hdfs.ext.dataframe import write_dataframe

# 1. Nos conectamos a HDFS
HDFS_HOSTNAME = 'iabd-virtualbox'
HDFSCLI_PORT = 9870
HDFSCLI_CONNECTION_STRING = f'http://{HDFS_HOSTNAME}:{HDFSCLI_PORT}'
hdfs_client = InsecureClient(HDFSCLI_CONNECTION_STRING)

# 2. Leemos el dataframe
with hdfs_client.read('/user/iabd/pdi_sales.csv') as reader:
    df = pd.read_csv(reader,sep=';')

# Limpiamos los datos (strip a los códigos postales) y nos quedamos con Alemania
df['Zip'] = df['Zip'].str.strip()
filtro = df.Country=="Germany"
df = df[filtro]

# 3. Definimos el esquema
schema = {
    'name': 'Sales',
    'namespace' : 'SeveroOchoa',
    'type': 'record',
    'fields': [
        {'name': 'ProductID', 'type': 'int'},
        {'name': 'Date', 'type': 'string'},
        {'name': 'Zip', 'type': 'string'},
        {'name': 'Units', 'type': 'int'},
        {'name': 'Revenue', 'type': 'float'},
        {'name': 'Country', 'type': 'string'}
    ]
}
schemaParseado = parse_schema(schema)

# 4a. Persistimos en un fichero avro dentro de HDFS mediante la extension AvroWriter de hdfs
with AvroWriter(hdfs_client, '/user/iabd/sales.avro', schemaParseado) as writer:
    records = df.to_dict('records') # diccionario
    for record in records:
        writer.write(record)

# 4b. O directamente persistimos el dataframe mediante la extension write_dataframe de hdfs
write_dataframe(hdfs_client, '/user/iabd/sales2.avro', df)  # infiere el esquema
write_dataframe(hdfs_client, '/user/iabd/sales3.avro', df, schema=schemaParseado)

Para el acceso HDFS hemos utilizados las extensiones Fastavro y Pandas de la librería HDFS del apartado anterior.

Comprimiendo los datos

¿Y sí comprimimos los datos para ocupen menos espacio en nuestro clúster y por tanto, nos cuesten menos dinero?

Fastavro soporta dos tipos de compresión: gzip (mediante el algoritmo deflate) y snappy. Snappy es una biblioteca de compresión y descompresión de datos de gran rendimiento que se utiliza con frecuencia en proyectos Big Data, la cual hemos de instalar previamente mediante pip install python-snappy.

Para indicar el tipo de compresión, únicamente hemos de añadir un parámetros extra con el algoritmo de compresión en la función/constructor de persistencia:

writer(f, schemaParseado, records, 'deflate')
with AvroWriter(hdfs_client, '/user/iabd/sales.avro', schemaParseado, 'snappy') as writer:
write_dataframe(hdfs_client, '/user/iabd/sales3.avro', df, schema=schemaParseado, codec='snappy')

Comparando algoritmos de compresión

Respecto a la compresión, sobre un fichero de 100GB, podemos considerar media si ronda los 50GB y alta si baja a los 40GB.

Algoritmo Velocidad Compresión
Gzip Media Media
Bzip2 Lenta Alta
Snappy Alta Media

Más que un tema de espacio, necesitamos que los procesos sean eficientes y por eso priman los algoritmos que son más rápidos. Si te interesa el tema, es muy interesante el artículo Data Compression in Hadoop.

Por ejemplo, si realizamos el ejemplo de Fast Avro y Pandas con acceso local obtenemos los siguientes tamaños:

  • Sin compresión: 6,9 MiB
  • Gzip: 1,9 MiB
  • Snappy: 2,8 MiB

Parquet

Logo de Apache Parquet

Apache Parquet es un formato de almacenamiento basado en columnas para Hadoop, con soporte para todos los frameworks de procesamiento de datos, así como lenguajes de programación. De la misma forma que Avro, se trata de un formato de datos auto-descriptivo, de manera que embebe el esquema o estructura de los datos con los propios datos en sí. Parquet es idóneo para analizar datasets que contienen muchas columnas.

Formato de un archivo Parquet

Cada fichero Parquet almacena los datos en binario organizados en grupos de filas. Para cada grupo de filas (row group), los valores de los datos se organizan en columnas, lo que facilita la compresión a nivel de columna.

La columna de metadatos de un fichero Parquet se almacena al final del fichero, lo que permite que las escrituras sean rápidas con una única pasada. Los metadatos pueden incluir información como los tipos de datos, esquemas de codificación/compresión, estadísticas, nombre de los elementos, etc...

Parquet y Python

Para interactuar con el formato Parquet mediante Python, la librería más utilizada es la que ofrece Apache Arrow, en concreto la librería PyArrow.

Así pues, la instalamos mediante pip:

pip install pyarrow

Apache Arrow usa un tipo de estructura denominada tabla para almacenar los datos bidimentsional (sería muy similar a un dataframe de Pandas). La documentación de PyArrow dispone de un libro de recetas con ejemplos con código para los diferentes casos de uso que se nos puedan plantear.

Vamos a simular el mismo ejemplo que hemos realizado previamente mediante Avro, y vamos a crear un fichero en formato JSON con empleados, y tras persistirlo en formato Parquet, lo vamos a recuperar:

dict-parquet.py
import pyarrow.parquet as pq
import pyarrow as pa

# 1.- Definimos el esquema
schema = pa.schema([ ('nombre', pa.string()),
                    ('altura', pa.int32()),
                    ('edad', pa.int32())  ])

# 2.- Almacenamos los empleados por columnas
empleados = {"nombre": ["Carlos", "Juan"],
            "altura": [180, 44],
            "edad": [None, 34]}

# 3.- Creamos una tabla Arrow y la persistimos mediante Parquet
tabla = pa.Table.from_pydict(empleados, schema)
pq.write_table(tabla, 'empleados.parquet')

# 4.- Leemos el fichero generado
table2 = pq.read_table('empleados.parquet')
schemaFromFile = table2.schema

print(f'Schema del fichero empleados.parquet:\n{schemaFromFile}\n')
print(f'Tabla de Empleados:\n{table2}')

Para que pyarrow pueda leer los empleados como documentos JSON, a día de hoy sólo puede hacerlo leyendo documentos individuales almacenados en fichero:

Por lo tanto, creamos el fichero empleados.json con la siguiente información:

empleados.json
{ "nombre": "Carlos", "altura": 180, "edad": 44 }
{ "nombre": "Juan", "altura": 175 }

De manera que podemos leer los datos JSON y persistirlos en Parquet del siguiente modo:

json-parquet.py
import pyarrow.parquet as pq
import pyarrow as pa
from pyarrow import json

# 1.- Definimos el esquema
schema = pa.schema([ ('nombre', pa.string()),
                    ('altura', pa.int32()),
                    ('edad', pa.int32())  ])

# 2.- Leemos los empleados
tabla = json.read_json("empleados.json")
# 3.- Persistimos la tabla en Parquet
pq.write_table(tabla, 'empleados-json.parquet')

# 4.- Leemos el fichero generado
table2 = pq.read_table('empleados-json.parquet')
schemaFromFile = table2.schema

print(f'Schema del fichero empleados-json.parquet:\n{schemaFromFile}\n')
print(f'Tabla de Empleados:\n{table2}')

En ambos casos obtendríamos algo similar a:

Schema del fichero empleados.parquet:
nombre: string
altura: int32
edad: int32

Tabla de Empleados:
pyarrow.Table
nombre: string
altura: int32
edad: int32
----
nombre: [["Carlos","Juan"]]
altura: [[180,44]]
edad: [[null,34]]

Parquet y Pandas

En el caso del uso de Pandas el código todavía se simplifica más. Si reproducimos el mismo ejemplo que hemos realizado con Avro tenemos que los Dataframes ofrecen el método to_parquet para exportar a un fichero Parquet:

csv-parquet.py
import pandas as pd

df = pd.read_csv('pdi_sales.csv',sep=';')

df['Zip'] = df['Zip'].str.strip()
filtro = df.Country=="Germany"
df = df[filtro]

# A partir de un DataFrame, persistimos los datos
df.to_parquet('sales.parquet')

Si quisiéramos almacenar el archivo directamente en HDFS, necesitamos indicarle a Pandas la dirección del sistema de archivos que tenemos configurado en core-site.xml:

core-site.ml
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://iabd-virtualbox:9000</value>
</property>

Así pues, únicamente necesitamos modificar el nombre del archivo donde serializamos los datos a Parquet:

df.to_parquet('hdfs://iabd-virtualbox:9000/sales.parquet')

Comparando tamaños

Si comparamos los tamaños de los archivos respecto al formato de datos empleado con únicamente las ventas de Alemania tendríamos:

  • ger_sales.csv: 9,7 MiB
  • ger_sales.avro: 6,9 MiB
    • ger_sales-gzip.avro: 1,9 MiB
    • ger_sales-snappy.avro: 2,8 MiB
  • ger_sales.parquet: 2,3 MiB
    • ger_sales-gzip.parquet: 1,6 MiB
    • ger_sales-snappy.parquet: 2,3 MiB

Hue

Hue (Hadoop User Experience) es una interfaz gráfica de código abierto basada en web para su uso con Apache Hadoop. Hue actúa como front-end para las aplicaciones que se ejecutan en el clúster, lo que permite interactuar con las aplicaciones mediante una interfaz más amigable que el interfaz de comandos.

En nuestra máquina virtual ya lo tenemos instalado y configurado para que funcione con HDFS y Hive.

La ruta de instalación es /opt/hue-4.10.0 y desde allí, arrancaremos Hue:

./build/env/bin/hue runserver

Tras arrancarlo, nos dirigimos a http://127.0.0.1:8000/y visualizaremos el formulario de entrada, el cual entraremos con el usuario iabd y la contraseña iabd:

Login en Hue

Una vez dentro, por ejemplo, podemos visualizar e interactuar con HDFS:

HDFS en Hue

Referencias

Actividades

Para los siguientes ejercicios, copia el comando y/o haz una captura de pantalla donde se muestre el resultado de cada acción.

  1. Explica paso a paso el proceso de lectura (indicando qué bloques y los datanodes empleados) que realiza HDFS si queremos leer el archivo /logs/101213.log:

    Proceso de lectura HDFS

  2. En este ejercicio vamos a practicar los comandos básicos de HDFS. Una vez arrancado Hadoop:

    1. Crea la carpeta /user/iabd/ejercicios.
    2. Sube el archivo el_quijote.txt a la carpeta creada.
    3. Crea una copia en HDFS y llámala el_quijote2.txt.
    4. Recupera el principio del fichero el_quijote2.txt.
    5. Renombra el_quijote2.txt a el_quijote_copia.txt.
    6. Adjunta una captura desde el interfaz web donde se vean ambos archivos.
    7. Vuelve al terminal y elimina la carpeta con los archivos contenidos mediante un único comando.
  3. (opcional) Vamos a practicar los comandos de gestión de instantáneas y administración de HDFS. Para ello:

    1. Crea la carpeta /user/iabd/instantaneas.
    2. Habilita las snapshots sobre la carpeta creada.
    3. Sube el archivo el_quijote.txt a la carpeta creada.
    4. Crea una copia en HDFS y llámala el_quijote_snapshot.txt.
    5. Crea una instantánea de la carpeta llamada ss1.
    6. Elimina ambos ficheros del quijote.
    7. Comprueba que la carpeta está vacía.
    8. Recupera desde ss el archivo el_quijote.txt.
    9. Crea una nueva instantánea de la carpeta llamada ss2.
    10. Muestra el contenido de la carpeta /user/iabd/instantaneas así como de sus snapshots.
  4. (opcional) HDFS por dentro

    1. Accede al archivo de configuración hdfs-site.xml y averigua la carpeta donde se almacena el namenode.
    2. Muestra los archivos que contiene la carpeta current dentro del namenode
    3. Comprueba el id del archivo VERSION.
    4. En los siguientes pasos vamos a realizar un checkpoint manual para sincronizar el sistema de ficheros. Para ello entramos en modo safe con el comando hdfs dfsadmin -safemode enter, de manera que impedamos que se trabaje con el sistema de ficheros mientras lanzamos el checkpoint.
    5. Comprueba mediante el interfaz gráfico que el modo seguro está activo (Safe mode is ON).
    6. Ahora realiza el checkpoint con el comando hdfs dfsadmin -saveNamespace
    7. Vuelve a entrar al modo normal (saliendo del modo seguro mediante hdfs dfsadmin -safemode leave)
    8. Accede a la carpeta del namenode y comprueba que los fsimage del namenode son iguales.
  5. Mediante Python, carga los datos de los taxis que hemos almacenado en HDFS en el apartado de Bloques y crea dentro de /user/iabd/datos los siguientes archivos con el formato adecuado:

    • taxis.avro: la fecha (tpep_pickup_datetime), el VendorID y el coste de cada viaje (total_amount)
    • (opcional) taxis.parquet con los mismos atributos.