Saltar a contenido

Hive I

Apache Hive (https://hive.apache.org/) es una tecnología distribuida diseñada y construida sobre un clúster de Hadoop. Permite leer, escribir y gestionar grandes datasets (con escala de petabytes) que residen en HDFS haciendo uso de un lenguaje dialecto de SQL, conocido como HiveSQL, lo que simplifica mucho el desarrollo y la gestión de Hadoop.

Logo de Apache Hive
Logo de Apache Hive

El proyecto lo inició Facebook para conseguir que la interacción con Hadoop fuera similar a la que se realiza con un datawarehouse tradicional. La tecnología Hadoop es altamente escalable, aunque hay que destacar su dificultad de uso y que está orientado únicamente a operaciones batch, con lo que no soporta el acceso aleatorio ni está optimizado para ficheros pequeños.

Hive y Hadoop

Si volvemos a ver como casa Hive dentro del ecosistema de Hadoop, Hive es una fachada construida sobre Hadoop que permite acceder a los datos almacenados en HDFS de forma muy sencilla sin necesidad de conocer Java, Map Reduce u otras tecnologías.

Aunque en principio estaba diseñado para el procesamiento batch, ahora se integra con frameworks en streaming como Tez y Spark.

Ecosistema Hadoop
Ecosistema Hadoop

Características

Hive impone una estructura sobre los datos almacenados en HDFS. Esta estructura se conoce como Schema, y Hive la almacena en su propia base de datos (metastore). Gracias a ella, optimiza de forma automática el plan de ejecución y usa particionado de tablas en determinadas consultas. También soporta diferentes formatos de ficheros, codificaciones y fuentes de datos como HBase.

Para interactuar con Hive utilizaremos HiveQL, el cual es un dialecto de SQL (recuerda que SQL no es sensible a las mayúsculas, excepto en la comparación de cadenas).

Hive amplía el paradigma de SQL incluyendo formatos de serialización. También podemos personalizar el procesamiento de consultas creando un esquema de tabla acorde con nuestros datos, pero sin tocar los datos. Aunque SQL solo es compatible con tipos de valor primitivos (como fechas, números y cadenas), los valores de las tablas de Hive son elementos estructurados, por ejemplo, objetos JSON o cualquier tipo de datos definido por el usuario o cualquier función escrita en Java.

Una consulta típica en Hive se ejecuta en varios datanodes en paralelo, con varios trabajos MapReduce asociados. Estas operaciones son de tipo batch, por lo que la latencia es más alta que en otros tipos de bases de datos. Además, hay que considerar el retardo producido por la inicialización de los trabajos, sobre todo en el caso de consultar pequeños datasets.

Ventajas

Las ventajas de utilizar Hive son:

  • Reduce la complejidad de la programación MapReduce al usar HiveQL como lenguaje de consulta.
  • Está orientado a aplicaciones de tipo Data Warehouse, con datos estáticos, poco cambiantes y sin requisitos de tiempos de respuesta rápidos.
  • Permite a los usuarios despreocuparse de en qué formato y dónde se almacenan los datos.
  • Incorpora Beeline: una herramienta por línea de comandos para realizar consultas con HiveQL.

En cambio, Hive no es la mejor opción para consultas en tiempo real o de tipo transaccional. Además, no está diseñado para usarse con actualizaciones de valores al nivel de registro, y el soporte de SQL es limitado.

Alternativas

Una de las alternativas más populares es Apache Impala, originalmente creado por Cloudera, el cual utiliza un demonio dedicado en cada datanode del clúster, de manera que hay un coordinador que reenvía a cada datanode la consulta a realizar y luego se encarga de unir los datos en el resultado final. Impala utiliza el metastore de Hive y soporta la mayoría de construcciones de Hive, con lo que la migración de un sistema a otro es sencilla.

En lugar de utilizar MapReduce, aprovecha un motor de procesamiento masivo en paralelo (MPP) como el que existe en los sistemas gestores de bases de datos relacionales (RDBMS). Esta arquitectura hace que Impala sea adecuado para análisis interactivos y de baja latencia.

Otras alternativas open source son :

  • Presto de Facebook y Apache Drill, con arquitecturas muy similares a Impala.
  • Spark SQL: utiliza Spark como motor de ejecución y permite utilizar consultas SQL embebidas. La estudiaremos en sesiones futuras.

Pig

Apache Pig es una herramienta que abstrae el acceso a MapReduce de forma similar a como lo realiza Hive, pero en vez de SQL, utiliza su propio lenguaje de scripting (PigLatin) para expresar los flujos de datos. Actualmente ha perdido uso en detrimento de Hive/Impala y de Spark.

Tenéis una pequeña introducción en https://www.analyticsvidhya.com/blog/2021/08/an-introduction-to-apache-pig-for-absolute-beginners/.

Componentes

A continuación podemos ver un gráfico que relaciona los diferentes componentes de Hive y define su arquitectura:

Arquitectura de Apache Hive
Arquitectura de Apache Hive

Hive Server

HiveServer 2 (HS2) es la última versión del servicio. Se compone de una interfaz que permite a clientes externos ejecutar consultas contra Apache Hive y obtener los resultados. Está basado en Thrift RPC y soporta clientes concurrentes. Para arrancar el servidor, ejecutaremos el comando hiveserver2, el cual quedará a la escucha en el puerto 10000.

iabd@iabd-virtualbox:~$ hiveserver2
2023-01-20 09:39:51: Starting HiveServer2
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: ...
Hive Session ID = 9e39b0c8-45a6-46ca-bfb0-6e320c85f989

A este servidor nos conectaremos mediante la herramienta Beeline (Beeline CLI).

Hive Metastore

Es el repositorio central para los metadatos de Hive, y se almacena en una base de datos relacional como MySQL, PostgreSQL o Apache Derby (embebida). Mantiene los metadatos, las tablas y sus tipos mediante Hive DDL (Data Definition Language). Además, el sistema se puede configurar para que también almacene estadísticas de las operaciones y registros de autorización para optimizar las consultas.

En las últimas versiones de Hive, este componente se puede desplegar de forma remota e independiente, para no compartir la misma JVM con HiveServer. Dentro del metastore podemos encontrar el Hive Catalog (HCatalog), que permite acceder a sus metadatos, actuando como una API. Al poder desplegarse de forma aislada e independiente, permite que otras aplicaciones hagan uso del schema sin tener que desplegar el motor de consultas de Hive.

Configuraciones del Metastore de Apache Hive
Configuraciones del Metastore de Apache Hive

En nuestra máquina, inicialmente tenemos instalada una versión local que permite varias conexiones simultáneas utilizando MySQL como almacén de los metadatos, aunque en el caso de uso 7 necesitaremos configurarlo como remoto para poder acceder a él desde Flume.

Así pues, al Metastore podremos acceder mediante HiveCLI, o a través del Hive Server mediante una conexión remota mediante Beeline.

Metastore remoto

Para configurar el metastore en remoto, necesitamos modificar la propiedad hive.metastore.uris del archivo hive-site.xml situado en $HIVE_HOME/conf:

hive-site.xml
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://iabd-virtualbox:9083</value>
</property>

Una vez configurado, volvemos a arrancar Hive mediante hiveserver2 y a continuación, cada vez que arranquemos Hive, necesitamos también arrancar el Metastore, ya que ahora funciona como un servicio totalmente desacoplado:

hive --service metastore

En la sesión de Spark Catalog veremos cómo desde una herramienta externa a Hive, se accede y utilizan los metadatos almacenados.

Beeline

Hive incorpora Beeline, el cual actúa como un cliente basado en JDBC para hacer consultas por línea de comandos contra el Hive Server, sin necesitar las dependencias de Hive, mediante el comando beeline:

iabd@iabd-virtualbox:~$ beeline
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: ...
Beeline version 3.1.2 by Apache Hive
beeline> 

Por otro lado, también podemos utilizar Hive CLI, un cliente basado en Apache Thrift, que usa los mismos drivers que Hive.

Apache Tez y Spark

Hive 3 deja de soportar MapReduce. Apache Tez lo reemplaza como el motor de ejecución por defecto, de manera que mejora el rendimiento y se ejecuta sobre Hadoop Yarn, que encola y planifica los trabajos en el clúster. Además de Tez, Hive también puede utilizar Apache Spark como motor de ejecución.

Para indicar que queremos ejecutar Tez como motor de ejecución, ejecutaríamos el siguiente comando:

SET hive.execution.engine=tez;

En nuestro caso no tenemos Tez instalado en la máquina virtual, quedando fuera del alcance del presente curso.

Conviene distinguir que Spark SQL no tiene nada que ver con utilizar el motor de ejecución de Spark dentro de Hive. Al utilizar el motor de Hive, podemos hacer un uso completo de las características de Hive, mientras que Spark SQL es un motor diferente que ofrece una alta compatibilidad con Hive, pero tiene sus particularidades.

Tipos de datos

Los tipos de datos que podemos emplear en Hive son muy similares a los que se utilizan en el DDL de SQL. Los tipos simples más comunes son STRING e INT, aunque podemos utilizar otros tipos como TINYINT, BIGINT, DOUBLE, DATE, TIMESTAMP, etc...

Para realizar una conversión explicita de tipos, por ejemplo de un tipo texto a uno numérico, hay que utilizar la función CAST:

select CAST('1' as INT) from tablaPruebas;

Respecto a los tipos compuestos, tenemos tres tipos:

  • arrays mediante el tipo ARRAY, para agrupar elementos del mismo tipo: ["manzana", "pera", "naranja].
  • mapas mediante el tipo MAP, para definir parejas de clave-valor: {1: "manzana", 2: "pera"}
  • estructuras mediante el tipo STRUCT, para definir estructuras con propiedades: {"fruta": "manzana", "cantidad": 1, "tipo": "postre"}.

Instalación y configuración

Máquina virtual

Los siguientes pasos no son necesarios ya que nuestra máquina virtual ya tiene Hive instalado y configurado correctamente. Si quieres hacer tu propia instalación sigue los siguientes pasos de la documentación oficial.

Una vez instalado vamos a configurarlo. Para ello, debemos crear los ficheros de configuración a partir de las plantilla que ofrece Hive. Para ello, desde la carpeta $HIVE_HOME/conf, ejecutaremos los siguientes comandos:

cp hive-default.xml.template hive-site.xml
cp hive-env.sh.template hive-env.sh
cp hive-exec-log4j2.properties.template hive-exec-log4j2.properties
cp hive-log4j2.properties.template hive-log4j2.properties

Modificamos el fichero hive.env.sh para incluir dos variables de entorno con las rutas de Hadoop y la configuración de Hive

hive.env.sh
export HADOOP_HOME=/opt/hadoop-3.3.1
export HIVE_CONF_DIR=/opt/hive-3.1.2/conf
export HCAT_HOME=$HIVE_HOME/hcatalog

Para que funcione la ingesta de datos en Hive mediante Sqoop, necesitamos añadir una librería a Sqoop:

cp $HIVE_HOME/lib/hive-common-3.1.2.jar $SQOOP_HOME/lib

Y configuramos las variables de entorno de nuestro path para exportar las rutas de Hive y del metastore:

~/.bashrc
export HIVE_HOME=/opt/hive-3.1.2
export HIVE_CONF_DIR=$HIVE_HOME/conf
export HCAT_HOME=$HIVE_HOME/hcatalog

A continuación, preparamos HDFS para crear la estructura de archivos:

hdfs dfs -mkdir /tmp
hdfs dfs -mkdir -p /user/hive/warehouse
hdfs dfs -chmod g+w /tmp
hdfs dfs -chmod g+w /user/hive/warehouse

Para el metastore, como en nuestra máquina virtual tenemos un servidor de MariaDB corriendo, tal como hemos comentado previamente, vamos a reutilizarlo para tener una instalación que permite la conexión de más una sesión de forma simultánea. La mayoría de ejemplos que hay en internet y la diferente bibliografía utilizan DerbyDB como almacén (ya que no requiere una instalación extra, pero solo permiten una única conexión). Así pues, creamos el almacén mediante:

schematool -dbType mysql -initSchema

Modificamos el fichero de configuración hive-site.xml y configuramos :

hive-site.xml
<!-- nuevas propiedades -->
<property>
  <name>system:java.io.tmpdir</name>
  <value>/tmp/hive/java</value>
</property>
<property>
  <name>system:user.name</name>
  <value>${user.name}</value>
</property>
<property>
  <name>datanucleus.schema.autoCreateAll</name>
  <value>true</value>
</property>
<!-- propiedades existentes a modificar -->
<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>iabd</value>
</property>
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>iabd</value>
</property>

Hola Mundo

Si entramos a nuestro $HIVE_HOME podemos comprobar con tenemos las siguientes herramientas:

  • hive: Herramienta cliente
  • beeline: Otra herramienta cliente
  • hiserver2: Nos permite arrancar el servidor de Hive
  • schematool: Nos permite trabajar contra la base de datos de metadatos (Metastore)

Una vez arrancado Hadoop y YARN, podemos arrancar Hive mediante el cliente local:

hive

Y una vez dentro, comprobar las bases de datos existentes para ver que todo se configuró correctamente

show databases;

Si quisiéramos ejecutar un script, podemos hacerlo desde el propio comando hive con la opción -f:

hive -f script.sql

Además, tenemos la opción de pasar una consulta desde la propia línea de comandos mediante la opción -e. Supongamos que queremos recuperar los datos de la tabla categories de la base de datos iabd haríamos:

hive -e 'select * from iabd.categories'

Acceso remoto

HiveServer2 (desde Hive 0.11) tiene su propio cliente conocido como Beeline. En entornos reales, el cliente Hive está en desuso a favor de Beeline, por la falta de múltiples usuarios, seguridad y otras características de HiveServer2.

Arrancamos HiveServer2 (lo hará en el puerto 10000) y Beeline en dos pestañas diferentes mediante los comandos hiveserver2 y beeline:

hiveserver2

Esperamos a que arranque completamente y a continuación:

beeline

Una vez dentro de Beeline, puede que nos toque volver a unos segundos a que HiveServer2 haya arrancando completamente, nos conectamos al servidor:

!connect jdbc:hive2://iabd-virtualbox:10000

Al conectarnos, tras introducir iabd como usuario y contraseña, obtendremos un interfaz similar al siguiente:

Beeline version 3.1.2 by Apache Hive
beeline> !connect jdbc:hive2://iabd-virtualbox:10000
Connecting to jdbc:hive2://iabd-virtualbox:10000
Enter username for jdbc:hive2://iabd-virtualbox:10000: iabd
Enter password for jdbc:hive2://iabd-virtualbox:10000: ****
Connected to: Apache Hive (version 3.1.2)
Driver: Hive JDBC (version 3.1.2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://iabd-virtualbox:10000>

Beeline y conexión a la vez

Otra forma de trabajar, para arrancar en el mismo proceso Beeline y HiveServer2 para pruebas/desarrollo y tener una experiencia similar al cliente Hive accediendo de forma local, podemos ejecutar el siguiente comando donde indicamos tanto el usuario (-n) como la contraseña (-p):

beeline -u jdbc:hive2://iabd-virtualbox:10000 -n iabd -p iabd

Dentro de Beeline, en cualquier momento podemos ejecutar el comando help que nos mostrará todos los comandos disponibles. Si nos fijamos, además de las comandos del cliente hive, tenemos los comandos beeline que empiezan por el símbolo de exclamación !:

0: jdbc:hive2://iabd-virtualbox:10000> help
!addlocaldriverjar  Add driver jar file in the beeline client side.
!addlocaldrivername Add driver name that needs to be supported in the beeline
                    client side.
!all                Execute the specified SQL against all the current connections
!autocommit         Set autocommit mode on or off
!batch              Start or execute a batch of statements
...

Mediante la interfaz gráfica de Hive Server UI a la cual podemos acceder mediante http://localhost:10002 podemos monitorizar los procesos ejecutados por HiveServer2:

Monitorización mediante Hive Server UI
Monitorización mediante Hive Server UI

Caso 1: Creación y borrado de tablas

Para este caso de uso, vamos a utilizar la base de datos retail_db que ya utilizamos en las actividades de la sesión anterior.

Para empezar, vamos a cargar en HDFS los datos de los clientes que contiene la tabla customer. Mediante Sqoop, ejecutamos el siguiente comando:

sqoop import --connect "jdbc:mysql://localhost/retail_db" \
  --username iabd --password iabd \
  --table customers --target-dir /user/iabd/hive/customer \
  --fields-terminated-by '|' --delete-target-dir \
  --columns "customer_id,customer_fname,customer_lname,customer_city"

Si comprobamos el contenido en HDFS, veremos cómo se han cargado los datos:

hdfs dfs -head /user/iabd/hive/customer/part-m-00000
# 1|Richard|Hernandez|Brownsville
# 2|Mary|Barrett|Littleton
# 3|Ann|Smith|Caguas
# 4|Mary|Jones|San Marcos
...

A continuación, tras habernos conectado con el cliente hive o mediante beeline, creamos una base de datos llamada iabd:

create database if not exists iabd;

Nos conectamos a la base de datos que acabamos de crear:

use iabd;

default

Si olvidamos el comando use, se utilizará la base de datos default, la cual reside en /user/hive/warehouse como raíz en HDFS.

A continuación, vamos a crear una tabla que almacene el identificador, nombre, apellido y ciudad de los clientes (como puedes observar, la sintaxis es similar a SQL):

CREATE TABLE customers
(
  custId INT,
  fName STRING COMMENT "Nombre",
  lName STRING COMMENT "Apellido",
  city STRING
)
ROW FORMAT DELIMITED  -- (2)!
FIELDS TERMINATED BY '|'  -- (3)!
STORED AS TEXTFILE  -- (4)!
LOCATION '/user/iabd/hive/customer';  -- (5)!
  1. Mediante COMMENT le añadimos un comentario a cada campo
  2. Indica el formato de cada fila como delimitado (con un salto del línea)
  3. Los campos están separados por el carácter | (es el mismo que habíamos indicado en Sqoop)
  4. El contenido está almacenado en HDFS en formato texto
  5. Ruta de HDFS donde se encuentran los datos

Y ya podemos realizar algunas consultas:

select * from customers limit 5;
select count(*) from customers;

Utilizando Hue

Si no queremos conectarnos mediante Beeline, siempre podemos utilizar Hue como un entorno más amigable. Recuerda que para acceder desde la máquina virtual necesitas arrancarlo previamente mediante /opt/hue-4.10.0/build/env/bin/hue runserver.

Una vez dentro, podemos realizar las consultas u operaciones de creación de tablas, etc...

Consultas Hive desde Hue

En ocasiones necesitamos almacenar la salida de una consulta Hive en una nueva tabla. Las definiciones de las columnas de la nueva tabla se deriva de las columnas recuperadas en la consulta. Para ello, usaremos el comando create table-as select:

CREATE TABLE customers_new as SELECT * from customers;

En el caso de que la consulta falle por algún motivo, la tabla no se crearía.

Otra posibilidad es crear una tabla con la misma estructura que otra ya existente (pero sin datos):

CREATE TABLE customers2 LIKE customers;

En cualquier momento podemos obtener información de la tabla:

describe customers_new;
describe formatted customers_new;

Si empleamos la forma larga, obtendremos mucha más información. Por ejemplo, si nos fijamos, vemos que la localización de la nueva tabla ya no es /user/iabd/hive/customer sino hdfs://iabd-virtualbox:9000/user/hive/warehouse/iabd.db/customers_new. Esto se debe a que en vez de crear una tabla enlazada a un recurso de HDFS ya existente, ha creado una copia de los datos en el propio almacén de Hive (hemos pasado de una tabla externa a una interna).

Igual que las creamos, las podemos eliminar:

drop table customers_new;
drop table customers2;

Si ejecutamos el comando !tables (o show tables en el cliente hive) veremos que ya no aparecen dichas tablas.

En el caso de que queramos eliminar una base de datos, de la misma manera que en SQL, ejecutaríamos el comando drop database iabd. Si nuestra base de datos contiene tablas y las queremos borrar igualmente, entonces necesitamos un borrado en cascada mediante drop database iabd cascade.

Caso 2: Insertando datos

Para insertar datos en las tablas de Hive podemos hacerlo de varias formas:

  • Cargando los datos mediante sentencias LOAD DATA.
  • Insertando los datos mediante sentencias INSERT.
  • Cargando los datos directamente mediante Sqoop o alguna herramienta similar.

Cargando datos

Para cargar datos se utiliza la sentencia LOAD DATA. Si quisiéramos volver a cargar los datos desde HDFS utilizaremos:

Datos eliminados

Al cargar los datos mediante LOAD DATA desde HDFS, los datos de origen se eliminan.

LOAD DATA INPATH '/user/iabd/hive/customer'
  overwrite into table customers;

Si en cambio vamos a cargar los datos desde un archivo local a nuestro sistema de archivos añadiremos la keyword LOCAL:

LOAD DATA LOCAL INPATH '/home/iabd/datos.csv'
  overwrite into table customers;

Insertando datos

Aunque podemos insertar datos de forma atómica (es decir, registro a registro mediante INSERT INTO [TABLE] ... VALUES), realmente las inserciones que se realizan en Hive se hacen a partir de los datos de otras tablas mediante el comando insert-select a modo de ETL:

INSERT INTO destino
  SELECT col1, col2 FROM fuente;
INSERT OVERWRITE destino
  SELECT col1, col2 FROM fuente;

Mediante la opción OVERWRITE, en cada ejecución se vacía la tabla y se vuelve a rellenar. Si no lo indicamos o utilizamos INTO, los datos se añadirían a los ya existentes.

Por ejemplo, vamos a crear una nueva tabla de clientes, pero la vamos a almacenar en formato Parquet:

CREATE TABLE customersp
(
  custId INT,
  fName STRING,
  lName STRING,
  city STRING
)
STORED AS PARQUET;

Y a continuación la cargamos con los datos de la tabla customers:

INSERT INTO customersp SELECT * FROM customers;

Rendimiento

Prueba a realizar una consulta que cuente la cantidad de clientes en cada una de las tablas ¿Cuál tarda menos? ¿Por qué?

Si necesitamos insertar datos en múltiples tablas a la vez lo haremos mediante el comando from-insert, ya que ofrece un mejor rendimiento al sólo necesitar un escaneado de los datos:

FROM fuente
INSERT OVERWRITE TABLE destino1
  SELECT col1, col2
INSERT OVERWRITE TABLE destino2
  SELECT col1, col3

Por ejemplo, vamos a crear un par de tablas con la misma estructura de clientes, pero para almacenar los clientes de determinadas ciudades:

CREATE TABLE customers_brooklyn LIKE customers;
CREATE TABLE customers_caguas LIKE customers;

Y a continuación rellenamos ambas tablas con sus clientes;

FROM customers
INSERT OVERWRITE TABLE customers_brooklyn
  SELECT custId, fName, lName, city WHERE city = "Brooklyn"
INSERT OVERWRITE TABLE customers_caguas
  SELECT custId, fName, lName, city WHERE city = "Caguas";

Modificando datos

Acabamos de añadir datos, y en teoría podemos realizar operaciones UPDATE y DELETE sobre las filas de una tabla.

HDFS no se diseñó pensando en las modificaciones de archivos, con lo que los cambios resultantes de las inserciones, modificaciones y borrados se almacenan en archivos delta. Por cada transacción, se crea un conjunto de archivo delta que altera la tabla (o partición). Los ficheros delta se fusionan periódicamente con los ficheros base de las tablas mediante jobs MapReduce que el metastore ejecuta en background.

Para poder modificar o borrar los datos, Hive necesita trabajar en un contexto transaccional, por lo que necesitamos activar las siguientes variables:

set hive.support.concurrency=true;
set hive.enforce.bucketing=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

Una vez configurado, hemos de crear las tablas con el formato ORC, organizar los datos mediante buckets (los estudiaremos más adelante en esta la siguiente sesión) y finalmente indicar mediante TBLPROPERTIES que se trata de una tabla transaccional . Por ejemplo, volvemos a crear la tabla de clientes:

CREATE TABLE customerstx
(
  custId INT,
  fName STRING,
  lName STRING,
  city STRING
)
CLUSTERED BY (custId) INTO 4 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');

Una vez creada la tabla, la vamos a cargar con los datos de los clientes:

INSERT INTO customerstx
  SELECT * FROM customers;

Un par de minutos después, con los datos datos ya cargados, ya podemos cambiar el nombre de la ciudad Caguas por Elche:

UPDATE customerstx SET city="Elche" WHERE city="Caguas";

Ingestando datos

Tal como vimos en la sesión anterior, podemos ingestar los datos en Hive haciendo uso de Sqoop (también lo podemos hacer con Flume o Nifi).

Por ejemplo, vamos a ingestar los datos de la tabla orders de la base de datos MariaDB que tenemos instalada en nuestra máquina virtual. En el comando de Sqoop le indicamos que dentro de Hive lo ingeste en la base de datos iabd y que cree una tabla llamada orders:

sqoop import --connect jdbc:mysql://localhost/retail_db \
    --username=iabd --password=iabd \
    --table=orders --driver=com.mysql.jdbc.Driver \
    --hive-import --hive-database iabd \
    --create-hive-table --hive-table orders

Una vez realizada la ingesta, podemos comprobar que los datos están dentro de Hive (en una tabla interna/gestionada):

hdfs dfs -ls /user/hive/warehouse/iabd.db/orders

Y si entramos a hive, podemos consultar sus datos:

select * from orders limit 10;

Extrayendo datos insertados

Combinando los comandos de HQL y HDFS podemos extraer datos a ficheros locales o remotos:

# Añadiendo contenido local
hive -e "use iabd; select * from customers" >> prueba1
# Sobrescribiendo contenido local
hive -e "use iabd; select * from customers" > prueba2
# Añadiendo contenido HDFS
hive -e "use iabd; select * from customers" | hdfs dfs --appendToFile /tmp/prueba3
# Sobrescribiendo contenido
hive -e "use iabd; select * from customers" | hdfs dfs --put -f /tmp/prueba4

Si indicamos la propiedad set hive.cli.print.header=true antes de la consulta, también nos mostrará el encabezado de las columnas. Esto puede ser útil si queremos generar un csv con el resultado de una consulta:

hive -e 'use iabd; set hive.cli.print.header=true; select * from customers' | \
  sed 's/[\t]/,/g' > fichero.csv

¿Y usar INSERT LOCAL?

Mediante INSERT LOCAL DIRECTORY podemos escribir el resultado de una consulta en nuestro sistema de archivos, fuera de HDFS. El problema es que si hay muchos datos creará múltiples ficheros y necesitaremos concatenarlos para tener un único resultado:

insert overwrite local directory '/home/iabd/datos'
  row format delimited
  fields terminated by ','
  select * from customers;

Caso 3: Consultas con join

En este caso de uso vamos a trabajar con los datos de clientes y pedidos que hemos cargado en los dos casos anteriores, tanto en customers como en orders.

Si queremos relacionar los datos de ambas tablas, tenemos que hacer un join entre la clave ajena de orders (order_customer_id) y la clave primaria de customers (custid):

hive> describe customers;
OK
custid                  int                                         
fname                   string                                      
lname                   string                                      
city                    string                                      
Time taken: 0.426 seconds, Fetched: 4 row(s)
hive> describe orders;
OK
order_id                int                                         
order_date              string                                      
order_customer_id       int                                         
order_status            string                                      
Time taken: 0.276 seconds, Fetched: 4 row(s)

Para ello, para obtener la ciudad de cada pedido podemos ejecutar la consulta:

select o.order_id, o.order_date, c.city
from orders o join customers c
  on (o.order_customer_id=c.custid);

Outer join

De la misma manera que en cualquier SGBD, podemos realizar un outer join, tanto left como right o full.

Por ejemplo, vamos a obtener para cada cliente, cuantos pedidos ha realizado:

select c.custid, count(order_id)
from customers c join orders o
  on (c.custid=o.order_customer_id)
group by c.custid
order by count(order_id) desc;

Si queremos que salgan todos los clientes, independientemente de que tengan pedidos, deberemos realizar un left outer join:

select c.custid, count(order_id)
from customers c left outer join orders o
  on (c.custid=o.order_customer_id)
group by c.custid
order by count(order_id) desc;

Semi-joins

Si quisiéramos obtener las ciudades de los clientes que han realizado pedidos podríamos realizar la siguiente consulta:

select distinct c.city from customers c
  where c.custid in (
    select order_customer_id
    from orders);

Mediante un semi-join podemos obtener el mismo resultado:

select distinct city
from customers c
left semi join orders o on
  (c.custid=o.order_customer_id)

Hay que tener en cuenta la restricción que las columnas de la tabla de la derecha sólo pueden aparecer en la clausula on, nunca en la expresión select.

Map joins

Consideramos la consulta inicial de join:

select o.order_id, o.order_date, c.city
from orders o join customers c
  on (o.order_customer_id=c.custid);

Si una tabla es suficientemente pequeña para caber en memoria, tal como nos ocurre con nuestros datos, Hive puede cargarla en memoria para realizar el join en cada uno de los mappers. Esto se conoce como un map join.

El job que ejecuta la consulta no tiene reducers, con lo que esta consulta no funcionará para un right o right outer join, ya que la ausencias de coincidencias sólo se puede detectar en los pasos de agregación (reduce).

En el caso de utilizar map joins con tablas organizadas en buckets, la sintaxis es la misma, sólo siendo necesario activarlo mediante la propiedad hive.optimize.bucketmapjoin:

SET hive.optimize.bucketmapjoin=true;

Comandos

Mediante los casos de uso realizados hasta ahora, hemos podido observar cómo para interactuar con Hive se utilizan comandos similares a SQL.

Es conveniente consultar la siguiente cheatsheet: http://hortonworks.com/wp-content/uploads/2016/05/Hortonworks.CheatSheet.SQLtoHive.pdf

Además Hive viene con un conjunto de funciones predefinidas para tratamiento de cadenas, fechas, funciones estadísticas, condicionales, etc... las cuales puedes consultar en la documentación oficial.

Mediante el comando show functions podemos obtener una lista de las funciones. Si queremos más información sobre una determinada función utilizaremos el comando describe function nombreFuncion:

hive> describe function length;
length(str | binary) - Returns the length of str or number of bytes in binary data

Caso 4: Tabla interna

Hive permite crear tablas de dos tipos:

  • tabla interna o gestionada (managed): gestiona la estructura y el almacenamiento de los datos. Para ello, si no le indicamos lo contrario, crea los datos en HDFS dentro de una carpeta propia, normalmente /user/hive/warehouse. Al borrar la tabla de Hive, se borra la información de HDFS.
  • tabla externa: Hive define la estructura de los datos en el metastore, pero los datos residen HDFS, ya sea que estaban previamente o los vamos a cargar a posteriori. Al borrar la tabla de Hive, no se eliminan los datos de HDFS. Se emplea cuando compartimos datos almacenados en HDFS entre diferentes herramientas.

Ejemplos realizados

Todos los ejemplos realizados hasta ahora, aunque hayamos indicado una ruta externa HDFS mediante la keyword LOCATION son tablas internas. Por tanto, si eliminamos dichas tablas mediante instrucciones DROP TABLE, también eliminaremos los datos en HDFS.

En este caso de uso, vamos a centrarnos en una tabla interna.

Supongamos el siguiente fichero con datos de empleados:

empleados.txt
Michael|Montreal,Toronto|Male,30|DB:80|Product:DeveloperLead
Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead
Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect
Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead

Podemos observar como se utiliza | como separador de campos. Analizando los datos, vemos que tenemos los siguientes campos:

  • Nombre
  • Centros de trabajo (array con las ciudades)
  • Sexo y edad
  • Destreza y puntuación
  • Departamento y cargo

Creamos la siguiente tabla interna en Hive mediante el siguiente comando:

CREATE TABLE IF NOT EXISTS empleados_interna
(
  name string,
  work_place ARRAY<string>,
  sex_age STRUCT<sex:string,age:int>,
  skills_score MAP<string,int>,
  depart_title MAP<string,ARRAY<string>>
) COMMENT 'Esto es una tabla interna'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':';

La sintaxis es muy similar a SQL, destacando las siguientes opciones:

  • ROW FORMAT DELIMITED: cada registro ocupa una línea
  • FIELDS TERMINATED BY '|': define el | como separador de campos
  • COLLECTION ITEMS TERMINATED BY ',': define la coma como separador de los arrays / estructuras
  • MAP KEYS TERMINATED BY ':': define los dos puntos como separador utilizado en los mapas.

Si queremos comprobar la estructura de la tabla mediante el comando show create table empleados_interna veremos las opciones que hemos indicado:

+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `empleados_interna`(                  |
|   `name` string,                                   |
|   `work_place` array<string>,                      |
|   `sex_age` struct<sex:string,age:int>,            |
|   `skills_score` map<string,int>,                  |
|   `depart_title` map<string,array<string>>)        |
| COMMENT 'Esto es una tabla interna'                |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://iabd-virtualbox:9000/user/hive/warehouse/iabd.db/empleados_interna' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1647432129')            |
+----------------------------------------------------+

Formato de las tablas

Si queremos indicarle el formato a las tablas, mediante STORED AS podemos indicar formatos:

  • basados en filas, como TEXTFILE, JSONFILE, AVRO o en formato de secuencia mediante SEQUENCEFILE.
  • basados en columnas, como PARQUET o ORC.

A continuación, vamos a cargar los datos del fichero empleados.txt, el cual colocaremos en nuestra carpeta de Descargas:

LOAD DATA LOCAL INPATH '/home/iabd/Descargas/empleados.txt' OVERWRITE INTO TABLE empleados_interna;

Comprobamos que los datos se han cargado correctamente:

select * from empleados_interna;

Y obtenemos:

+-------------------------+-------------------------------+----------------------------+---------------------------------+----------------------------------------+
| empleados_interna.name  | empleados_interna.work_place  | empleados_interna.sex_age  | empleados_interna.skills_score  |     empleados_interna.depart_title     |
+-------------------------+-------------------------------+----------------------------+---------------------------------+----------------------------------------+
| Michael                 | ["Montreal","Toronto"]        | {"sex":"Male","age":30}    | {"DB":80}                       | {"Product":["Developer","Lead"]}       |
| Will                    | ["Montreal"]                  | {"sex":"Male","age":35}    | {"Perl":85}                     | {"Product":["Lead"],"Test":["Lead"]}   |
| Shelley                 | ["New York"]                  | {"sex":"Female","age":27}  | {"Python":80}                   | {"Test":["Lead"],"COE":["Architect"]}  |
| Lucy                    | ["Vancouver"]                 | {"sex":"Female","age":57}  | {"Sales":89,"HR":94}            | {"Sales":["Lead"]}                     |
+-------------------------+-------------------------------+----------------------------+---------------------------------+----------------------------------------+

Y si nos abrimos otra pestaña, mediante HDFS, comprobamos que tenemos los datos:

hdfs dfs -ls /user/hive/warehouse/curso.db/empleados_interna
hdfs dfs -cat /user/hive/warehouse/curso.db/empleados_interna/empleados.txt

Y obtenemos:

Michael|Montreal,Toronto|Male,30|DB:80|Product:DeveloperLead
Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead
Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect
Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead

Consultando datos compuestos

Si nos fijamos bien en la tabla, tenemos tres columnas con diferentes datos compuestos.

  • work_place, con un ARRAY<string>
  • sex_age con una STRUCT<sex:string,age:int>
  • skills_score con un MAP<string,int>,
  • depart_title con un MAP<STRING,ARRAY<STRING>>

Si queremos obtener los datos del array, podemos realizar las siguientes consultas:

Todos los lugares de trabajo:

select name, work_place from empleados_interna;

Resultado:

+----------+-------------------------+
|   name   |       work_place        |
+----------+-------------------------+
| Michael  | ["Montreal","Toronto"]  |
| Will     | ["Montreal"]            |
| Shelley  | ["New York"]            |
| Lucy     | ["Vancouver"]           |
+----------+-------------------------+

Utilizamos la notación de array para mostrar los dos primeros puestos de trabajo:

select work_place[0] as lugar1, work_place[1] as lugar2
from empleados_interna;

Resultado:

+------------+----------+
|   lugar1   |  lugar2  |
+------------+----------+
| Montreal   | Toronto  |
| Montreal   | NULL     |
| New York   | NULL     |
| Vancouver  | NULL     |
+------------+----------+

Mediante la función size obtenemos la cantidad de lugares de trabajo:

select size(work_place) as cantLugares
from empleados_interna;

Resultado:

+--------------+
| cantlugares  |
+--------------+
| 2            |
| 1            |
| 1            |
| 1            |
+--------------+

Para obtener los empleados y lugares de trabajo mostrados uno por fila, hemos de realizar un explode sobre el array:

select name, lugar
from empleados_interna
lateral view explode (work_place) e2 as lugar;

Para ello hemos creado una vista lateral y con la función explode desenrollamos el array. Resultado:

+----------+------------+
|   name   |   lugar    |
+----------+------------+
| Michael  | Montreal   |
| Michael  | Toronto    |
| Will     | Montreal   |
| Shelley  | New York   |
| Lucy     | Vancouver  |
+----------+------------+

En el caso de la estructura con el sexo y la edad podemos realizar las siguientes consultas

Todas las estructuras de sexo/edad:

select sex_age from empleados_interna;

Resultado:

+----------------------------+
|          sex_age           |
+----------------------------+
| {"sex":"Male","age":30}    |
| {"sex":"Male","age":35}    |
| {"sex":"Female","age":27}  |
| {"sex":"Female","age":57}  |
+----------------------------+

Utilizando la notación . podemos obtener el sexo y edad por separado

select sex_age.sex as sexo, sex_age.age as edad
from empleados_interna;

Resultado:

+---------+-------+
|  sexo   | edad  |
+---------+-------+
| Male    | 30    |
| Male    | 35    |
| Female  | 27    |
| Female  | 57    |
+---------+-------+

Respecto al mapa con las habilidades y sus puntuaciones:

Todas las habilidades como un mapa:

select skills_score from empleados_interna;

Resultado:

+-----------------------+
|     skills_score      |
+-----------------------+
| {"DB":80}             |
| {"Perl":85}           |
| {"Python":80}         |
| {"Sales":89,"HR":94}  |
+-----------------------+

Utilizando la notación array con claves, obtenemos el nombre y puntuación de las habilidades:

select name, skills_score["DB"] as db,
  skills_score["Perl"] as perl,
  skills_score["Python"] as python,
  skills_score["Sales"] as ventas,
  skills_score["HR"] as hr
from empleados_interna;

Resultado:

+----------+-------+-------+---------+---------+-------+
|   name   |  db   | perl  | python  | ventas  |  hr   |
+----------+-------+-------+---------+---------+-------+
| Michael  | 80    | NULL  | NULL    | NULL    | NULL  |
| Will     | NULL  | 85    | NULL    | NULL    | NULL  |
| Shelley  | NULL  | NULL  | 80      | NULL    | NULL  |
| Lucy     | NULL  | NULL  | NULL    | 89      | 94    |
+----------+-------+-------+---------+---------+-------+

Claves y valores de las habilidades:

select name, map_keys(skills_score) as claves,
  map_values(skills_score) as valores
from empleados_interna;

Resultado:

+----------+-----------------+----------+
|   name   |     claves      | valores  |
+----------+-----------------+----------+
| Michael  | ["DB"]          | [80]     |
| Will     | ["Perl"]        | [85]     |
| Shelley  | ["Python"]      | [80]     |
| Lucy     | ["Sales","HR"]  | [89,94]  |
+----------+-----------------+----------+

Y finalmente, con el mapa de departamentos que contiene un array:

Toda la información sobre los departamentos:

select depart_title from empleados_interna;

Resultado:

+----------------------------------------+
|              depart_title              |
+----------------------------------------+
| {"Product":["Developer","Lead"]}       |
| {"Product":["Lead"],"Test":["Lead"]}   |
| {"Test":["Lead"],"COE":["Architect"]}  |
| {"Sales":["Lead"]}                     |
+----------------------------------------+

Nombre y puntuación de las habilidades:

select name, depart_title["Product"] as product,
  depart_title["Test"] as test,
  depart_title["COE"] as coe,
  depart_title["Sales"] as sales
from empleados_interna;

Resultado:

+----------+-----------------------+-----------+----------------+-----------+
|   name   |        product        |   test    |      coe       |   sales   |
+----------+-----------------------+-----------+----------------+-----------+
| Michael  | ["Developer","Lead"]  | NULL      | NULL           | NULL      |
| Will     | ["Lead"]              | ["Lead"]  | NULL           | NULL      |
| Shelley  | NULL                  | ["Lead"]  | ["Architect"]  | NULL      |
| Lucy     | NULL                  | NULL      | NULL           | ["Lead"]  |
+----------+-----------------------+-----------+----------------+-----------+

Primera habilidad de producto y pruebas de cada empleado:

select name, depart_title["Product"][0] as product0,
  depart_title["Test"][0] as test0
from empleados_interna;

Resultado:

+----------+-------------+--------+
|   name   |  product0   | test0  |
+----------+-------------+--------+
| Michael  | Developer   | NULL   |
| Will     | Lead        | Lead   |
| Shelley  | NULL        | Lead   |
| Lucy     | NULL        | NULL   |
+----------+-------------+--------+

Caso 5: Tabla externa

En este caso de uso vamos a repetir la misma estructura de la tabla del caso anterior, pero en esta ocasión en una tabla externa. De esta manera, al borrar la tabla de Hive, no se borra la información de HDFS.

Para ello, únicamente hemos de añadir la palabra EXTERNAL a la instrucción CREATE TABLE y la clausula LOCATION para indicar la ruta de HDFS donde se encuentran los datos:

CREATE EXTERNAL TABLE IF NOT EXISTS empleados_externa
(
  name string,
  work_place ARRAY<string>,
  sex_age STRUCT<sex:string,age:int>,
  skills_score MAP<string,int>,
  depart_title MAP<STRING,ARRAY<STRING>>
) COMMENT "Esto es una tabla externa"
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LOCATION "/user/iabd/hive/empleados_externa";

Realizamos la misma carga que en el caso anterior:

LOAD DATA LOCAL INPATH '/home/iabd/Descargas/empleados.txt' OVERWRITE INTO TABLE empleados_externa;

Si hacemos una consulta sobre la tabla para ver que están todos los campos obtendremos la misma información que antes:

SELECT * FROM empleados_externa;

Interna o externa

Como normal general, si todo nuestro procesamiento lo hacemos mediante Hive, es más cómodo utilizar tablas internas. Si no es así, y otras herramientas acceden al mismo dataset, es mejor utilizar tablas externas.

Un patrón de uso muy común es utilizar una tabla externa para acceder al dataset inicial almacenado en HDFS (creado por otro proceso), y posteriormente crear una transformación en Hive para mover los datos a una tabla interna.

Referencias

Actividades

(RASBD.3 / CESBD.3b y CESBD.3d) A partir de los archivos trabajados en la sesión sobre Pentaho, vamos a crear diversas tablas y consultas en Hive. Para las siguientes actividades, se pide adjuntar tanto los scripts como comandos empleados, así como una captura de pantalla donde se vea la ejecución de las diferentes instrucciones.

  1. Carga los archivos pdi_sales, pdi_product y pdi_manufacturer en HDFS en las rutas /user/iabd/pdi/sales, /user/iabd/pdi/product y /user/iabd/pdi/manufacturer respectivamente.
  2. Crea la base de datos hiveiabd sobre la cual realizaremos los siguientes pasos.
  3. (1p) Crea una tabla externa para cada uno de los archivos y llámalas sales, product y manufacturer respectivamente con formato de TEXTFILE teniendo en cuenta los separadores empleados en cada archivo (unos utilizan el punto y coma ; y otros la coma ,), la ruta de HDFS que hemos creado anteriormente y los tipos de datos necesarios (por simplicidad, el campo con la fecha de la venta gestiónala como STRING y renombra el campo, ya que Date es una palabra clave y no puede ser el nombre de una columna). Finalmente, realiza una consulta sobre cada tabla para comprobar que contiene los datos esperados.

    Para que no coja la primera línea que contiene el nombre de las columnas, añade la propiedad skip.header.line.count en la creación de cada tabla;

    CREATE ...
    ...
    TBLPROPERTIES ("skip.header.line.count"="1");
    
  4. (1.5p) Crea una tabla transaccional llamada productx en formato ORC con la misma estructura que la tabla de product.

    1. Cárgala con los datos de la tabla product y comprueba cuantos registros tiene.
    2. Elimina todos los registros de la tabla productx cuya categoría sea Mix y comprueba cuanto registros tiene ahora.
    3. Recupera todos los registros de la tabla productx de la categoría Urban y guarda el resultado en HDFS en la carpeta /user/iabd/pdi/productx utilizando la coma como separador de campos.
    4. ¿En qué ruta de HDFS se almacenan los datos de las tablas internas? ¿Y en concreto de la tabla productx? ¿Cómo lo has averiguado?
  5. (0.5p) Crea una tabla interna llamada productos con formato Parquet que contengan todas las columnas de las tablas product y manufacturer (sin repetidos) y rellénala con el resultado de realizar un join entre las tablas product y manufacturer.
  6. (1p) Crea una tabla externa llamada productext con la misma estructura que la tabla product pero cuyos datos referencien /user/iabd/pdi/productext.
    1. Ejecuta una consulta y comprueba que la tabla está vacía.
    2. Copia el archivo de /user/iabd/pdi/productx a /user/iabd/pdi/productext y vuelve a realizar la consulta.
    3. Elimina la tabla productext y comprueba si los datos siguen estando en HDFS en /user/iabd/pdi/productext.