Delta Lake
En la sesión de Data Lakes ya estudiamos qué ofrece un data lake y sus diferencias respecto a los data warehouses, así como una pequeña introducción al concepto de data lakehouse.
Delta Lake es un producto que ofrece transaccionalidad y soporte para upserts y merges en data lakes manteniendo una escalabilidad horizontal y ofreciendo la separación del almacenamiento y la computación que necesita el big data. Así pues, podemos considerar a Delta Lake como una implementación del concepto de data lakehouse, el cual combina lo mejor de los data warehouses y data lakes, ofreciendo transacciones ACID, gestión escalable de los metadatos, un modelo unificado para procesar datos tanto por lotes como en streaming, histórico auditable y soporte para sentencias DML sobre los datos.
Podemos ejecutarlo sobre data lakes ya existentes y es completamente compatible con varios motores de procesamiento como es el caso de Apache Spark, y de ahí, el motivo de estudiarlo en esta sesión.
Data Lakehouse por dentro
El almacenamiento de un data lakehouse se cede a servicios de almacenamiento de objetos, los cuales son muy económicos, como son Amazon S3 o Azure ADLs, almacenando los datos en formatos abiertos como Apache Parquet.
Sin embargo, para ser un data lakehouse, necesitamos soporte para transacciones ACID. Para ello, debemos tener una capa de metadatos transaccionales sobre el almacenamiento cloud, que defina qué objetos forman parte de qué versión de tabla.
Para conseguir un gran rendimiento en las consultas SQL es necesario ofrecer servicios de caché, estructuras de datos auxiliares como índices y estadísticas para poder optimizar la capa de datos.
La herramienta final es el desarrollo de un API estándar, como es la DataFrame API, la cual soportan herramientas como TensorFlow o Spark MLlib, la cual permite, de forma declarativa, la construcción de un grafo DAG con su ejecución.
Otros productos alternativos como implementación del concepto de data lakehouse son Apache Iceberg y Apache Hudi.
Formalmente, podemos decir que Delta Lake ofrece una capa de metadatos, caché e indexación sobre el almacenamiento de un data lake, de manera que ofrece un nivel de abstracción con soporte para transacciones ACID y versionado de los datos.
Se trata de un proyecto open-source desde que en 2019 Databricks lo liberó. Por supuesto, Databricks ofrece soporte completo de Delta Lake como capa de persistencia de datos.
Características¶
Delta Lake ofrece las siguientes características:
- Transacciones ACID. Todas las transacciones realizadas con Spark se realizan de manera durable y se exponen a otros consumidores de forma atómica, gracias al Delta Transaction Log.
- Soporte completo de DML, pudiendo realizar borrados y modificados, pero también fusiones complejas de datos o escenarios upserts, lo que simplifica la creación de dimensiones y tablas de hechos al construir un MDW (modern data warehouse), así como cumplir la GDPR respecto a la modificación y/o borrado de datos.
- Time travel. El fichero de log de transacciones de Delta Lake guarda cada cambio realizado sobre los datos en el orden en el que se han realizado. Este log se convierte en un herramienta de auditoria completa, lo que facilita que administradores y desarrolladores puedan revertir a una versión anterior de los datos, y asea para auditorías, rollbacks o la realización de pruebas. Esta característica se conoce como Time Travel.
- Unificación del procesamiento batch y streaming en un único modelo, ya que puede realizar merges de los flujos de datos (requisito muy común al trabajar con IoT).
- Evolución y aplicación de esquemas, al provocar el cumplimiento de un esquema a la hora de leer o escribir datos desde el lago, permitiendo una evolución segura del esquema para casos de uso donde los datos necesitan evolucionar.
- Soporte de metadatos ricos y escalables, ya que los metadatos pueden crecer y convertirse en big data y no escalar correctamente, de manera que Delta Lake facilita el escalado y procesamiento eficiente mediante Spark pudiendo manejar petabytes de datos.
Arquitectura Medallion¶
La arquitectura Medallion es un patrón de diseño de datos que se utiliza para organizar los datos en un lakehouse, con el objetivo de mejorar progresivamente la estructura y calidad de los datos conforme fluyen a través de las diferentes capas de la arquitectura (de la capa raw/bronce a la plata, y de ahí a la oro.)
Conforme los datos transicionan de la capa bronce y plata a la de oro (cuanto más evolucionan, más valen, y de ahí su material precioso) obtenemos datos más precisos. Cuando realizamos la ingesta de datos mediante procesos batch o en streaming los almacenamos en la capa de bronce en su formato crudo (raw). Tras limpiarlos, normalizarlos y realizar el procesado necesario para realizar nuestras consultas, los volvemos a almacenar en la capa de plata (curated). Finalmente, en la capa de oro almacenamos los datos agregados, con las tablas de sumario que contienen los KPI o las tablas necesarias para la visualización de los datos por parte de las herramientas de negocio como PowerBI o Tableau.
Para este flujo de datos entre capas, Databricks ofrece las tablas Delta Live y el uso de pipelines (esta opción no está habilitada en la versión educativa y no la vamos a poder probar). Tenéis un ejemplo completo en Getting Started with Delta Live Tables.
Arquitectura de un Lakehouse¶
El uso de la arquitectura que propone Delta Lake permite el procesamiento simultáneos de los datos batch y en streaming, de manera que podemos tener escribir los datos batch y los flujos en streaming en la misma tabla, y a su vez, se escriben de manera progresiva en otras tablas más limpias y refinadas.
La arquitectura de un lakehouse se compone de tres capas, y en nuestro caso, se concreta en:
- la capa de almacenamiento, por ejemplo, sobre S3.
- la capa transaccional, que la implementa Delta Lake.
- la capa de procesamiento, que la aporta Spark.
El ecosistema Delta¶
Delta Lake se utiliza en su mayor medida como lakehouse por más de 7000 empresas, procesando exabytes de datos por día.
Sin embargo, los data warehouses y las aplicaciones de machine learning no son el único objetivo de Delta Lake, ya que el soporte transaccional ACID aporta confiabilidad al data lake y permite ingestar y consumir datos tanto en streaming como por lotes.
El ecosistema de DeltaLake se compone de una conjunto de componentes individuales entre los que destacan Delta Lake Storage, Delta Sharing, y Delta Connectors.
Delta Lake Storage¶
Se trata de una capa de almacenamiento que corre sobre los lagos de datos basados en el cloud, como son Azure Data Lake Storage (ADLS), AWS S3 o Google Cloud Storage (GCS), añadiendo transaccionalidad al lago de datos y las tablas, y por tanto, ofreciendo características de un data warehouse a un data lake.
Se trata del componente principal, ya que el resto de elementos del ecosistema dependen de él.
Delta Sharing¶
Todo lago de datos va a tener que compartir sus datos en algún momento, lo que requiere una solución segura y confiable que nos asegure la privacidad deseada en los datos.
Delta Sharing es un protocolo para compartir datos seguros para grandes conjuntos de datos almacenados en el data lake, de manera que podemos compartir los datos almacenados en S3 o ADLS y acceder mediante Spark o PowerBI sin necesidad de desplegar ningún componente adicional, facilitando compartir los datos incluso entre diferentes proveedores cloud sin necesidad de ningún desarrollo.
Por ejemplo, podemos:
- Procesar en AWS mediante Spark datos que están almacenados en Azure ADLS.
- Procesar en Google mediante Rust datos que están almacenados en S3.
Más información sobre el ecosistema de Delta Sharing en la página de Sharing de Delta Lake.
Delta Connectors¶
El principal objetivo de los conectores Delta es llevar Delta Lake a otros motores de procesamiento ajenos a Spark. Para ello, ofrece conectores open source que realizan la conexión directa a DeltaLake sin necesidad de pasar por Spark.
Los conectores más destacados son:
- Delta Standalone: librerías Java/Python/Rust/etc... para desarrollar aplicaciones que leen y escriben en Delta Lake. En el apartado de Delta Lake y Python veremos un ejemplo del acceso mediante Python.
- Hive. Lectura de datos desde Apache Hive.
- Flink. Lectura y escritura de datos desde Apache Flink.
- Sql-delta-import. Permite importar datos desde cualquier fuente de datos JDBC.
- Power BI. Función de Power Query que permite la lectura de una tabla Delta desde cualquier fuente de datos soportado por Power BI.
Más información sobre los conectores existentes, donde cada día hay más, en la página de Integrations de Delta Lake.
Hola Delta¶
Probando Delta Lake
Para poder realizar los ejemplos y practicar DeltaLake, en esta sesión nos vamos a centrar en la instalación de nuestra máquina virtual o mediante la cuenta creada previamente en DataBricks.
Si quieres probar con Docker, puedes descargar la imagen deltaio/delta-docker
desde https://hub.docker.com/r/deltaio/delta-docker o arrancar el contenedor mediante:
docker run --name deltalake -p 8888-8889:8888-8889 deltaio/delta-docker:latest
docker run --name deltalake -p 8888-8889:8888-8889 deltaio/delta-docker:latest_arm64
Si nos centramos en nuestra instalación de la máquina virtual, cuando lanzamos pyspark
tenemos que indicarle que vamos a utilizar delta
mediante la opción packages
:
pyspark --packages io.delta:delta-core_2.12:2.1.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Para facilitar su uso, en nuestro máquina virtual hemos creado el alias pysparkdl
:
pysparkdl
Persistiendo en Delta Lake¶
Si partimos de los datos que creamos en la sesión anterior en el DataFrame de clientes, podemos persistirlos en Delta indicando su formato mediante format("delta")
:
spark.sql("use s8a")
df = spark.table("clientes")
# DeltaLake en Local
df.write.format("delta").save("/tmp/raw/clientes")
# DeltaLake en DataBricks
df.write.format("delta").save("/delta/raw/clientes")
# DeltaLake en HDFS
df.write.format("delta").save("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
# Guardando una tabla
df.write.format("delta").saveAsTable("clientes_delta")
# Guardando una tabla
df.write.format("delta").save("/delta/raw/clientes_local").saveAsTable("clientes_delta_local")
¿Dónde se almacenan las tablas que no indicamos la ruta?
Dependerá de la ruta del metastore que utilice Spark, la cual configuramos en en la sesión anterior en la propiedad spark.sql.warehouse.dir
. Por ejemplo, en nuestra máquina virtual podemos poner /opt/spark-3.3.1/warehouse/
para almacenamiento local o hdfs://iabd-virtualbox:9000/user/hive/warehouse/
si queremos almacenar los metadatos en HDFS.
Si intentamos volver a escribir los datos en la misma ruta, obtendremos un error. Si queremos sobrescribir los datos, necesitamos indicarle el modo overwrite
:
df.write.format("delta").mode("overwrite").save("/tmp/raw/clientes")
Particionando los datos
Para particionar los datos utilizaremos partitionBy
para indicar el campo que hace de clave:
df.write.format("delta").mode("overwrite").partitionBy("customer_city").save("/tmp/raw/clientes-particionados")
De manera que creará una carpeta para cada ciudad:
ls -l /tmp/raw/clientes-particionados/
total 2252
drwxr-xr-x 2 iabd iabd 4096 feb 19 13:11 'customer_city=Aguadilla'
drwxr-xr-x 2 iabd iabd 4096 feb 19 13:11 'customer_city=Alameda'
drwxr-xr-x 2 iabd iabd 4096 feb 19 13:11 'customer_city=Albany'
drwxr-xr-x 2 iabd iabd 4096 feb 19 13:11 'customer_city=Albuquerque'
...
Cargando desde Delta Lake¶
Para recuperar los datos, realizamos una lectura indicando siempre el formato delta'
:
# DeltaLake el Local
dfdeltal = spark.read.format("delta").load("/tmp/raw/clientes")
dfdeltahdfs = spark.read.format("delta").load("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
# Uso de tablas
tabla_delta = spark.read.format("delta").table("clientes_delta")
Delta Lake por dentro¶
Si accedemos al sistema de archivos local, HDFS o Databricks DBFS, podemos analizar la estructura de archivos que ha utilizado para almacenar la información.
Delta Lake almacena los datos en formato Parquet en la ruta indicada (y si hubiéramos indicando particiones, en sus subcarpetas), y luego crea una carpeta denominada _delta_log
donde almacena el DeltaLog o log transaccional en formato JSON, en el cual va registrando los cambios delta que se realizan sobre los datos.
Vamos a comprobar qué datos se han almacenado en HDFS:
iabd@iabd-virtualbox:~/datos$ hdfs dfs -ls -R /user/iabd/delta/raw/clientes
drwxr-xr-x - iabd supergroup 0 2023-01-29 12:23 /user/iabd/delta/raw/clientes/_delta_log
-rw-r--r-- 1 iabd supergroup 2605 2023-01-29 12:23 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000000.json
-rw-r--r-- 3 iabd supergroup 251875 2023-01-29 12:23 /user/iabd/delta/raw/clientes/part-00000-05cb7b9c-c529-4f5e-83ab-0dc79d0422bf-c000.snappy.parquet
Si realizamos otra operación, por ejemplo, sobrescribimos la tabla, generará nuevos datos y otro fichero de log:
df.write.format("delta").mode("overwrite").save("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
Lo comprobamos volviendo a listar los archivos almacenados:
iabd@iabd-virtualbox:~/datos$ hdfs dfs -ls -R /user/iabd/delta/raw/clientes
drwxr-xr-x - iabd supergroup 0 2023-01-29 12:29 /user/iabd/delta/raw/clientes/_delta_log
-rw-r--r-- 1 iabd supergroup 2605 2023-01-29 12:23 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000000.json
-rw-r--r-- 1 iabd supergroup 1592 2023-01-29 12:29 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000001.json
-rw-r--r-- 3 iabd supergroup 251875 2023-01-29 12:23 /user/iabd/delta/raw/clientes/part-00000-05cb7b9c-c529-4f5e-83ab-0dc79d0422bf-c000.snappy.parquet
-rw-r--r-- 3 iabd supergroup 251875 2023-01-29 12:29 /user/iabd/delta/raw/clientes/part-00000-1f226209-881f-4ff7-af04-6eedd64e1581-c000.snappy.parquet
Ahora vamos a añadir nuevos datos, utilizando un nuevo dataframe e indicando el modo de escritura con append
:
cols = ['customer_id', 'customer_fname', 'customer_lname']
datos = [
(88888, "Aitor", "Medrano"),
(99999, "Pedro", "Casas")
]
nuevosClientes = spark.createDataFrame(datos, cols)
# cambiamos el tipo a int pq por defecto le asigna long
nuevosClientes = nuevosClientes.withColumn("customer_id", nuevosClientes.customer_id.cast("int"))
nuevosClientes.write.format("delta").mode("append").save("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
Fusionando el esquema
Si los tipos de los datos no cuadran con el esquema almacenado en DeltaLake, tendremos un error. Para habilitar que fusione los esquemas podemos indicarlo con .option("mergeSchema", "true")
:
nuevosClientes.write.format("delta").mode("append").option("mergeSchema", "true").save("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
Ahora podemos ver como ha creado un nuevo archivo de log pero dos archivos de datos. En concreto, el archivo de log ha registrado las dos inserciones, y cada archivo de Parquet contiene únicamente cada uno de los registros:
iabd@iabd-virtualbox:~/datos$ hdfs dfs -ls -R /user/iabd/delta/raw/clientes
drwxr-xr-x - iabd supergroup 0 2023-01-29 12:39 /user/iabd/delta/raw/clientes/_delta_log
-rw-r--r-- 1 iabd supergroup 2605 2023-01-29 12:23 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000000.json
-rw-r--r-- 1 iabd supergroup 1592 2023-01-29 12:29 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000001.json
-rw-r--r-- 1 iabd supergroup 1309 2023-01-29 12:39 /user/iabd/delta/raw/clientes/_delta_log/00000000000000000002.json
-rw-r--r-- 3 iabd supergroup 251875 2023-01-29 12:23 /user/iabd/delta/raw/clientes/part-00000-05cb7b9c-c529-4f5e-83ab-0dc79d0422bf-c000.snappy.parquet
-rw-r--r-- 3 iabd supergroup 251875 2023-01-29 12:29 /user/iabd/delta/raw/clientes/part-00000-1f226209-881f-4ff7-af04-6eedd64e1581-c000.snappy.parquet
-rw-r--r-- 3 iabd supergroup 1035 2023-01-29 12:39 /user/iabd/delta/raw/clientes/part-00000-b7124036-ce26-4b95-a759-080e829b8de6-c000.snappy.parquet
-rw-r--r-- 3 iabd supergroup 1020 2023-01-29 12:39 /user/iabd/delta/raw/clientes/part-00001-6164f53a-ca34-4b73-96ef-8d2f2cfbbb47-c000.snappy.parquet
Por ejemplo, si descargamos y visualizamos uno de los archivos de Parquet veremos sus datos:
iabd@iabd-virtualbox:~/Descargas$ parquet-tools show part-00000-b7124036-ce26-4b95-a759-080e829b8de6-c000.snappy.parquet
+---------------+------------------+------------------+
| customer_id | customer_fname | customer_lname |
|---------------+------------------+------------------|
| 88888 | Aitor | Medrano |
+---------------+------------------+------------------+
Finalmente, si queremos comprobar los datos, a partir del dataframe que habíamos leído desde HDFS, podemos comprobar como los datos ya aparecen:
dfdeltahdfs.sort("customer_id", ascending=False).show(3)
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
# |customer_id|customer_fname|customer_lname|customer_email|customer_password| customer_street|customer_city|customer_state|customer_zipcode|
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
# | 99999| Pedro| Casas| null| null| null| null| null| null|
# | 88888| Aitor| Medrano| null| null| null| null| null| null|
# | 12435| Laura| Horton| XXXXXXXXX| XXXXXXXXX|5736 Honey Downs| Summerville| SC| 29483|
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
# only showing top 3 rows
Optimizando las tablas
Tras realizar muchos cambios en una tabla, es muy probable que tengamos muchos archivos pequeños. Para mejorar la velocidad de las consultas de lectura, podemos utilizar OPTIMIZE
para unir los ficheros pequeños en más grandes:
OPTIMIZE clientes_delta
Trabajando con tablas Delta¶
Para trabajar con tablas Delta, aunque podemos realizar todas las operaciones mediante SQL, Delta Lake ofrece un API para realizar modificaciones condicionales, borrados o upserts de datos en las tablas.
Para ello, el primer paso obtener una tabla delta mediante el método DeltaTable.forPath
:
from delta.tables import *
dtabla = DeltaTable.forPath(spark, "hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
dtablaDatabricks = DeltaTable.forPath(spark, "dbfs:/user/hive/warehouse/clientes_delta")
Una vez tenemos la tabla, ya podemos, por ejemplo, modificar las ciudades haciendo uso del método update(condición, valor)
:
dtabla.update("customer_city = 'Bruklyn'", {"customer_city": "'Brooklyn'"})
O borrar los clientes de Brownsville
, mediante el método delete(condición)
:
dtabla.delete("customer_city = 'Brownsville'"})
# borramos los últimos clientes insertados
dtabla.delete("customer_id > 33333")
Si queremos realizar operaciones sobre DataFrames, convertiremos la tabla mediante toDF()
:
df_tabla = dtabla.toDF()
df_tabla.sort("customer_id", ascending=False).show(3)
Fusionando datos¶
Si queremos fusionar un conjunto de actualizaciones e inserciones en una tabla Delta existente, podemos emplear la sentencia MERGE INTO
, que recoge los datos de una tabla origen y los fusiona con la tabla Delta destino.
Por ejemplo, vamos a crear una vista con los datos que queremos fusionar, de manera que los que tengan el mismo id serán actualizaciones, y los que no, inserciones:
CREATE OR REPLACE TEMP VIEW clientes_updates (
customer_id, customer_fname, customer_lname
) AS VALUES
(88888, 'Andreu', 'Medrano'),
(77777, 'Marina', 'Medrano'),
(77778, 'Javier', 'Hernández'),
(77779, 'Paola', 'Girona');
clientes_delta
(customer_email
, etc...) y por defecto, nos dará error. Para que fusione los esquemas, con Python utilizaríamos .option("mergeSchema", "true")
, pero mediante Spark SQL, necesitamos configurar la siguiente propiedad:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled","true")
A continuación, realizamos la operación de MERGE INTO
:
MERGE INTO clientes_delta
USING clientes_updates
ON clientes_delta.customer_id = clientes_updates.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- +-----------------+----------------+----------------+-----------------+
-- |num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
-- +-----------------+----------------+----------------+-----------------+
-- | 4| 1| 0| 3|
-- +-----------------+----------------+----------------+-----------------+
Viajando en el tiempo¶
Podemos realizar consultas sobre snapshots de nuestras tablas Delta mediante el time travel. DeltaLake permite viajar en el tiempo a versiones previas de las tablas a partir de una versión o timestamp, por ejemplo, para volver a realizar analíticas o informes que necesitemos depurar o auditar, corregir errores en los datos u obtener aislamiento sobre los datos en tablas que no paran de cambiar.
En cualquier momento podemos consultar el histórico de cambios de una tabla mediante describe history
:
spark.sql("describe history clientes_delta").show(truncate=False);
-- +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
-- |version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
-- +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
-- |1 |2024-02-19 13:50:07.205|null |null |WRITE |{mode -> Append, partitionBy -> []} |null|null |null |0 |Serializable |true |{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 2055} |null |Apache-Spark/3.3.1 Delta-Lake/2.1.0|
-- |0 |2024-02-19 13:24:35.651|null |null |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|null|null |null |null |Serializable |true |{numFiles -> 1, numOutputRows -> 12435, numOutputBytes -> 251792}|null |Apache-Spark/3.3.1 Delta-Lake/2.1.0|
-- +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
cliente_dtable = DeltaTable.forName(spark,"clientes_delta")
historialDF = cliente_dtable.history()
historialDF.show(truncate=False)
# +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
# |version|timestamp |userId|userName|operation |operationParameters |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics |userMetadata|engineInfo |
# +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
# |1 |2024-02-19 13:50:07.205|null |null |WRITE |{mode -> Append, partitionBy -> []} |null|null |null |0 |Serializable |true |{numFiles -> 2, numOutputRows -> 2, numOutputBytes -> 2055} |null |Apache-Spark/3.3.1 Delta-Lake/2.1.0|
# |0 |2024-02-19 13:24:35.651|null |null |CREATE TABLE AS SELECT|{isManaged -> true, description -> null, partitionBy -> [], properties -> {}}|null|null |null |null |Serializable |true |{numFiles -> 1, numOutputRows -> 12435, numOutputBytes -> 251792}|null |Apache-Spark/3.3.1 Delta-Lake/2.1.0|
# +-------+-----------------------+------+--------+----------------------+-----------------------------------------------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
Si queremos acceder a datos que hemos sobrescrito, podemos viajar al pasado de la tabla antes de que se sobrescribieran los datos mediante la opción versionAsOf
. Por ejemplo, vamos a recuperar todos los datos y los iniciales:
# tclientes_delta = spark.read.format("delta").table("clientes_delta")
tclientes_delta = spark.read.format("delta").load("hdfs://iabd-virtualbox:9000/user/iabd/delta/raw/clientes")
tclientes_delta.sort("customer_id", ascending=False).show(3)
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
# |customer_id|customer_fname|customer_lname|customer_email|customer_password| customer_street|customer_city|customer_state|customer_zipcode|
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
# | 99999| Pedro| Casas| null| null| null| null| null| null|
# | 88888| Aitor| Medrano| null| null| null| null| null| null|
# | 12435| Laura| Horton| XXXXXXXXX| XXXXXXXXX|5736 Honey Downs| Summerville| SC| 29483|
# +-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
tclientes_delta_inicial = spark.read.format("delta").option("versionAsOf", 0).table("clientes_delta")
tclientes_delta_inicial.sort("customer_id", ascending=False).show(3)
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
# |customer_id|customer_fname|customer_lname|customer_email|customer_password| customer_street|customer_city|customer_state|customer_zipcode|
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
# | 12435| Laura| Horton| XXXXXXXXX| XXXXXXXXX| 5736 Honey Downs| Summerville| SC| 29483|
# | 12434| Mary| Mills| XXXXXXXXX| XXXXXXXXX|9720 Colonial Parade| Caguas| PR| 00725|
# | 12433| Benjamin| Garcia| XXXXXXXXX| XXXXXXXXX|5459 Noble Brook ...| Levittown| NY| 11756|
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
Otras formas de viajar con el tiempo es mediante la propiedad timestampAsOf
, el cual aplicará todas las transformaciones previas a la fecha indicada:
tclientes_delta_inicial_ts = spark.read.format("delta").option('timestampAsOf', '2024-02-19 13:30:00').table("clientes_delta")
tclientes_delta_inicial_ts.sort("customer_id", ascending=False).show(3)
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
# |customer_id|customer_fname|customer_lname|customer_email|customer_password| customer_street|customer_city|customer_state|customer_zipcode|
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
# | 12435| Laura| Horton| XXXXXXXXX| XXXXXXXXX| 5736 Honey Downs| Summerville| SC| 29483|
# | 12434| Mary| Mills| XXXXXXXXX| XXXXXXXXX|9720 Colonial Parade| Caguas| PR| 00725|
# | 12433| Benjamin| Garcia| XXXXXXXXX| XXXXXXXXX|5459 Noble Brook ...| Levittown| NY| 11756|
# +-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
Si queremos interactuar mediante Spark SQL, podemos realizar las mismas operaciones:
-- Trabajando con diferentes versiones
SELECT * FROM clientes_delta VERSION AS OF 0
SELECT * FROM clientes_delta@v0 -- equivalente a VERSION AS OF 0
SELECT * FROM clientes_delta TIMESTAMP AS OF "2024-02-19"
Historial y Databricks
Desde el catálogo de tablas de Databricks, podemos consultar el historial de cambios desde el Catalogo. Tras seleccionar la tabla en la que estamos interesados, en la pestaña Historial podemos obtener la misma información:
Diferencias entre versiones¶
Si queremos saber exactamente que datos han cambiado entre dos versiones podemos hacer uso de las operaciones de conjuntos que realizan la sustracción mediante EXCEPT ALL
:
-- Recuperando las diferencias entre dos versiones
SELECT * FROM clientes_delta VERSION AS OF 1
EXCEPT ALL SELECT * FROM clientes_delta VERSION AS OF 0
-- +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
-- |customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street|customer_city|customer_state|customer_zipcode|
-- +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
-- | 88888| Aitor| Medrano| null| null| null| null| null| null|
-- | 99999| Pedro| Casas| null| null| null| null| null| null|
-- +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
Y el homónimo en Python lo realizaremos a partir de dos DataFrames:
tclientes_delta.exceptAll(tclientes_delta_inicial).show()
# +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
# |customer_id|customer_fname|customer_lname|customer_email|customer_password|customer_street|customer_city|customer_state|customer_zipcode|
# +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
# | 88888| Aitor| Medrano| null| null| null| null| null| null|
# | 99999| Pedro| Casas| null| null| null| null| null| null|
# +-----------+--------------+--------------+--------------+-----------------+---------------+-------------+--------------+----------------+
Restaurar una versión específica¶
Finalmente si queremos deshacer los cambios y volver a una versión determinada (similar a realizar un rollback) podemos utilizar RESTORE
RESTORE clientes_delta VERSION AS OF 0
-- +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
-- |table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
-- +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
-- | 251792| 1| 2| 0| 2055| 0|
-- +------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
RESTORE clientes_delta TIMESTAMP AS OF "2024-02-19"
O mediante Python, una vez tenemos una tabla, mediante los métodos restoreToVersion
o restoreToTimestamp
:
tclientes_delta.restoreToVersion(0)
tclientes_delta.restoreToTimestamp('2024-02-19')
Más información en https://docs.databricks.com/en/delta/history.html.
Pasando la aspiradora¶
Delta Lake permite eliminar ficheros de datos que no son referenciados por ninguna tabla y que tienen más antigüedad que un rango que definimos mediante el comando VACUUM
, el cual por defecto es de 7 días.
VACUUM clientes_delta
Si queremos saber los archivos que se verían implicados podrían hacerlo "en seco":
VACUUM clientes_delta DRY RUN
Podríamos considerar que Delta Lake mantiene una papelera de reciclaje sobre los datos eliminados y modificados, y mediante VACUUM
vaciaremos la papelera.
¿Y qué ventaja tiene eliminar estos archivos? Por un lado, reducir la cantidad de datos almacenados, y en consecuencia, pagar menos por ello. Además, nos asegura que aquellos datos que han sido eliminados, realmente desaparezcan de nuestro sistema.
Más información en https://docs.databricks.com/en/delta/vacuum.html.
Caso de Uso - COVID¶
Vamos a cargar en una tabla Delta los datos de COVID disponibles en el repositorio de Bing COVID 19.
Creando una tabla¶
El primer paso es descargar el archivo y a continuación, vamos a crear una tabla para almacenar los casos, en un principio, únicamente con una columna para la fecha:
spark.sql("CREATE DATABASE IF NOT EXISTS iabd")
spark.sql("""
CREATE TABLE IF NOT EXISTS iabd.covid (
fecha DATE
) USING DELTA
TBLPROPERTIES('delta.logRetentionDuration'='interval 7 days');
""")
Podemos comprobar como la tabla está vacía mediante:
spark.table("iabd.covid").inputFiles()
# []
Tras crear la tabla, la vamos a rellenar con los datos descargados:
from pyspark.sql.functions import to_date
df = spark.read.format("parquet").load("*.parquet")
df.printSchema()
Y obtenemos el esquema del archivo cargado:
root
|-- id: integer (nullable = true)
|-- updated: date (nullable = true)
|-- confirmed: integer (nullable = true)
|-- confirmed_change: integer (nullable = true)
|-- deaths: integer (nullable = true)
|-- deaths_change: short (nullable = true)
|-- recovered: integer (nullable = true)
|-- recovered_change: integer (nullable = true)
|-- latitude: double (nullable = true)
|-- longitude: double (nullable = true)
|-- iso2: string (nullable = true)
|-- iso3: string (nullable = true)
|-- country_region: string (nullable = true)
|-- admin_region_1: string (nullable = true)
|-- iso_subdivision: string (nullable = true)
|-- admin_region_2: string (nullable = true)
|-- load_time: timestamp (nullable = true)
Si intentamos persistir la tabla nos fallará porque la tabla ya existe:
df.write.format("delta").saveAsTable("iabd.covid")
# AnalysisException: Table iabd.covid already exists
Así pues, en vez de escribir mediante write
, necesitamos realizar una operación de append
, pero recibimos otro error porque el esquema del archivo Parquet (con varias columnas) no cuadra con la estructura de la tabla que habíamos creado previamente (con una única columna):
df.write.format("delta").mode("append").saveAsTable("iabd.covid")
AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 9c1b48b4-59b8-4712-b536-38d4c59513dd).
Así pues, además de añadir los datos sobre la tabla existente, debemos indicar de forma explicita que fusione los esquemas mediante la opción mergeSchema
:
df.write.format("delta").mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("iabd.covid")
Y ahora la tabla sí que tiene la misma estructura que el archivo cargado:
spark.sql("describe extended iabd.covid").show(truncate=False)
# +----------------+---------+-------+
# |col_name |data_type|comment|
# +----------------+---------+-------+
# |update |date | |
# |id |int | |
# |updated |date | |
# |confirmed |int | |
# |confirmed_change|int | |
# |deaths |int | |
# |deaths_change |smallint | |
# |recovered |int | |
# |recovered_change|int | |
# |latitude |double | |
# |longitude |double | |
# |iso2 |string | |
# |iso3 |string | |
# |country_region |string | |
# |admin_region_1 |string | |
# |iso_subdivision |string | |
# |admin_region_2 |string | |
# |load_time |timestamp| |
# | | | |
# |# Partitioning | | |
# +----------------+---------+-------+
# only showing top 20 rows
Y si comprobamos de nuevo los ficheros, veremos que ahora sí que existen:
spark.table("iabd.covid").inputFiles()
# ['file:/opt/spark/work-dir/spark-warehouse/iabd.db/covid/part-00005-9ad5b550-8ded-4177-8d89-66bd22062e2e-c000.snappy.parquet',
# 'file:/opt/spark/work-dir/spark-warehouse/iabd.db/covid/part-00001-50397be1-9510-4cd1-9d12-34e599a64cae-c000.snappy.parquet']
Hablemos con propiedad¶
Si queremos añadir una propiedad a una tabla, podemos emplear una sentencia ALTER TABLE... SET TBLPROPERTIES
. Por ejemplo:
spark.sql("""
ALTER TABLE iabd.covid
SET TBLPROPERTIES (
'engineering.team_name'='iabd'
)
"""
Con cada modificación realizada sobre la tabla, incluso sobre sus metadatos, los cambios se almacenan en el historial de la tabla.
Tablas particionadas¶
Hemos visto que podemos crear una tabla a partir de una ruta. Si queremos definirla desde cero, podemos hacer uso de DeltaTable.createIfNotExists
:
from pyspark.sql.types import DateType
from delta.tables import DeltaTable
DeltaTable.createIfNotExists(spark)
.tableName("iabd.covid_por_fecha")
.addColumn("updated", DateType(), nullable=False) # la marcamos como VNN para el particionado
.partitionedBy("updated")
.addColumn("county", "STRING")
.addColumn("state", "STRING")
.addColumn("fips", "INT")
.addColumn("cases", "INT")
.addColumn("deaths", "INT")
.execute()
Una vez creada, vamos a realizar la carga con los datos de nuestra tabla anterior:
spark.table("iabd.covid") # cargamos los datos de COVID
.write
.format("delta")
.mode("append")
.option("mergeSchema", "false")
.saveAsTable("iabd.covid_por_fecha"))
Si queremos consultar los metadatos de la tabla creada y comprobar la partición:
DeltaTable.forName(spark, "iabd.covid_por_fecha")
.detail()
.toJSON()
.collect()[0]
Delta Lake y Python¶
Si queremos trabajar desde scripts de Python y Pandas sin utilizar Spark ni Databricks, tendremos que instalar la librería deltalake, la cual es una implementación en Rust/Python:
pip install deltalake
En el siguiente script probamos las diferentes características de Delta Lake, por ejemplo, mediante las operaciones write_deltalake
, la clase DeltaTable
y sus respectivos métodos, como por ejemplo, para obtener el histórico mediante history
o viajar en el tiempo con load_as_version
:
import pandas as pd
from deltalake.writer import write_deltalake
from deltalake import DeltaTable
df = pd.DataFrame({"datos": range(5)})
# Persistimos un DataFrame de Pandas
write_deltalake("/tmp/delta_table", df)
df = pd.DataFrame({"datos": range(6, 11)})
# Añadimos nuevos datos
write_deltalake("/tmp/delta_table", df, mode="append")
# Cargamos los datos como una tabla
dt = DeltaTable("/tmp/delta_table")
# Visualizamos los ficheros de la tabla
dt.files()
# ['0-ba28a612-6c57-4f25-b388-46d3a8cdf914-0.parquet', '1-0f732990-efd2-44da-853d-c08dc8d3d345-0.parquet']
# Histórico de revisiones
dt.history()
# [{'timestamp': 1706638235891, 'operation': 'CREATE TABLE', 'operationParameters': {'protocol': '{"minReaderVersion":1,"minWriterVersion":1}', 'location': 'file:///tmp/delta_table', 'mode': 'ErrorIfExists', 'metadata': '{"configuration":{},"created_time":1706638235887,"description":null,"format":{"options":{},"provider":"parquet"},"id":"cd66f777-a19c-4679-b5cc-a16ad57bbaf8","name":null,"partition_columns":[],"schema":{"fields":[{"metadata":{},"name":"datos","nullable":true,"type":"long"}],"type":"struct"}}'}, 'clientVersion': 'delta-rs.0.10.0'}, {'timestamp': 1706638245541, 'operation': 'WRITE', 'operationParameters': {'mode': 'Append', 'partitionBy': '[]'}, 'clientVersion': 'delta-rs.0.10.0'}]
# Time Travel a la versión inicial
dt.load_as_version(0)
# Mostramos la tabla inicial
dt.to_pandas()
# datos
# 0 0
# 1 1
# 2 2
# 3 3
# 4 4
Para comprobar los datos de la tabla creada, podemos ver el contenido de la carpeta y visualizar los diferentes archivos Parquet creados y el fichero de log con las transacciones:
ls -ls /tmp/delta_table
# -rw-r--r-- 1 NBuser NBuser 1694 Jan 30 18:10 0-ba28a612-6c57-4f25-b388-46d3a8cdf914-0.parquet
# -rw-r--r-- 1 NBuser NBuser 1696 Jan 30 18:10 1-0f732990-efd2-44da-853d-c08dc8d3d345-0.parquet
# drwxr-xr-x 2 NBuser NBuser 4096 Jan 30 18:10 _delta_log
Referencias¶
- Construir data lakes fiables con Delta Lake - Carlos del Cacho
- Delta Lake. The definitive guide (versión incompleta early release)
- Delta Lake Cheat-sheet
- Delta Lake: Up and Running - Bennie Haelen - O'Reilly
- Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores
- The Internals of Delta Lake
Actividades¶
(RASBD.1 / CESBD.1b y CESBD.1d) En las siguientes actividades vamos a practicar con el uso de DeltaLake.
-
(2p) A partir de los ejercicios 3 y 4 de la sesión anterior donde tras consumir unos datos de ventas, realizábamos una limpieza de los datos, renombrábamos columnas y posteriormente una serie de agregaciones para visualizar en gráficos el resultado, vamos a crear diversas tablas en Delta Lake siguiendo la arquitectura Medallion. Así pues:
- Almacena en
/delta/raw/sales/
los datos de las ventas - Realiza las operaciones de limpieza de la actividad 3 y almacena el resultado en
/delta/silver/sales/
particionando los datos por año. - Realiza las consultas agregadas de los apartado 4.c, 4.d y 4.e y almacena sus resultados en diversas tablas en
/delta/gold/sales/<nombreTabla>
- Almacena en
-
(1.5p) Tras una auditoria, hemos descubierto que había un libro oculto de contabilidad con ciertas ventas que no habían sido registradas en el sistema. Así pues, crea datos ficticios extra de ventas para el año 2019, colócalo en la capa raw, y a continuación, actualiza los datos del resto de capas.
-
(0.5p) Obtén el histórico de la capa raw y las tablas gold. Recupera la primera versión de la tabla que obtiene mejor recaudación mensual de 2019 (apartado 4.c) y compara su contenido con el actual.