Changelog:
- Tipo VARIANT en la definición de esquemas — Abril 2026
- Uso de Databricks Free Edition en vez de Community Edition — Abril 2026
- Añadimos FAQ — Abril 2026
Spark DataFrames / SQL¶
En la sesiones anteriores hemos introducido Spark y el uso de RDD para interactuar con los datos. Tal como comentamos, los RDD permiten trabajar a bajo nivel, siendo más cómodo y eficiente hacer uso de DataFrames y el lenguaje SQL.
DataFrames¶
Los DataFrames son la forma principal de trabajar con datos en Spark. A diferencia de los RDD, que operan a bajo nivel, los DataFrames ofrecen una abstracción estructurada similar a una tabla de base de datos relacional: los datos se organizan en filas y columnas con un esquema definido, y el motor de optimización Catalyst se encarga de planificar la ejecución de forma eficiente en el clúster.
Su uso es predominante en cualquier desarrollo moderno con Spark, y permiten trabajar tanto mediante una API de objetos como directamente con SQL estándar. Los RDD siguen existiendo como capa interna, pero en la práctica no es necesario trabajar con ellos directamente.
El trabajo con DataFrames es más sencillo y eficiente que el procesamiento con RDD, por eso su uso es predominante en los nuevos desarrollos con Spark.
A continuación veremos cómo podemos obtener y persistir DataFrames desde diferentes fuentes y formatos de datos
Creando Dataframes¶
La forma más directa de crear un DataFrame es a partir de datos en memoria mediante el método createDataFrame de la SparkSession, indicando los datos y los nombres de las columnas:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
datos = [("Aitor", 182), ("Pedro", 178), ("Marina", 161)]
columnas = ["nombre", "altura"]
df = spark.createDataFrame(datos, columnas)
df.printSchema()
Mediante printSchema obtenemos un resumen del esquema del DataFrame, donde para cada columna se indica el nombre, el tipo y si admite valores nulos:
root
|-- nombre: string (nullable = true)
|-- altura: long (nullable = true)
show:
df.show()
# +------+------+
# |nombre|altura|
# +------+------+
# | Aitor| 182|
# | Pedro| 178|
# |Marina| 161|
# +------+------+
Spark 4.0: Tablas PyArrow
A partir de Spark 4.0, el método createDataFrame() acepta directamente un objeto pyarrow.Table, lo que evita la conversión intermedia a listas o DataFrames de Pandas:
import pyarrow as pa
tabla_arrow = pa.table({
"nombre": ["Aitor", "Pedro", "Laura"],
"sueldo": [3000, 4000, 5000]
})
df = spark.createDataFrame(tabla_arrow)
df.show()
# +------+------+
# |nombre|sueldo|
# +------+------+
# | Aitor| 3000|
# | Pedro| 4000|
# | Laura| 5000|
# +------+------+
Esto es especialmente útil cuando los datos ya están en formato columnar Arrow (por ejemplo, tras leerlos con DuckDB o Polars), ya que se elimina la sobrecarga de serialización al compartir la memoria columnar directamente.
Creando un DataFrame desde un RDD
Si en algún momento trabajamos con código heredado que devuelve un RDD, podemos convertirlo a DataFrame mediante toDF. Si no le indicamos nombres de columna, Spark asignará nombres genéricos (_1, _2, …):
rdd = spark.sparkContext.parallelize(datos)
dfRDD = rdd.toDF() # columnas: _1, _2
dfRDD = rdd.toDF(columnas) # columnas: nombre, altura
Mostrando los datos¶
Para los siguientes apartados, supongamos que queremos almacenar ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:
clientes = [
("Aitor", "Medrano", "Elche", 3000),
("Pedro", "Casas", "Elche", 4000),
("Laura", "García", "Elche", 5000),
("Miguel", "Ruiz", "Torrellano", 6000),
("Isabel", "Guillén", "Alicante", 7000)
]
columnas = ["nombre","apellidos", "ciudad", "sueldo"]
df = spark.createDataFrame(clientes, columnas)
Para mostrar los datos podemos utilizar el método show, al cual le podemos indicar o no la cantidad de registros a recuperar, así como si queremos que los datos se trunquen o no, o si los queremos mostrar en vertical:
df.show(2)
# +------+---------+------+------+
# |nombre|apellidos|ciudad|sueldo|
# +------+---------+------+------+
# | Aitor| Medrano| Elche| 3000|
# | Pedro| Casas| Elche| 4000|
# +------+---------+------+------+
# only showing top 2 rows
df.show(truncate=False)
# +------+---------+----------+------+
# |nombre|apellidos|ciudad |sueldo|
# +------+---------+----------+------+
# |Aitor |Medrano |Elche |3000 |
# |Pedro |Casas |Elche |4000 |
# |Laura |García |Elche |5000 |
# |Miguel|Ruiz |Torrellano|6000 |
# |Isabel|Guillén |Alicante |7000 |
# +------+---------+----------+------+
df.show(3, vertical=True)
# -RECORD 0------------
# nombre | Aitor
# apellidos | Medrano
# ciudad | Elche
# sueldo | 3000
# -RECORD 1------------
# nombre | Pedro
# apellidos | Casas
# ciudad | Elche
# sueldo | 4000
# -RECORD 2------------
# nombre | Laura
# apellidos | García
# ciudad | Elche
# sueldo | 5000
# only showing top 3 rows
Si sólo queremos recuperar unos pocos datos, podemos hacer uso de head o first los cuales devuelven objetos Row:
df.first()
# Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)
df.head()
# Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)
df.head(3)
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
# Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
# Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000)]
Si queremos obtener un valor en concreto, una vez recuperada una fila, podemos acceder a sus columnas:
nom1 = df.first()[0] # 'Aitor'
nom2 = df.first()["nombre"] # 'Aitor'
También podemos obtener un sumario de los datos (igual que con Pandas) mediante describe:
df.describe().show()
# +-------+------+---------+----------+------------------+
# |summary|nombre|apellidos| ciudad| sueldo|
# +-------+------+---------+----------+------------------+
# | count| 5| 5| 5| 5|
# | mean| null| null| null| 5000.0|
# | stddev| null| null| null|1581.1388300841897|
# | min| Aitor| Casas| Alicante| 3000|
# | max| Pedro| Ruiz|Torrellano| 7000|
# +-------+------+---------+----------+------------------+
Si únicamente nos interesa saber cuantas filas tiene nuestro DataFrame, podemos hacer uso de count:
df.count() # 5
Por último, además de show, podemos recuperar todos los registros del DataFrame mediante collect, o solo los primeros n mediante take. Ambos métodos devuelven una lista de objetos Row:
df.collect()
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
# Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
# Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000),
# Row(nombre='Miguel', apellidos='Ruiz', ciudad='Torrellano', sueldo=6000),
# Row(nombre='Isabel', apellidos='Guillén', ciudad='Alicante', sueldo=7000)]
df.take(2)
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
# Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000)]
nom = df.collect()[0][0] # 'Aitor'
Ten en cuenta que collect trae todos los datos al nodo driver, por lo que conviene usarlo solo con conjuntos de datos pequeños o tras haber reducido previamente el DataFrame.
Cargando diferentes formatos¶
Lo más usual es cargar los datos desde una archivo externo. Para ello, mediante el API de DataFrameReader cargaremos los datos directamente en un Dataframe mediante diferentes métodos dependiendo del formato (admite tanto el nombre de un recurso como una ruta de una carpeta).
Para cada formato, existe un método corto que se llama como el formato en sí, y un método general donde mediante format indicamos el formato y que finaliza con el método load siempre dentro de spark.read:
dfCSV = spark.read.csv("datos.csv")
dfCSV = spark.read.csv("datos/*.csv") # Una carpeta entera
dfCSV = spark.read.option("sep", ";").csv("datos.csv")
dfCSV = spark.read.option("header", "true").csv("datos.csv")
dfCSV = spark.read.option("header", True).option("inferSchema", True).csv("datos.csv")
dfCSV = spark.read.options(sep=";", header=True, inferSchema=True).csv("pdi_sales.csv")
dfCSV = spark.read.format("csv").load("datos.csv")
dfCSV = spark.read.load(path="datos.csv", format="csv", header="true", sep=";", inferSchema="true")
Más información en la documentación oficial
dfTXT = spark.read.text("datos.txt")
# cada fichero se lee entero como un registro
dfTXT = spark.read.option("wholetext", True).text("datos/")
dfTXT = spark.read.format("text").load("datos.txt")
Más información en la documentación oficial
dfJSON = spark.read.json("datos.json")
dfJSON = spark.read.format("json").load("datos.json")
Más información en la documentación oficial
DataFrames desde JSON
Spark espera que cada documento JSON ocupe una única línea (JSONL). Si cada documento ocupa más de una línea, se lo indicamos mediante la opción multiline:
df = spark.read.option("multiline",True).option("inferSchema", "true").json("datos.json")
dfParquet = spark.read.parquet("datos.parquet")
dfParquet = spark.read.format("parquet").load("datos.parquet")
Más información en la documentación oficial
Avro
La fuente de datos en formato Avro se incluye como un módulo externo, y por lo tanto, para poder leer o escribir datos en dicho formato, previamente hemos de cargar una librería.
Para ello, al arrancar PySpark, le pasaremos como parámetro --packages org.apache.spark:spark-avro_2.13:4.1.1 (suponiendo que tenemos instalada la versión 4.1.1 de Spark):
pyspark --packages org.apache.spark:spark-avro_2.13:4.1.1
o si estamos usando Spark en modo local o mediante Docker, al arrancar la sesión de Spark le indicamos el mismo parámetro mediante config:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.13:4.1.1")
.getOrCreate())
Una vez arrancado, ya podemos leer y escribir datos en formato Avro de forma similar al resto:
df = spark.read.format("avro").load("datos.avro")
df.write.format("avro").save("archivo.avro")
La librería también nos permite convertir columnas y estructuras de datos con las operaciones to_avro() y from_avro(). Más información en la documentación oficial.
Persistiendo diferentes formatos¶
Si lo que queremos es persistir los datos, en vez de read, utilizaremos write (de manera que obtenemos un DataFrameWriter) y si usamos la forma general usaremos el método save:
dfCSV.write.csv("datos.csv")
dfCSV.write.format("csv").save("datos.csv")
dfCSV.write.format("csv").mode("overwrite").save("datos.csv")
Más información en la documentación oficial
dfTXT.write.text("datos.txt")
dfTXT.write.option("lineSep",";").text("datos.txt")
dfTXT.write.format("txt").save("datos.txt")
Más información en la documentación oficial
dfJSON.write.json("datos.json")
dfJSON.write.format("json").save("datos.json")
Más información en la documentación oficial
dfParquet.write.parquet("datos.parquet")
dfParquet.write.mode("overwrite").partitionBy("fecha").parquet("datos/")
dfParquet.write.format("parquet").save("datos.parquet")
Más información en la documentación oficial
Un único archivo de salida
Por cada partición, Spark generará un archivo de salida. Recuerda que podemos reducir el número de particiones mediante coalesce o repartition.
Por cada partición, Spark generará un archivo de salida independiente. Si necesitamos reducir el número de archivos resultantes (por ejemplo, para obtener un único archivo), podemos reparticionar el DataFrame antes de escribir mediante coalesce o repartition:
df.coalesce(1).write.csv("datos.csv")
df.repartition(1).write.csv("datos.csv")
Mientras que coalesce(1) es la opción más eficiente cuando queremos reducir particiones, ya que evita un shuffle completo de los datos, repartition(1) es más costoso porque implica un shuffle completo para redistribuir los datos en una sola partición.
Ten en cuenta que esto puede afectar al rendimiento, ya que se pierde paralelismo.
Una vez vista la sintaxis, vamos a ver un ejemplo completo de lectura de un archivo CSV (el archivo pdi_sales.csv que hemos utilizado durante todo el curso) que está almacenado en HDFS / Minio (S3) y que tras leerlo, lo guardamos como JSON de nuevo en HDFS / Minio (S3):
Si utilizas la máquina virtual, puedes acceder a los datos almacenados en HDFS utilizando la URL del recurso con el prefijo hdfs:// más el host del namenode (en nuestro caso, iabd-virtualbox:9000):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("s8a-dataframe-hdfs-csv").getOrCreate()
# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("delimiter",";").option("header", "true").csv("hdfs://iabd-virtualbox:9000/user/iabd/pdi_sales.csv")
# df.printSchema()
df.write.json("hdfs://iabd-virtualbox:9000/user/iabd/pdi_sales_json")
Si utilizamos Docker y Minio para simular un almacenamiento de objetos, para acceder a los datos almacenados en Minio / S3, además de indicar la URL del recurso con el prefijo s3a:// (indicando el nombre del bucket y la ruta del recurso), hemos de configurar las credenciales de acceso, el endpoint, así como el driver JDBC (tanto para el nodo driver como para los nodos executor):
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.appName("s8a-dataframe-minio-csv")
# --- JDBC driver + extra jars (driver y executors) ---
.config("spark.driver.extraClassPath", "/opt/spark/extra-jars/*:/opt/spark/jars/*")
.config("spark.executor.extraClassPath", "/opt/spark/extra-jars/*:/opt/spark/jars/*")
# --- MinIO / S3A ---
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin123")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.getOrCreate()
)
# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("delimiter",";").option("header", "true").csv("s3a://raw-data/pdi_sales.csv")
# df.printSchema()
df.write.json("s3a://processed/pdi_sales_json")
Es conveniente destacar que para acceder a HDFS únicamente hemos de indicar la URL del recurso con el prefijo hdfs:// más el host del namenode. En el caso de Minio / S3 utilizaremos rutas con el prefijo s3a://, indicando el nombre del bucket y la ruta del recurso, por ejemplo s3a://raw-data/pdi_sales.csv.
Comprimiendo los datos¶
Para configurar el algoritmo de compresión, si los datos está en Parquet o Avro, a nivel de la sesión de Spark, podemos realizar su configuración:
spark.setConf("spark.sql.parquet.compression.codec","snappy")
spark.setConf("spark.sql.parquet.compression.codec","none")
spark.setConf("spark.sql.avro.compression.codec","snappy")
Si sólo queremos hacerlo para una operación en particular, para cada lectura/escritura le añadimos .option("compression", "algoritmo"). Por ejemplo:
dfVentas = spark.read.option("compression", "snappy").option("delimiter",";").option("header", "true").csv("pdi_sales.csv")
dfClientes = spark.read.option("compression", "snappy").parquet("clientes.parquet")
dfVentas.write.option("compression", "snappy").format("avro").save("ventas.avro")
Datos y Esquemas¶
El esquema completo de un DataFrame se modela mediante un StructType, el cual contiene una colección de objetos StructField.
Así pues, cada columna de un DataFrame de Spark se modela mediante un objeto StructField indicando su nombre, tipo y gestión de los nulos.
Hemos visto que al crear un DataFrame desde un archivo externo, podemos inferir el esquema. Si queremos crear un DataFrame desde un esquema propio utilizaremos los tipos StructType, StructField, así como StringType, IntegerType o el tipo necesario para cada columna. Para ello, primero hemos de importarlos (como puedes observar, estas clases pertenecen a las librerías SQL de PySpark):
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
Además de cadenas y enteros, flotantes (FloatType) o dobles (DoubleType), tenemos tipos booleanos (BooleanType), de fecha (DateType y TimestampType), así como tipos complejos como ArrayType, MapType y StructType. Para más información sobre estos o más tipos de datos, podéis consultar la documentación oficial.
Volvamos al ejemplo anterior donde tenemos ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:
clientes = [
("Aitor", "Medrano", "Elche", 3000),
("Pedro", "Casas", "Elche", 4000),
("Laura", "García", "Elche", 5000),
("Miguel", "Ruiz", "Torrellano", 6000),
("Isabel", "Guillén", "Alicante", 7000)
]
Para esta estructura, definiremos un esquema con los campos, indicando para cada uno de ellos su nombre, tipo y si admite valores nulos:
esquema = StructType([
StructField("nombre", StringType(), False),
StructField("apellidos", StringType(), False),
StructField("ciudad", StringType(), True),
StructField("sueldo", IntegerType(), False)
])
A continuación ya podemos crear un DataFrame con datos propios que cumplen un esquema haciendo uso del método createDataFrame:
df = spark.createDataFrame(data=clientes, schema=esquema)
df.printSchema()
# root
# |-- nombre: string (nullable = false)
# |-- apellidos: string (nullable = false)
# |-- ciudad: string (nullable = true)
# |-- sueldo: integer (nullable = false)
df.show(truncate=False)
# +------+---------+----------+------+
# |nombre|apellidos|ciudad |sueldo|
# +------+---------+----------+------+
# |Aitor |Medrano |Elche |3000 |
# |Pedro |Casas |Elche |4000 |
# |Laura |García |Elche |5000 |
# |Miguel|Ruiz |Torrellano|6000 |
# |Isabel|Guillén |Alicante |7000 |
# +------+---------+----------+------+
Esquemas y PyArrow
A partir de Spark 4.0, el método createDataFrame() es compatible con objetos pyarrow.Schema y pyarrow.Table, lo que facilita la creación de DataFrames a partir de datos en formato Arrow sin necesidad de convertirlos a listas o DataFrames de Pandas:
import pyarrow as pa
# Definir un esquema Arrow
esquema_arrow = pa.schema([
("nombre", pa.string()),
("apellidos", pa.string()),
("ciudad", pa.string()),
("sueldo", pa.int32())
])
# Creamos la tabla de Arrow con los datos en formato columnar
tabla_arrow = pa.table({
"nombre": ["Aitor", "Pedro", "Casas", "Miguel", "Isabel"],
"apellidos": ["Medrano", "Casas", "García", "Ruiz", "Guillén"],
"ciudad": ["Elche", "Elche", "Elche", "Torrellano", "Alicante"],
"sueldo": [3000, 4000, 5000, 6000, 7000]
}, schema=esquema_arrow)
# Crear un DataFrame de Spark directamente desde la tabla Arrow
df = spark.createDataFrame(tabla_arrow)
df.printSchema()
Esto elimina la sobrecarga de conversión intermedia, permitiendo una creación de DataFrames más eficiente cuando los datos ya están en formato Arrow.
Si lo que queremos es asignarle un esquema a un DataFrame que vamos a leer desde una fuente de datos externa, hemos de emplear el método schema:
dfClientes = spark.read.option("header", True).schema(esquema).csv("clientes.csv")
Rendimiento y esquema
La inferencia de los tipos de los datos es un proceso computacionalmente costoso. Por ello, si nuestro conjunto de datos es grande, es muy recomendable crear el esquema de forma programativa y configurarlo en la carga de datos.
Se recomienda la lectura del artículo Using schemas to speed up reading into Spark DataFrames.
Respecto al esquema, tenemos diferentes propiedades como columns, dtypes y schema con las que obtener su información:
df.columns
# ['nombre', 'apellidos', 'ciudad', 'sueldo']
df.dtypes
# [('nombre', 'string'),
# ('apellidos', 'string'),
# ('ciudad', 'string'),
# ('sueldo', 'int')]
df.schema
# StructType(List(StructField(nombre,StringType,false),StructField(apellidos,StringType,false),StructField(ciudad,StringType,true),StructField(sueldo,IntegerType,false)))
Si una vez hemos cargado un DataFrame queremos cambiar el tipo de una de sus columnas, podemos hacer uso del método withColumn, donde le pasaremos el nombre de la columna a modificar y el nuevo tipo al que queremos convertirla mediante cast:
# Forma larga
from pyspark.sql.types import DoubleType
df = df.withColumn("sueldo", df.sueldo.cast(DoubleType())
# Forma corta
df = df.withColumn("sueldo", df.sueldo.cast("double"))
# df = df.withColumn("fnac", to_date(df.Date, "M/d/yyy"))
Errores al leer datos
Si tenemos un error al leer un dato que contiene un tipo no esperado, por defecto, Spark lanzará una excepción y se detendrá la lectura.
Si queremos que asigne los tipos a los campos pero que no los valide, podemos pasarle el parámetro extra verifySchema a False al crear un DataFrame mediante spark.createDataFrame o enforceSchema también a False al cargar desde una fuente externa mediante spark.read, de manera que los datos que no concuerden con el tipo se quedarán nulos, vacíos o con valor 0, dependiendo del tipo de dato que tiene asignada la columna en el esquema.
dfClientes = spark.read.option("header", True).option("enforceSchema",False).schema(esquema).csv("clientes.csv")
Tipo VARIANT¶
Antes de Spark 4.0, trabajar con datos semiestructurados obligaba a elegir entre dos enfoques con compromisos opuestos:
- Flexibilidad primero: almacenar el JSON como una columna
string. Fácil de ingestar, pero con un rendimiento pobre en consultas, porque cada acceso requería parsear la cadena completa de nuevo. - Rendimiento primero: definir un esquema rígido con
StructTypeanidados. Eficiente, pero costoso de mantener cuando el esquema cambia o es desconocido.
Para solucionar esto, el tipo VARIANT, introducido tanto en Spark 4.0 como en Delta Lake 4.0, facilita almacenar datos semiestructurados en un formato binario optimizado que permite un acceso rápido a campos concretos sin parsear todo el documento, mientras mantiene la flexibilidad de no requerir un esquema previo.
Rendimiento
Según benchmarks de Databricks, consultar campos dentro de una columna VARIANT puede ser hasta 8× más rápido que hacerlo sobre la misma columna almacenada como string JSON.
Así pues, usaremos VARIANTcuando:
- Los datos provengan de APIs o fuentes externas con esquema variable o evolutivo (distintos registros pueden tener campos diferentes).
- Necesitemos explorar datos JSON sin conocer su estructura de antemano, por ejemplo en fases de descubrimiento o data lake raw.
- Trabajemos con colecciones heterogéneas donde cada fila puede contener un subconjunto diferente de atributos.
- El esquema cambia con frecuencia y mantener
StructTypeactualizados supone un coste elevado.
No es la mejor opción cuando el esquema es estable y conocido: en ese caso, un StructType bien definido seguirá siendo más eficiente.
Trabajando con datos VARIANT¶
El tipo VARIANT se puede usar directamente mediante uno de los siguientes enfoques:
-
Definir el tipo VARIANT directamente en el esquema con
VariantType:from pyspark.sql.functions import parse_json, col from pyspark.sql.types import StructType, StructField, IntegerType, VariantType # Opción A: definir el tipo en el esquema esquema = StructType([ StructField("id", IntegerType(), False), StructField("datos", VariantType(), True) ]) -
Convertir strings JSON a VARIANT con
parse_json:eventos = [ (1, '{"nombre": "Aitor", "ciudad": "Elche"}'), (2, '{"nombre": "Laura", "ciudad": "Alicante", "edad": 31}'), (3, '{"nombre": "Pedro", "tags": ["spark", "python"]}'), ] df = spark.createDataFrame(eventos, ["id", "json_str"]) df_variant = df.withColumn("datos", parse_json(col("json_str"))).drop("json_str") df_variant.printSchema() # root # |-- id: long (nullable = true) # |-- datos: variant (nullable = true)try_parse_json
Si los datos de entrada pueden contener JSON malformado, es mejor utilizar
try_parse_jsonen lugar deparse_json, ya que en vez de lanzar una excepción, devuelvenullpara las cadenas no válidas:from pyspark.sql.functions import try_parse_json df_safe = df.withColumn("datos", try_parse_json(col("json_str"))) -
Convertir tipos complejos como un
StructType,ArrayTypeoMapTypeexistente directamente a VARIANT conto_variant_object:esquema_anidado = StructType([ StructField("usuario", StringType(), True), StructField("historial", ArrayType(StructType([ StructField("preferencias", MapType(StringType(), StringType()), True) ]), True)) ]) data = [("Aitor", [{"preferencias": {"deporte": "baloncesto", "música": "indie"}}])] df2 = spark.createDataFrame(data, esquema_anidado) df2.select(sf.to_variant_object(df2.historial)).show(truncate=False) # +------------------------------------------------+ # |to_variant_object(historial) | # +------------------------------------------------+ # |[{"preferencias":{"deporte":"baloncesto",...}}] | # +------------------------------------------------+Cuando el origen ya es un tipo complejo (
StructType,ArrayType,MapType, o cualquier combinación anidada de ellos),to_variant_objectconvierte directamente la estructura en memoria al formato binarioVARIANT, sin serializar a texto JSON y volver a parsearlo.Esta opción es la más eficiente en ese caso, y también la más adecuada cuando se lee un JSONL con inferencia de esquema y Spark ya ha construido el
StructTypepor nosotros.
Para acceder a un campo dentro de una columna VARIANT usaremos variant_get (o try_variant_get), indicando la columna, la ruta JSONPath y el tipo de destino. La diferencia entre ambas variantes es su comportamiento ante errores, de manera que si no encuentran una ruta, lancen una excepción o devuelven null::
from pyspark.sql.functions import parse_json, variant_get, try_variant_get
datos_json = [
(1, '{"nombre": "Aitor", "ciudad": "Elche", "edad": 48}'),
(2, '{"nombre": "Laura", "ciudad": "Alicante", "edad": 33}'),
(3, '{"nombre": "Pedro", "tags": ["baloncesto", "natación"]}'),
]
df = spark.createDataFrame(datos_json, ["id", "json_str"])
df_v = df.withColumn("datos", parse_json("json_str")).drop("json_str")
# Extracción de campos con tipo explícito
df_v.select(
"id",
variant_get("datos", "$.nombre", "string").alias("nombre"),
variant_get("datos", "$.ciudad", "string").alias("ciudad"),
try_variant_get("datos", "$.edad", "int").alias("edad"), # null si no existe
try_variant_get("datos", "$.edad", "boolean").alias("mal") # null si el cast falla
).show()
# +---+------+--------+----+----+
# | id|nombre| ciudad|edad| mal|
# +---+------+--------+----+----+
# | 1| Aitor| Elche| 48|null|
# | 2| Laura|Alicante| 33|null|
# | 3| Pedro| null|null|null|
# +---+------+--------+----+----+
Finalmente, para serializar una columna VARIANT de vuelta a cadena JSON (por ejemplo, para escribirla en un sistema que no soporta el tipo binario), usaremos to_json:
from pyspark.sql.functions import to_json
df_v.select("id", to_json("datos").alias("json_string")).show(truncate=False)
# +---+---------------------------------------------+
# | id|json_string |
# +---+---------------------------------------------+
# | 1|{"nombre":"Aitor","ciudad":"Elche",...} |
# | 2|{"nombre":"Laura","ciudad":"Alicante",...} |
# | 3|{"nombre":"Pedro","tags":["baloncesto",...]} |
# +---+---------------------------------------------+
Recuerda que las rutas JSONPath soportan acceso a objetos anidados y arrays:
datos_anidados = [('{"direccion": {"ciudad": "Elche", "cp": "03206"}, "tags": ["spark", "python"]}',)]
df2 = spark.createDataFrame(datos_anidados, ["json_str"])
df2_v = df2.withColumn("datos", parse_json("json_str")).drop("json_str")
df2_v.select(
variant_get("datos", "$.direccion.ciudad", "string").alias("ciudad"), # objeto anidado
variant_get("datos", "$.tags[0]", "string").alias("primer_tag"), # primer elemento array
variant_get("datos", "$.tags[1]", "string").alias("segundo_tag"),
).show()
# +------+----------+-----------+
# |ciudad|primer_tag|segundo_tag|
# +------+----------+-----------+
# | Elche| spark| python|
# +------+----------+-----------+
Descubriendo esquemas¶
Cuando los datos son desconocidos y queremos descubrir qué campos están presentes en los datos antes de decidir cómo extraerlos, podemos hacer uso de schema_of_variant para descubrir la estructura de un registro concreto, y schema_of_variant_agg el cual infiere el esquema combinado de todos los registros de la columna:
from pyspark.sql.functions import schema_of_variant, schema_of_variant_agg, lit
# Esquema de un único valor
spark.range(1).select(
schema_of_variant(parse_json(lit('{"nombre": "Aitor", "edad": 45}'))).alias("esquema")
).show(truncate=False)
# +--------------------------------+
# |esquema |
# +--------------------------------+
# |OBJECT<edad: BIGINT, nombre: ... |
# +--------------------------------+
# Esquema combinado de toda la columna
df_v.select(schema_of_variant_agg("datos").alias("esquema_global")).show(truncate=False)
# +-------------------------------------------------------+
# |esquema_global |
# +-------------------------------------------------------+
# |OBJECT<ciudad: STRING, edad: BIGINT, nombre: STRING, ...|
# +-------------------------------------------------------+
Trabajando con nulos
VARIANT distingue dos tipos de nulo que conviene no confundir:
- SQL NULL: la fila no contiene ningún valor en esa columna (el dato falta).
- Variant null: el documento JSON contiene explícitamente el valor
nullpara ese campo.
is_variant_null permite detectar el segundo caso:
from pyspark.sql.functions import is_variant_null
nulos = [
('{"campo": null}',), # variant null explícito
('{"campo": 1}',), # valor numérico
(None,), # SQL NULL (la columna entera es nula)
]
df_n = spark.createDataFrame(nulos, ["json_str"])
df_n = df_n.withColumn("datos", try_parse_json("json_str")).drop("json_str")
df_n.select(
"datos",
is_variant_null(variant_get("datos", "$.campo", "variant")).alias("es_variant_null")
).show()
# +----------------+---------------+
# | datos|es_variant_null|
# +----------------+---------------+
# | {"campo":null}| true|
# | {"campo":1}| false|
# | null| false| <- SQL NULL, no variant null
# +----------------+---------------+
Explotando arrays y objetos
Cuando una columna VARIANT contiene un array u objeto y queremos expandirla en múltiples filas, usamos variant_explode (o variant_explode_outer para conservar las filas con valores no explosionables):
from pyspark.sql.functions import parse_json, lit
# Explotar un array VARIANT
spark.tvf.variant_explode(
parse_json(lit('["spark", "python", "delta"]'))
).show()
# +---+----+--------+
# |pos| key| value|
# +---+----+--------+
# | 0|NULL| "spark"|
# | 1|NULL|"python"|
# | 2|NULL| "delta"|
# +---+----+--------+
# Explotar un objeto VARIANT - tvf = Table Valued Function
spark.tvf.variant_explode(
parse_json(lit('{"nombre": "Aitor", "edad": 45}'))
).show()
# +---+------+-------+
# |pos| key| value|
# +---+------+-------+
# | 0| edad| 45|
# | 1|nombre|"Aitor"|
# +---+------+-------+
Para explotar un campo anidado dentro de un DataFrame existente, primero lo extraemos como variant y luego aplicamos el lateralJoin (estudiaremos los join en la siguiente sesión):
from pyspark.sql.functions import variant_get
# DataFrame con arrays en la columna VARIANT
eventos = [
(1, '{"usuario": "Aitor", "tags": ["spark", "python"]}'),
(2, '{"usuario": "Laura", "tags": ["delta", "kafka"]}'),
]
df_e = (spark.createDataFrame(eventos, ["id", "json_str"])
.withColumn("datos", parse_json("json_str")).drop("json_str"))
# Extraer el array como VARIANT y explotar
df_tags = df_e.withColumn("tags", variant_get("datos", "$.tags", "variant")).drop("datos")
df_exploded = df_tags.lateralJoin(spark.tvf.variant_explode(col("tags").outer()))
df_exploded.select("id", col("value").alias("tag")).show()
# +---+--------+
# | id| tag|
# +---+--------+
# | 1| "spark"|
# | 1|"python"|
# | 2| "delta"|
# | 2| "kafka"|
# +---+--------+
DataFrame API¶
Spark permite trabajar con los datos mediante dos tipos de acercamientos, el API de objetos de DataFrames y el lenguaje SQL. Ambos enfoques son equivalentes, ya que el lenguaje SQL se traduce internamente a operaciones sobre DataFrames. En este apartado nos centraremos en el API de objetos, y en el siguiente veremos el lenguaje SQL.
Así pues, una vez tenemos un DataFrame podemos trabajar con los datos mediante un conjunto de operaciones estructuradas, muy similares al lenguaje relacional. Estas operaciones también se clasifican en transformaciones y acciones, recordando que las transformaciones utilizan una evaluación perezosa, es decir, no se ejecutan inmediatamente cuando se definen, sino que se construye un plan de ejecución que se optimiza y se ejecuta solo cuando es necesario obtener un resultado (por ejemplo, al llamar a una acción como show o collect), permitiendo optimizar el plan de ejecución y mejorar el rendimiento al evitar cálculos innecesarios.
Es muy importante tener en cuenta que todas las operaciones que vamos a realizar a continuación son inmutables, es decir, nunca van a modificar el DataFrame sobre el que realizamos la transformación. Así pues, realizaremos encadenamiento de transformaciones (transformation chaining) o asignaremos el resultado a un nuevo DataFrame.
Preparación
Para los siguientes apartados, vamos a trabajar sobre el siguiente DataFrame cargado desde el fichero de ventas que hemos utilizado a lo largo del curso:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("s8a-dataframes-api").getOrCreate()
# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("sep",";").option("header", "true").option("inferSchema", "true").csv("pdi_sales_small.csv")
df.printSchema()
El esquema generado es:
root
|-- ProductID: integer (nullable = true)
|-- Date: string (nullable = true)
|-- Zip: string (nullable = true)
|-- Units: integer (nullable = true)
|-- Revenue: double (nullable = true)
|-- Country: string (nullable = true)
Proyectando¶
La operación select permite indicar las columnas a recuperar pasándolas como parámetros:
df.select("ProductID","Revenue").show(3)
+---------+-------+
|ProductID|Revenue|
+---------+-------+
| 725| 115.5|
| 787| 314.9|
| 788| 314.9|
+---------+-------+
only showing top 3 rows
También podemos realizar cálculos (referenciando a los campos con nombreDataframe.nombreColumna) sobre las columnas y crear un alias (operación asociada a un campo):
df.select(df.ProductID,(df.Revenue+10).alias("VentasMas10")).show(3)
+---------+-----------+
|ProductID|VentasMas10|
+---------+-----------+
| 725| 125.5|
| 787| 324.9|
| 788| 324.9|
+---------+-----------+
only showing top 3 rows
Si tenemos un DataFrame con un gran número de columnas y queremos recuperarlas todas a excepción de unas pocas, es más cómodo utilizar la transformación drop, la cual funciona de manera opuesta a select, es decir, indicando las columnas que queremos quitar del resultado:
# Obtenemos el mismo resultado
df.select("ProductID", "Date", "Zip")
df.drop("Units", "Revenue", "Country")
Trabajando con columnas¶
Para acceder a las columnas, debemos crear objetos Column. Para ello, podemos seleccionarlos a partir de un DataFrame como una propiedad o mediante la función col:
from pyspark.sql.functions import col
miProductoID = df.ProductID
miProductoID = df["ProductID"]
miProductoID = col("ProductID")
Así pues, podemos recuperar ciertas columnas de un DataFrame con cualquier de las siguientes expresiones:
from pyspark.sql.functions import col
df.select("ProductID", "Revenue").show()
df.select(df.ProductID, df.Revenue).show()
df.select(df["ProductID"], df["Revenue"]).show()
df.select(col("ProductID"), col("Revenue")).show()
col vs expr¶
En ocasiones se confunde el uso de la función col con expr. Aunque podemos referenciar una columna haciendo uso de expr, su uso provoca que se parseé la cadena recibida para interpretarla.
Para el siguiente ejemplo, supongamos que tenemos un DataFrame con datos de clientes. Utilizaremos también la función concat_ws para concatenar textos utilizado un separador.
from pyspark.sql.functions import col,expr
df.select(concat_ws(" ", col("nombre"), col("apellidos")).alias("nombreCompleto"),
"sueldo",
(col("sueldo")*1.1).alias("nuevoSueldo")).show()
df.select(concat_ws(" ", col("nombre"), col("apellidos")).alias("nombreCompleto"),
"sueldo",
expr("sueldo*1.1").alias("nuevoSueldo")).show()
Añadiendo columnas¶
Una vez tenemos un DataFrame, podemos añadir columnas mediante el método withColumn:
dfNuevo = df.withColumn("total", df.Units * df.Revenue)
dfNuevo.show()
# +---------+----------+---------------+-----+-------+-------+------+
# |ProductID| Date| Zip|Units|Revenue|Country| total|
# +---------+----------+---------------+-----+-------+-------+------+
# | 725|1999-01-15|41540 | 1| 115.5|Germany| 115.5|
# | 787|2002-06-06|41540 | 1| 314.9|Germany| 314.9|
# | 788|2002-06-06|41540 | 1| 314.9|Germany| 314.9|
# | 901|1999-02-15|13587 | 2| 818.9|Germany|1637.8|
# ...
withColumn
Anteriormente hemos utilizado el método withColumn para cambiarle el tipo a un campo ya existente. Así pues, si referenciamos a una columna que ya existe, en vez de crearla, la sustituirá.
Otra forma de añadir una columna con una expresión es mediante la transformación selectExpr. Por ejemplo, podemos conseguir el mismo resultado que en el ejemplo anterior de la siguiente manera:
df.selectExpr("*", "Units * Revenue as total").show()
# +---------+----------+---------------+-----+-------+-------+------+
# |ProductID| Date| Zip|Units|Revenue|Country| total|
# +---------+----------+---------------+-----+-------+-------+------+
# | 725|1999-01-15|41540 | 1| 115.5|Germany| 115.5|
# | 787|2002-06-06|41540 | 1| 314.9|Germany| 314.9|
# | 788|2002-06-06|41540 | 1| 314.9|Germany| 314.9|
# | 901|1999-02-15|13587 | 2| 818.9|Germany|1637.8|
# ...
Aunque más adelante veremos como realizar transformaciones con agregaciones, mediante selectExpr también podemos realizar analítica de datos aprovechando la potencia de SQL:
df.selectExpr("count(distinct(ProductID)) as productos","count(distinct(Country)) as paises").show()
# +---------+------+
# |productos|paises|
# +---------+------+
# | 799| 4|
# +---------+------+
Cambiando el nombre¶
Si por algún extraño motivo necesitamos cambiarle el nombre a una columna (por ejemplo, vamos a unir dos DataFrames que tienen columnas con el mismo nombre pero en posiciones diferentes, o que al inferir el esquema tenga un nombre críptico o demasiado largo y queremos que sea más legible) podemos utilizar la transformación withColumnRenamed:
df.withColumnRenamed("Zip", "PostalCode").show(5)
# +---------+----------+---------------+-----+-------+-------+
# |ProductID| Date| PostalCode|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# | 725|1999-01-15|41540 | 1| 115.5|Germany|
# | 787|2002-06-06|41540 | 1| 314.9|Germany|
# | 788|2002-06-06|41540 | 1| 314.9|Germany|
# | 940|1999-01-15|22587 | 1| 687.7|Germany|
# | 396|1999-01-15|22587 | 1| 857.1|Germany|
# +---------+----------+---------------+-----+-------+-------+
# only showing top 5 rows
Filtrando¶
Si queremos eliminar filas, usaremos el método filter:
df.filter(df.Country=="Germany").show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# | 725|1999-01-15|41540 | 1| 115.5|Germany|
# | 787|2002-06-06|41540 | 1| 314.9|Germany|
# | 788|2002-06-06|41540 | 1| 314.9|Germany|
# | 940|1999-01-15|22587 | 1| 687.7|Germany|
Por similitud con SQL, podemos utilizar también where como un alias de filter:
df.where(df.Units>20).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# | 495|1999-03-15|75213 CEDEX 16 | 77|43194.1| France|
# | 2091|1999-05-15|9739 | 24| 3652.7| Mexico|
# | 2091|1999-06-15|40213 | 41| 6240.1|Germany|
# | 2091|1999-10-15|40213 | 41| 6347.7|Germany|
# | 2091|1999-12-15|40213 | 23| 3560.9|Germany|
# +---------+----------+---------------+-----+-------+-------+
Podemos utilizar los operadores lógicos (& para conjunción y | para la disyunción) para crear condiciones compuestas (recordad rodear cada condición entre paréntesis):
df.filter((df.Country=="Germany") & (df.Units>20)).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# | 2091|1999-06-15|40213 | 41| 6240.1|Germany|
# | 2091|1999-10-15|40213 | 41| 6347.7|Germany|
# | 2091|1999-12-15|40213 | 23| 3560.9|Germany|
# +---------+----------+---------------+-----+-------+-------+
df.filter((df.ProductID==2314) | (df.ProductID==1322)).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# | 2314|1999-05-15|46045 | 1| 13.9|Germany|
# | 1322|2000-01-06|75593 CEDEX 12 | 1| 254.5| France|
# +---------+----------+---------------+-----+-------+-------+
ilike
Desde Spark 3.3 podemos emplear las operaciones de columna like / ilike para indicar condiciones de forma similar al operador like de SQL.
df.filter(df.Country.like('Franc%'))
df.filter(df.Country.ilike('franc%'))
Un caso particular de filtrado es la eliminación de los registros repetidos, lo cual lo podemos hacer de dos maneras:
- Haciendo uso del método distinct tras haber realizado alguna transformación
- Utilizando dropDuplicates sobre un DataFrame:
df.select("Country").distinct().show()
df.dropDuplicates(["Country"]).select("Country").show()
# Spark 4.0 acepta argumentos directos en dropDuplicates
# df.dropDuplicates("Country").select("Country").show()
# +-------+
# |Country|
# +-------+
# |Germany|
# | France|
# | Mexico|
# | Canada|
# +-------+
Ordenando¶
Una vez recuperados los datos deseados, podemos ordenarlos mediante sort u orderBy (son operaciones totalmente equivalentes):
df.select("ProductID","Revenue").sort("Revenue").show(5)
df.sort("Revenue").show(5)
df.sort("Revenue", ascending=True).show(5)
df.sort(df.Revenue.asc()).show(5)
# Ordenación descendiente
df.sort(df.Revenue.desc()).show(5)
df.sort("Revenue", ascending=False).show(5)
from pyspark.sql.functions import desc
df.sort(desc("Revenue")).show(5)
# Ordenación diferente en cada columna
df.sort(df.Revenue.desc(), df.Units.asc()).show(5)
df.sort(["Revenue","Units"], ascending=[0,1]).show(5)
Por ejemplo, con la última operación obtendríamos:
+---------+----------+---------------+-----+-------+-------+
|ProductID| Date| Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
| 495|1999-03-15|75213 CEDEX 16 | 77|43194.1| France|
| 495|2000-03-01|75391 CEDEX 08 | 18|10395.0| France|
| 464|2003-06-11|75213 CEDEX 16 | 16|10075.8| France|
| 464|2000-08-01|22397 | 17| 9817.5|Germany|
| 495|2000-03-01|06175 CEDEX 2 | 16| 9240.0| France|
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows
Normalmente, tras realizar una ordenación, es habitual quedarse con un subconjunto de los datos. Para ello, podemos utilizar la transformación limit.
Por ejemplo, la siguiente transformación es similar al ejemplo anterior, sólo que ahora al driver únicamente le llegan 5 registros, en vez de traerlos todos y sólo mostrar 5:
df.sort(df.Revenue.desc(), df.Units.asc()).limit(5).show()
Añadiendo filas¶
La única manera de añadir filas a un DataFrame es creando uno nuevo que sea el resultado de unir dos DataFrames que compartan el mismo esquema (mismo nombres de columnas y en el mismo orden). Para ello, utilizaremos la transformación union que realiza la unión por el orden de las columnas:
nuevasVentas = [
(6666, "2022-03-24", "03206", 33, 3333.33, "Spain"),
(6666, "2022-03-25", "03206", 22, 2222.22, "Spain"),
]
# Creamos un nuevo DataFrame con las nuevas ventas
nvDF = spark.createDataFrame(nuevasVentas)
# Unimos los dos DataFrames
dfUpdated = df.union(nvDF)
Trabajando con conjuntos
Considerando dos DataFrames como dos conjuntos, podemos emplear las operaciones union, intersect, intersectAll (mantiene los duplicados), exceptAll (mantiene los duplicados) y subtract .
Cogiendo muestras¶
Si necesitamos recoger un subconjunto de los datos, ya sea para preparar los datos para algún modelo de machine learning como para una muestra aleatoria de los mismos, podemos utilizar las siguientes transformaciones:
-
sample permite obtener una muestra a partir de un porcentaje (no tiene porqué obtener una cantidad exacta). También admite un semilla e indicar si queremos que pueda repetir los datos.
df.count() # 120239 muestra = df.sample(0.10) muestra.count() # 11876 muestraConRepetidos = df.sample(True, 0.10) muestraConRepetidos.count() # 11923 -
randomSplit recupera diferentes DataFrames cuyos tamaños en porcentaje se indican como parámetros (si no suman uno, los parámetros se normalizan):
dfs = df.randomSplit([0.8, 0.2]) dfEntrenamiento = dfs[0] dfPrueba = dfs[1] dfEntrenamiento.count() # 96194 dfPrueba.count() # 24045
Trabajando con datos sucios¶
En cualquier proyecto real, los datos rara vez llegan limpios. Valores nulos, registros incompletos, errores tipográficos o datos fuera de rango son situaciones habituales que hay que resolver antes de analizar o persistir los datos.
Spark ofrece tres estrategias básicas para gestionar este tipo de situaciones: eliminar las filas problemáticas, rellenar los valores que faltan con un valor por defecto, o sustituir valores incorrectos por otros válidos. Vamos a ver cada una de ellas.
Vamos a ver cada uno de estos casos a partir del siguiente dataset:
malasVentas = [
(6666, "2022-03-22", "03206", 33, 3333.33, "Spain"),
(6666, "2022-03-22", None, 33, 3333.33, "Spain"),
(6666, "2022-03-23", "03206", None, 2222.22, "Spain"),
(6666, "2022-03-24", "03206", None, None, "Espain"),
(None, None, None, None, None, None)
]
malDF = spark.createDataFrame(malasVentas, ["ProductID", "Date", "Zip", "Units", "Revenue" , "Country"])
malDF.show()
# +---------+----------+-----+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+-----+-----+-------+-------+
# | 6666|2022-03-22|03206| 33|3333.33| Spain|
# | 6666|2022-03-22| null| 33|3333.33| Spain|
# | 6666|2022-03-23|03206| null|2222.22| Spain|
# | 6666|2022-03-24|03206| null| null| Espain|
# | null| null| null| null| null| null|
# +---------+----------+-----+-----+-------+-------+
Si queremos saber si una columna contiene nulos, podemos hacer un filtrado utilizando el método isNull sobre los campos deseados (también podemos utilizar isNotNull si queremos el caso contrario):
malDF.filter(malDF.Zip.isNull()).show()
# +---------+----------+----+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+----------+----+-----+-------+-------+
# | 6666|2022-03-22|null| 33|3333.33| Spain|
# | null| null|null| null| null| null|
# +---------+----------+----+-----+-------+-------+
Para trabajar con las filas que contengan algún dato nulo, podemos acceder a la propiedad na, la cual devuelve un DataFrameNaFunctions sobre la que podemos indicarle:
-
que la elimine mediante el método
drop/dropna. Puede recibir"any"(borrará las filas que contengan algún nulo) o"all"(borrará las filas que todas sus columnas contengan nulos) y una lista con las columnas a considerar:# Elimina todos los nulos malDF.na.drop().show() # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # +---------+----------+-----+-----+-------+-------+ # Elimina las filas que todas sus columnas son nulas malDF.na.drop("all").show() # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # | 6666|2022-03-22| null| 33|3333.33| Spain| # | 6666|2022-03-23|03206| null|2222.22| Spain| # | 6666|2022-03-24|03206| null| null| Espain| # +---------+----------+-----+-----+-------+-------+ # Elimina las filas que tienen el Zip nulo malDF.na.drop(subset=["Zip"]).show() # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # | 6666|2022-03-23|03206| null|2222.22| Spain| # | 6666|2022-03-24|03206| null| null| Espain| # +---------+----------+-----+-----+-------+-------+También podemos indicar la cantidad de valores no nulos que ha de contener cada fila para no eliminarla mediante el parámetro
thresh:# Elimina las filas que tengan menos de 3 valores rellenados malDF.dropna(thresh = 3) # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # | 6666|2022-03-22| null| 33|3333.33| Spain| # | 6666|2022-03-23|03206| null|2222.22| Spain| # | 6666|2022-03-24|03206| null| null| Espain| # +---------+----------+-----+-----+-------+-------+ -
que la rellene mediante el método
fill/fillna, indicando el valor y si queremos, sobre qué columnas aplicar la modificación:# Rellenamos los zips vacíos por 99999 malDF.na.fill("99999", subset=["Zip"]).show() # malDF.na.fill("99999", ["Zip"]).show() # malDF.fillna({"Zip": "99999"}) # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # | 6666|2022-03-22|99999| 33|3333.33| Spain| # | 6666|2022-03-23|03206| null|2222.22| Spain| # | 6666|2022-03-24|03206| null| null| Espain| # | null| null|99999| null| null| null| # +---------+----------+-----+-----+-------+-------+ -
que la sustituya mediante el método
replace# Cambiamos Espain por Spain malDF.na.replace("Espain", "Spain").show() # +---------+----------+-----+-----+-------+-------+ # |ProductID| Date| Zip|Units|Revenue|Country| # +---------+----------+-----+-----+-------+-------+ # | 6666|2022-03-22|03206| 33|3333.33| Spain| # | 6666|2022-03-22| null| 33|3333.33| Spain| # | 6666|2022-03-23|03206| null|2222.22| Spain| # | 6666|2022-03-24|03206| null| null| Spain| # | null| null| null| null| null| null| # +---------+----------+-----+-----+-------+-------+
na.replace vs replace
La función replace se puede emplear dentro de la propiedad na o a partir del propio Dataframe.
Otro caso muy común es realizar una operación sobre una columna para transformar su valor, por ejemplo, pasar todo el texto a minúsculas o dividir una columna entre 100 para cambiar la escala.
En nuestro caso, vamos a modificar las columnas Zip y Country para realizar un trim y borrar los espacios en blanco:
from pyspark.sql.functions import col, trim
df = df.withColumn("Country", trim(col("Country"))).withColumn("Zip", trim(col("Zip")))
Inspeccionando el plan de ejecución
Puedes ver el plan lógico y físico que genera el Catalyst Optimizer para cualquier DataFrame llamando a explain():
df.filter(col("Revenue") > 2000).select("Country", "Revenue").explain(mode="formatted")
El plan lógico refleja las operaciones tal como las has escrito; el plan físico muestra lo que Spark ejecutará realmente tras aplicar las optimizaciones (reordenación de filtros, predicate pushdown, etc.).
Puedes visualizar el mismo DAG gráficamente en la Spark UI.
Usando SQL¶
En la era del big data, SQL es la lengua franca, permitiendo a perfiles con pocos conocimientos de programación trabajar de forma eficiente con los datos (siempre poniendo el foco en la analítica de datos, no en el procesamiento transaccional).
Spark soporta el ANSI SQL 2003, ampliamente establecido en el mundo de las bases de datos.
Para correr SQL en Spark podemos hacerlo a través de:
- El cliente SQL, es cual se ofrece como un comando en
./bin/spark-sql - Mediante un servidor ODBC/JDBC
- De forma programativa mediante aplicaciones Spark.
Las dos primeras opciones se integran con Apache Hive para utilizar su metastore. Ahora nos vamos a centrar en la última.
Spark 4.0: Modo ANSI activado por defecto
A partir de Spark 4.0, la configuración spark.sql.ansi.enabled está a true por defecto. Esto cambia el comportamiento del DataFrame API de forma significativa:
- División por cero: en vez de devolver
null, lanzaSparkArithmeticException. - Desbordamiento numérico: lanza excepción en vez de dar un resultado incorrecto.
- Conversiones de tipo inválidas: se rechazan en vez de coercionarse silenciosamente.
# En Spark 3.x devolvía null; en Spark 4.0 lanza excepción
df.select(col("sueldo") / lit(0)).show()
# SparkArithmeticException: Division by zero
# Para restaurar el comportamiento anterior:
spark.conf.set("spark.sql.ansi.enabled", "false")
Este cambio acerca Spark al comportamiento de bases de datos como PostgreSQL, mejorando la integridad de los datos al no ocultar errores silenciosamente. Es recomendable adaptar el código para manejar estos casos explícitamente en vez de desactivar el modo ANSI.
Vistas temporales¶
Ya hemos visto que los DataFrames tienen una estructura similar a una tabla de una base de datos relacional. Para poder realizar consultas, necesitaremos crear vistas temporales mediante el método createTempView o createOrReplaceTempView para posteriormente realizar una consulta sobre la vista creada a través de spark.sql:
# 1. definimos la vista
df.createOrReplaceTempView("ventas")
# 2. realizamos la consulta
ventasCanada = spark.sql("select * from ventas where trim(Country)='Canada'")
ventasCanada.show(3)
# +---------+---------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+---------+---------------+-----+-------+-------+
# | 725|1/15/1999|H1B | 1| 115.4|Canada |
# | 2235|1/15/1999|H1B | 2| 131.1|Canada |
# | 713|1/15/1999|H1B | 1| 160.1|Canada |
# +---------+---------+---------------+-----+-------+-------+
# only showing top 3 rows
Spark 4.0: Sintaxis PIPE |> en SQL
Spark 4.0 introduce la sintaxis PIPE, que permite encadenar
operaciones SQL de forma secuencial usando el operador |>,
de manera similar a como se encadenan transformaciones en el DataFrame API:
spark.sql("""
FROM clientes
|> WHERE ciudad = 'Elche'
|> SELECT nombre, sueldo
|> ORDER BY sueldo DESC
""").show()
En vez del orden tradicional SELECT ... FROM ... WHERE ..., la sintaxis PIPE permite expresar la lógica en el orden natural de los pasos: primero la tabla, luego el filtro, luego la proyección.
Cada paso |> recibe el resultado del anterior, mejorando la
legibilidad en consultas complejas.
Vistas globales¶
Las vistas temporales tienen un alcance de SparkSession, de manera que desaparecen una vez finalice la sesión que ha creado la vista. Si necesitamos tener una vista que se comparta entre todas las sesiones y que permanezca viva hasta que la aplicación Spark finalice, podemos crear una vista temporal global mediante createOrReplaceGlobalTempView
Estas vistas se almacenan en la base de datos global_temp y en las consultas es necesario poner el prefijo global_temp para acceder a sus vistas.
# 1. definimos la vista global
df.createOrReplaceGlobalTempView("ventasg")
# 2. realizamos la consulta
ventasCanadaG = spark.sql("select * from global_temp.ventasg where trim(Country)='Canada'")
ventasCanadaG.show(3)
# +---------+---------+---------------+-----+-------+-------+
# |ProductID| Date| Zip|Units|Revenue|Country|
# +---------+---------+---------------+-----+-------+-------+
# | 725|1/15/1999|H1B | 1| 115.4|Canada |
# | 2235|1/15/1999|H1B | 2| 131.1|Canada |
# | 713|1/15/1999|H1B | 1| 160.1|Canada |
# +---------+---------+---------------+-----+-------+-------+
# only showing top 3 rows
# Creamos otra sesión y vemos como funciona
spark.newSession().sql("select count(*) from global_temp.ventasg where trim(Country)='Canada'").show()
Eliminando vistas¶
Para borrar una vista que hayamos creado, necesitamos acceder al Spark Catalog que veremos en una sesión posterior, y utilizar el método dropTempView o dropGlobalTempView
dependiendo del tipo de vista:
spark.catalog.dropTempView("ventas")
spark.catalog.dropGlobalTempView("ventasg")
VARIANT y SQL
Dentro de SQL, podemos emplear todas las operaciones relacionadas con el tipo VARIANT que hemos visto anteriormente, como variant_get, try_variant_get, schema_of_variant, etc.
Además, Spark SQL ofrece la sintaxis :campo como atajo de variant_get:
from pyspark.sql.functions import parse_json, try_variant_get, schema_of_variant
datos_json = [
(1, '{"nombre": "Aitor", "ciudad": "Elche", "edad": 48}'),
(2, '{"nombre": "Laura", "ciudad": "Alicante", "edad": 33}'),
(3, '{"nombre": "Pedro", "tags": ["baloncesto", "natación"]}'),
]
df = spark.createDataFrame(datos_json, ["id", "json_str"])
df_v = df.withColumn("datos", parse_json("json_str")).drop("json_str")
# Forma explícita con try_variant_get
spark.sql("""
SELECT
id,
try_variant_get(datos, '$.nombre', 'string') AS nombre,
try_variant_get(datos, '$.ciudad', 'string') AS ciudad,
try_variant_get(datos, '$.edad', 'int') AS edad,
schema_of_variant(datos) AS esquema
FROM personas
""").show(truncate=False)
# Forma abreviada con :campo::tipo — resultado idéntico
spark.sql("""
SELECT
id,
datos:nombre::string AS nombre,
datos:ciudad::string AS ciudad,
datos:edad::int AS edad,
datos:tags[0]::string AS primer_tag
FROM personas
""").show(truncate=False)
# +---+------+--------+----+----------+
# | id|nombre| ciudad|edad|primer_tag|
# +---+------+--------+----+----------+
# | 1| Aitor| Elche| 48| null|
# | 2| Laura|Alicante| 33| null|
# | 3| Pedro| null|null|baloncesto|
# +---+------+--------+----+----------+
La notación :campo::tipo es equivalente a try_variant_get: devuelve null si la ruta no existe o el cast falla, sin lanzar excepción. Admite rutas anidadas (:objeto:subcampo) y acceso a arrays por índice ([0]).
Trabajando con Databricks¶
Databricks es una plataforma en la nube construida sobre Spark que facilita el trabajo con notebooks y clústeres gestionados. En esta sección vamos a ver cómo cargar datos y trabajar con DataFrames y SQL directamente desde su interfaz, aprovechando además sus capacidades de visualización integrada.
Para ello podemos realizarlo de dos maneras:
- Cargar un fichero dentro de un workspace de Databricks y tras leerlo crear una vista temporal.
- Cargar el fichero como una tabla dentro del catálogo de Databricks y trabajar directamente con ella.
Para ambos casos, vamos a utilizar el mismo fichero de ventas que hemos estado utilizando a lo largo de la sesión (pdi_sales_small.csv).
Cargando datos desde un fichero¶
Tal como vimos al trabajar con Dataframes en la sesión anterior, tras crear un volumen, podemos subir un fichero a dicho volumen y posteriormente leerlo mediante spark.read para crear un DataFrame con su contenido (en el siguiente ejemplo hemos subido el fichero a un volumen llamado data dentro de nuestra area de trabajo por defecto workspace.default):
df = spark.read.option("sep",";") \
.option("header","true") \
.option("inferSchema","true") \
.csv("/Volumes/workspace/default/data/pdi_sales_small.csv")
df.createOrReplaceTempView("ventitas")
Cargando datos como una tabla¶
Una vez hemos entrado a Databricks, vamos a cargar los datos mediante la opción Data Engineering → Data Ingestion → Create or modify table, subiendo el archivo pdi_sales_small.csv y poniendo como nombre de la tabla, por ejemplo, ventas. Tras cargarlo, nos aparecerá una visualización de los datos donde veremos que no lo ha hecho correctamente, ya que, en nuestro caso, debemos indicar que el delimitador es ;. Por ello, si pulsamos sobre Advanced attributes podremos configurarlo y crear la tabla:
Una vez creada, se cargará automática la tabla en el catálogo de Databricks y podremos consultar sus datos desde la pestaña Sample Data, o bien crear una consulta SQL sobre la tabla mediante el botón azul Create → Query o un cuaderno Jupyter mediante Create → Notebook donde podemos trabajar con ella:
Como se puede observar, podemos realizar consultas SQL directamente sobre la tabla creada, ya sea accediendo a la ruta completa dentro del area de trabajo, o por el nombre de la tabla, mediante una celda de tipo SQL
%sql
-- SELECT * FROM workspace.default.ventas limit 10;
SELECT * FROM ventas limit 10;
O acceder mediante Python a la tabla mediante spark.table(nombreTabla), lo cual nos devolverá un DataFrame de Spark:
df = spark.table("ventas")
df.show(10)
Sobre este DataFrame ya podemos realizar las operaciones que hemos visto a lo largo de la sesión, como limpiar los países para eliminar los espacios en blanco:
from pyspark.sql.functions import trim
df = df.withColumn("Country", trim(df.Country))
df.select("Country").distinct().show()
Datos visuales¶
Si en cualquier celda, pulsamos sobre el el botón de Python, podremos cambiar el tipo de celda a SQL, y viceversa, lo cual nos permitirá trabajar con ambos lenguajes en el mismo cuaderno. Sobre las celdas SQL, donde la primera línea comienza con %sql, podemos introducir directamente código SQL:
%sql
SELECT trim(Country), count(*) FROM ventas GROUP BY trim(Country);
Que tras ejecutarlo, nos mostrará el resultado en formato de tabla:
Además, si pulsamos sobre el icono del + al lado de la tabla de resultados generada y posteriormente sobre Visualization, podemos crear otra representación de los datos, pudiendo visualizar los datos en modo gráfico, pudiendo elegir entre diferentes tipos de gráficos y configurando sus ejes, leyendas, etc.
Cuadro de mandos¶
Además, con las tablas y/o gráficos que generamos dentro de Databricks, podemos generar un sencillo cuadro de mandos.
Si hemos trabajado con una vista temporal de los datos, podremos crear dashboards asociados al cuaderno de Jupyter, pero si en cambio, hemos utilizado tablas del catálogo de Databricks, podremos crear un cuadro de mandos global para todo el workspace.
Veamos mediante un ejemplo cómo crear un cuadro de mandos con los datos que hemos cargado. Para ello, vamos a crear un par de consultas, una para obtener los ingresos por país (como puedes observar, en este consulta no hacemos ninguna agregación, ya que nos vamos a basar en la visualización para mostrar los ingresos por país, y no en la tabla resultante):
%sql
select trim(Country), Date, Zip, Units, Revenue from ventas
Y otra para los ingresos totales, donde sí que realizamos el cálculo mediante una agregación:
%sql
select floor(sum(Units * Revenue),2) as IngresosTotales from ventas
Si pulsamos sobre el icono del gráfico de barras creado anteriormente, podemos pulsar sobre la opción de Add to notebook dashboard si queremos asignarlo a un cuadro de mandos local al cuaderno, o mediante Add to dashboard para un cuadro de mandos global de nuestro workspace. En nuestro caso, vamos a utilizar Add to dashboard , y crearemos un nuevo cuadro de mandos que llamaremos, por ejemplo, Ventas. A continuación, sobre las diferentes visualizaciones que hemos creado, pulsamos sobre el mismo icono y las añadimos al mismo cuadro de mandos Ventas:
FAQ¶
A continuación se recogen preguntas habituales sobre los conceptos de esta sesión que suelen realizarse en entrevistas de trabajo para puestos de ingeniería o ciencia de datos.
Despliega cada pregunta para ver una respuesta orientativa; no hay una única respuesta correcta, pero sí aspectos clave que conviene mencionar.
¿Cuáles son las diferencias entre RDD, DataFrame y Dataset?
| RDD | DataFrame | Dataset | |
|---|---|---|---|
| Abstracción | Bajo nivel | Alto nivel | Alto nivel |
| Esquema | No | Sí | Sí |
| Tipado estático | Sí (genérico) | No | Sí (Scala/Java) |
| Optimización | Manual | Catalyst + Tungsten | Catalyst + Tungsten |
| API Python | Sí | Sí | No disponible (= DataFrame) |
| Recomendado | Casos de bajo nivel | Uso general | Scala/Java con tipos fuertes |
En PySpark, DataFrame y Dataset son equivalentes (Dataset[Row]). Se recomienda siempre usar DataFrames salvo que se necesite acceso de bajo nivel.
¿Qué significa que los DataFrames sean inmutables y qué implicaciones tiene?
Ninguna transformación modifica el DataFrame original: siempre se devuelve uno nuevo. Esto tiene dos consecuencias prácticas:
- Encadenamiento obligatorio: hay que asignar el resultado o encadenar transformaciones (
df = df.withColumn(...).filter(...)), porque llamar al método sin más no alteradf. - Seguridad en paralelismo: al no haber estado mutable compartido, Spark puede replanificar, reordenar o reejecutar transformaciones sin efectos laterales, lo que es clave para tolerancia a fallos y optimización.
Es el mismo principio que siguen los RDD y encaja con el modelo funcional sobre el que se construye Spark.
¿Por qué es mejor definir el esquema explícitamente en lugar de inferirlo?
La inferencia de esquema requiere que Spark lea los datos una primera vez para determinar los tipos, duplicando el tiempo de lectura en ficheros grandes. Además puede dar resultados incorrectos (por ejemplo, inferir un campo numérico como string si los primeros valores son nulos). Definir el esquema con StructType es más rápido, predecible y evita errores en producción.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("id", IntegerType(), nullable=False),
StructField("nombre", StringType(), nullable=True)
])
df = spark.read.schema(schema).csv("datos.csv")
¿Cuándo usarías el tipo VARIANT frente a un StructType tradicional?
VARIANT está pensado para datos semiestructurados con esquema variable, evolutivo o desconocido: payloads de APIs externas, eventos heterogéneos, exploración de un data lake raw, etc. Ofrece acceso binario optimizado a
campos concretos sin parsear todo el documento, con un rendimiento hasta 8 veces superior al JSON almacenado como string.
Si el esquema es estable y conocido, un StructType bien definido sigue siendo más eficiente y más seguro (validación de tipos en tiempo de lectura). La regla práctica: VARIANT para flexibilidad, StructType para rendimiento y contratos estables.
¿Qué es el predicate pushdown y cómo se aprovecha con Parquet?
El predicate pushdown es una optimización del Catalyst Optimizer por la que los filtros (filter/where) se aplican lo más cerca posible de la fuente de datos, antes de cargar los datos en memoria.
Con el formato Parquet, que almacena estadísticas por row group (valores mínimo y máximo de cada columna), Spark puede descartar enteros bloques de datos sin siquiera leerlos si no satisfacen el filtro. Esto reduce drásticamente el volumen de datos leído del almacenamiento.
¿Cuál es la diferencia entre col() y expr() para referenciar columnas?
col("nombre") devuelve una referencia a una columna por nombre, lo más habitual para operaciones simples. expr("precio * cantidad") permite escribir expresiones SQL arbitrarias como cadena de texto, incluyendo funciones, operadores y alias.
Son equivalentes en rendimiento ya que ambas pasan por el Catalyst Optimizer, pero expr() resulta más cómodo para expresiones complejas.
¿Cuál es la diferencia entre filter() y where()?
Son exactamente equivalentes: where() es un alias de filter() pensado para que el código resulte más legible a quienes vienen de SQL. Ambos aceptan tanto una expresión de columna como una cadena SQL:
df.filter(col("sueldo") > 4000)
df.where("sueldo > 4000") # idéntico resultado
¿Cuál es la diferencia entre select() y withColumn()?
select() proyecta el DataFrame devolviendo solo las columnas indicadas; si no incluyes las columnas existentes, las pierdes. withColumn() añade o reemplaza ^^una única columna ^^ manteniendo el resto intactas. Como regla práctica: usa withColumn() para transformar o añadir una columna puntual, y select() cuando quieras reestructurar el esquema completo o calcular varias columnas a la vez.
¿Cómo se gestionan los valores nulos en Spark?
Spark representa los nulos con null y ofrece tres mecanismos principales:
dropna(thresh=N)— elimina las filas que tengan menos de N valores no nulos.fillna(valor)— sustituye los nulos por un valor fijo (puede ser un diccionario{columna: valor}).replace(valor_original, valor_nuevo)— reemplaza valores concretos en todo el DataFrame.
En operaciones de agregación, Spark ignora los nulos por defecto (igual que SQL). Para detectarlos se usa col("campo").isNull() o col("campo").isNotNull().
¿Cómo se inspecciona el plan de ejecución de un DataFrame?
Mediante explain(mode="formatted"), que muestra el plan lógico (lo que el código describe) y el plan físico (lo que Catalyst ejecutará realmente tras aplicar sus optimizaciones):
df.filter(col("sueldo") > 4000).groupBy("ciudad").count().explain(mode="formatted")
El plan lógico y el físico pueden diferir si Catalyst reordena filtros, aplica predicate pushdown o elimina operaciones redundantes. El mismo DAG se puede visualizar gráficamente en la Spark UI.
¿Qué diferencia hay entre una vista temporal y una vista global?
Una vista temporal (createOrReplaceTempView) solo es accesible desde la SparkSession actual y desaparece cuando esta termina. Una vista global (createOrReplaceGlobalTempView) se almacena en la base de datos virtual global_temp y es accesible desde cualquier SparkSession dentro de la misma aplicación Spark. Se consulta prefijando el nombre con global_temp.:
spark.sql("SELECT * FROM global_temp.mi_vista")
Referencias¶
- Documentación oficial sobre Spark SQL, DataFrames and Datasets Guide
- Beginning Apache Spark 3: With DataFrame, Spark SQL, Structured Streaming, and Spark Machine Learning Library
- Spark by Examples
- Spark SQL Cheatsheet en PDF y en formato web
Actividades¶
(RASBD.1 / CESBD.1b) En las siguientes actividades vamos a familiarizarnos con el uso del API de DataFrames de Spark.
-
(1p) A partir del archivo
movies.tsv, crea una esquema de forma declarativa con los campos:interpretede tipostringpeliculade tipostringanyode tipoint
Cada fila del fichero implica que el actor/actriz ha trabajado en dicha película en el año indicado.
A continuación, se pide:
- Recupera el nombre de las columnas.
- Persiste el dataframe en formato Parquet en
movies.parquet. - Crea un nuevo dataframe con una muestra de los datos
-
(1p) Nos han encargado analizar los eventos de una plataforma de streaming musical, almacenados en el fichero
eventos.jsonl, el cual está en formato JSONL (un documento JSON por línea). Cada evento tiene una estructura diferente según su tipo (play,like,search,error), lo que los convierte en un caso de uso ideal paraVARIANT. Así pues, se pide:-
Carga el fichero
eventos.jsonlen un DataFrame, y añade una nueva columnaevento_vde tipoVARIANTa partir de la columnaevento. A continuación, muestra el esquema resultante y los tres primeros registros. -
Extrae los campos
usuarioycancionde la columnaevento_ven nuevas columnas de tipostring. Para los eventos que no tengan el campocancion(comosearchoerror), el resultado debe sernullsin lanzar ninguna excepción. -
Obtén el esquema combinado de todos los eventos de tipo
"play". Observa qué campos aparecen y de qué tipo los infiere Spark. -
Filtra los eventos de tipo
playy crea un DataFrame con las columnasid,usuario,cancion,artista,duracion_sycompletada. Calcula además el porcentaje escuchado de cada canción: si el camposegundos_escuchadosno existe en el evento, considera que se escuchó de forma completa (100%).
-
-
(1p) A partir del archivo nombres.json, crea un DataFrame y realiza las siguientes operaciones mediante el API de DataFrames:
- Crea una nueva columna (columna
Mayor30) que indique si la persona es mayor de 30 años. - Crea una nueva columna (columna
FaltanJubilacion) que calcule cuantos años le faltan para jubilarse (supongamos que se jubila a los 67 años) - Crea una nueva columna (columna
Apellidos) que contengaXYZ(puedes utilizar la funciónlit) - Elimina las columna
Mayor30yApellidos. - Crea una nueva columna (columna
AnyoNac) con el año de nacimiento de cada persona (puedes utilizar la funcióncurrent_date). - Añade un id incremental para cada fila (campo
Id) y haz que al hacer unshowse vea en primer lugar (puedes utilizar la funciónmonotonically_increasing_id) seguidos delNombre,Edad,AnyoNac,FaltaJubilacionyCiudad.
Al realizar los seis pasos, el resultado del DataFrame será similar a :
+---+-------+----+-------+----------------+--------+ | Id|Nombre |Edad|AnyoNac|FaltanJubilacion| Ciudad| +---+-------+----+-------+----------------+--------+ | 0| Aitor| 45| 1977| 22| Elche| | 1| Marina| 14| 2008| 53|Alicante| | 2| Laura| 19| 2003| 48| Elche| | 3| Sonia| 45| 1977| 22| Aspe| | 4| Pedro|null| null| null| Elche| +---+-------+----+-------+----------------+--------+- Resuelve los apartados anteriores usando Spark SQL: crea una vista
temporal sobre el DataFrame y reescribe al menos tres de las consultas
anteriores con
spark.sql(...).
- Crea una nueva columna (columna
-
(1p) A partir del archivo
movies.parquetcreado en la primera actividad, mediante el DataFrame API:- Recupera cuantas películas hay almacenadas.
- Muestra las películas del año 2011.
- Muestra las películas en las que ha trabajado
Murphy, Eddie (I). - Muestra los intérpretes que trabajan en la película
Superman. - Muestra los intérpretes que trabajan en películas que contienen la palabra
Scream. - Muestra los intérpretes que aparecen tanto en
Supermancomo enSuperman II.
-
(opcional) A partir del archivo
VentasNulos.csv:-
Elimina las filas que tengan al menos 4 nulos.
-
Con las filas restantes, sustituye:
- Los nombres nulos por
Empleado -
Las ventas nulas por la media de las ventas de los compañeros (redondeado a entero).
Agrupando
Para obtener la media, aunque lo veremos en la próxima sesión, debes agrupar y luego obtener la media de la columna:
valor = df.groupBy().avg('Ventas') -
Los euros nulos por el valor del compañero que menos € ha ganado. (tras agrupar, puedes usar la función
min) - La ciudad nula por
C.V.y el identificador nulo porXYZ
Para los pasos ii) y iii) puedes crear un DataFrame que obtenga el valor a asignar y luego pasarlo como parámetro al método para rellenar los nulos.
- Los nombres nulos por
-