Changelog:
- UDF con Spark 4.0 — Abril 2026
- Refactor de Pandas y Spark debido a Spark 4.0 — Abril 2026
- Añadimos FAQ — Abril 2026
Uso avanzado de Spark DataFrames¶
Esta sesión y Spark 4
Los ejemplos de esta sesión son compatibles con Spark 3.x y Spark 4.x. Los apartados de JSON y UDF incluyen novedades específicas de Spark 4 (tipo VARIANT, UDFs optimizadas con Arrow y UDFs SQL).
Además, desde Spark 4.0, el modo ANSI SQL está activo por defecto (spark.sql.ansi.enabled=true). Esto implica que operaciones como divisiones por cero, cast inválidos o overflow en sumas/multiplicaciones lanzan una excepción en lugar de devolver null o un valor silenciosamente incorrecto. Para restaurar el comportamiento anterior puedes desactivarlo
temporalmente:
spark.conf.set("spark.sql.ansi.enabled", False)
Agregaciones¶
Una vez que tenemos un DataFrame, podemos realizar analítica de datos sobre el dataset entero, o sobre una o más columnas y aplicar una función de agregación que permita sumar, contar o calcular la media de cualquier grupo, entre otras opciones.
Para ello, PySpark ofrece un amplio conjunto de funciones. En nuestro caso, vamos a realizar algunos ejemplos para practicar con las funciones más empleadas.
Contando¶
count: Devuelve la cantidad de elementos no nulos:
from pyspark.sql.functions import count
df.select(count("Country")).show()
# +--------------+
# |count(Country)|
# +--------------+
# | 120239|
# +--------------+
count_distinct: Devuelve la cantidad de elementos no nulos diferentes:
from pyspark.sql.functions import count_distinct
df.select(count_distinct("Country"), count_distinct("Zip")).show()
# +-----------------------+-------------------+
# |count(DISTINCT Country)|count(DISTINCT Zip)|
# +-----------------------+-------------------+
# | 4| 2585|
# +-----------------------+-------------------+
approx_count_distinct: Devuelve aproximadamente la cantidad de elementos no nulos diferentes (puede recibir un segundo parámetro la máximo desviación estándar admitida). Este método es mucho más rápido que contar exactamente el número de resultado, y para datasets muy grandes, en ocasiones puede ser útil:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("Country"), approx_count_distinct("Zip")).show()
# +------------------------------+--------------------------+
# |approx_count_distinct(Country)|approx_count_distinct(Zip)|
# +------------------------------+--------------------------+
# | 4| 2737|
# +------------------------------+--------------------------+
Calculando¶
min y max permiten obtener el menor y el mayor valor respectivamente:
from pyspark.sql.functions import min, max
df.select(min("Units"), max("Units")).show()
# +----------+----------+
# |min(Units)|max(Units)|
# +----------+----------+
# | 1| 77|
# +----------+----------+
sum permite sumar todos los valores de una columna:
from pyspark.sql.functions import sum
df.select(sum("Units"), sum("Revenue")).show()
# +----------+--------------------+
# |sum(Units)| sum(Revenue)|
# +----------+--------------------+
# | 125728|5.0107274999986745E7|
# +----------+--------------------+
sum_distinct suma los valores diferentes de una columna:
from pyspark.sql.functions import sum_distinct
df.select(sum_distinct("Units"), sum_distinct("Revenue")).show()
# +-------------------+---------------------+
# |sum(DISTINCT Units)|sum(DISTINCT Revenue)|
# +-------------------+---------------------+
# | 308| 1189127.0999999985|
# +-------------------+---------------------+
avg calcula la media aritmética:
from pyspark.sql.functions import sum, count, avg
df.select(avg("Revenue"), sum("Revenue")/count("Revenue")).show()
# +-----------------+-------------------------------+
# | avg(Revenue)|(sum(Revenue) / count(Revenue))|
# +-----------------+-------------------------------+
# |416.7306364822291| 416.7306364822291|
# +-----------------+-------------------------------+
Asimetría, varianza y desviación estándar
Si nos interesa obtener información estadística sobre los datos, también disponemos de las funciones skewness, kurtosis, variance, var_pop, stddev y stddev_pop.
Agrupando¶
Si agrupamos varias columnas de tipo categóricas (con una cardinalidad baja), podemos realizar cálculos sobre el resto de columnas.
Sobre un DataFrame, podemos agrupar los datos por la columna que queramos utilizando el método groupBy, el cual nos devuelve un GroupedData, sobre el que posteriormente realizar operaciones como avg(cols), count(), mean(cols), min(cols), max(cols) o sum(cols):
from pyspark.sql.functions import sum
df.groupBy("Country").count().show()
# +-------+-----+
# |Country|count|
# +-------+-----+
# |Germany|30059|
# | France|30060|
# | Mexico|30060|
# | Canada|30060|
# +-------+-----+
df.groupBy("Country").sum("Revenue").show()
# +-------+--------------------+
# |Country| sum(Revenue)|
# +-------+--------------------+
# |Germany|1.4982119999999512E7|
# | France|1.2087942100000832E7|
# | Mexico| 1.139459870000116E7|
# | Canada|1.1642614200001905E7|
# +-------+--------------------+
Si necesitamos realizar más de un agregación sobre el mismo grupo, mediante agg podemos indicar una o más expresiones de columnas:
from pyspark.sql.functions import sum, count
df.groupBy("Country").agg(sum("Revenue"), count("Revenue")).show()
# +-------+--------------------+--------------+
# |Country| sum(Revenue)|count(Revenue)|
# +-------+--------------------+--------------+
# |Germany|1.4982119999999512E7| 30059|
# | France|1.2087942100000832E7| 30060|
# | Mexico| 1.139459870000116E7| 30060|
# | Canada|1.1642614200001905E7| 30060|
# +-------+--------------------+--------------+
También podemos indicar los elementos a calcular mediante un diccionario donde las claves son los campos y los valores la función a calcular:
df.groupBy("Country").agg({"Zip":"count", "Revenue":"avg"}).show()
# +-------+----------+------------------+
# |Country|count(Zip)| avg(Revenue)|
# +-------+----------+------------------+
# |Germany| 30059| 498.4237665923521|
# | France| 30060| 402.1271490352905|
# | Mexico| 30060| 379.0618330007039|
# | Canada| 30060|387.31251497012323|
# +-------+----------+------------------+
Agrupando colecciones¶
En ocasiones necesitamos agrupar en una colección todos los valores para un grupo en particular. Para ello, podemos usar collect_list (con repetidos) o collect_set (sin repeticiones):
Por ejemplo, para cada país, vamos a recuperar un listado con los códigos postales de aquellos pedidos que hayan superado las 5 unidades:
from pyspark.sql.functions import collect_list, collect_set
df.where(df.Units > 5).groupBy("Country").agg(collect_list("Zip"), collect_set("Zip")).show()
# +-------+--------------------+--------------------+
# |Country| collect_list(Zip)| collect_set(Zip)|
# +-------+--------------------+--------------------+
# |Germany|[22397, 22111, 40...|[22111, 12589, 22...|
# | France|[75213 CEDEX 16, ...|[06082 CEDEX 1, 0...|
# | Mexico|[7100, 7810, 9739...|[9739, 10300, 781...|
# | Canada|[T2X, V6G, V6G, T6V]| [V6G, T2X, T6V]|
# +-------+--------------------+--------------------+
Tablas pivote¶
Las tablas pivote permiten obtener un resumen de los datos a partir de columnas categóricas sobre la que realizar cálculos, tal como se hace en las hojas de cálculo con las tablas dinámicas.
Por ejemplo, vamos a obtener la cantidad recaudada por las ventas de cada año por cada país:
df.groupBy(year("Date")).pivot("Country").sum("Revenue").show()
# Más eficiente: evita un job extra que calcula los valores distintos
# df.groupBy(year("Date")).pivot("Country", ["Canada", "France", "Germany", "Mexico"]).sum("Revenue").show()
# +----------+------------------+------------------+------------------+------------------+
# |year(Date)| Canada| France| Germany| Mexico|
# +----------+------------------+------------------+------------------+------------------+
# | 2003| 2360085.999999947|1105230.9000000046|1407120.0000000007| 1049457.5|
# | 2004| 1539140.499999946| null| null| null|
# | 2001| 2193437.799999908| null| null|233419.20000000004|
# | 2000|1806678.3999999042|1108846.8999999764| 4510606.799999941| 4240448.399999928|
# | 1999|1382756.6999999764| 7594921.200000435| 5928459.100000297|3419368.2000001906|
# | 2002|2360514.7999998857| 2278943.099999957| 3135934.099999964|2451905.3999999263|
# +----------+------------------+------------------+------------------+------------------+
También podemos hacer más de un cálculo sobre la tabla pivote:
df.groupBy(year("Date")).pivot("Country").agg(sum("Revenue").alias("total"), sum("Units").alias("cantidad")).show()
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+
# |year(Date)| Canada_total|Canada_cantidad| France_total|France_cantidad| Germany_total|Germany_cantidad| Mexico_total|Mexico_cantidad|
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+
# | 2003| 2360085.999999947| 6375|1105230.9000000046| 2794|1407120.0000000007| 3099| 1049457.5| 2510|
# | 2004| 1539140.499999946| 3636| null| null| null| null| null| null|
# | 2001| 2193437.799999908| 5976| null| null| null| null|233419.20000000004| 583|
# | 2000|1806678.3999999042| 5049|1108846.8999999764| 2456| 4510606.799999941| 9738| 4240448.399999928| 11935|
# | 1999|1382756.6999999764| 3964| 7594921.200000435| 20432| 5928459.100000297| 12266|3419368.2000001906| 9895|
# | 2002|2360514.7999998857| 6148| 2278943.099999957| 6057| 3135934.099999964| 6643|2451905.3999999263| 6172|
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+
Desde Spark 3.4 disponemos de unpivot (y su alias melt), que realiza la operación inversa: transforma un conjunto de columnas en filas, lo que resulta útil para normalizar datos en formato wide a formato long.
# Partimos del resultado de un pivote
paises = ["Canada", "France", "Germany", "Mexico"]
df_pivot = df.groupBy(year("Date").alias("anyo")).pivot("Country", paises).sum("Revenue")
# Lo revertimos a formato largo
df_pivot.unpivot(
ids=["anyo"],
values=paises,
variableColumnName="Country",
valueColumnName="Revenue"
).show()
# +----+-------+------------------+
# |anyo|Country| Revenue |
# +----+-------+------------------+
# |2003| Canada| 2360085.999999947|
# |2003| France| 1105230.9000000046|
# |2003|Germany| 1407120.0000000007|
# ...
Joins¶
Hasta ahora todo la analítica la hemos realizado sobre un único DataFrame. Aunque si seguimos un proceso ELT es probable que tengamos todos los datos en un único lugar, en ocasiones necesitamos cruzar la información de dos datasets.
Si nos basamos en el planteamiento de una base de datos relacional, para unir dos DataFrames necesitamos unir la clave ajena de uno con la clave primaria del otro.
Para estos ejemplos, vamos a cambiar de datasets y utilizar datos de vuelos de avión que han tenido algún tipo de retraso (departure_delays.csv) y otro con los códigos de los aeropuertos (airport-codes-na.tsv).
Fichero CSV con la coma como separador de campos.
date,delay,distance,origin,destination
01011245,6,602,ABE,ATL
01020600,-8,369,ABE,DTW
01021245,-2,602,ABE,ATL
01020605,-4,602,ABE,ATL
Fichero TSV con el tabulador como separador campos, donde el campo IATA es la clave de cada aeropuerto.
City State Country IATA
Abbotsford BC Canada YXX
Aberdeen SD USA ABR
Abilene TX USA ABI
Akron OH USA CAK
Alamosa CO USA ALS
Albany GA USA ABY
Así pues, lo primero que vamos a hacer es cargar ambos DataFrames:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("s8a-dataframes-joins").getOrCreate()
df_vuelos = spark.read.option("sep",",").option("header", "true").option("inferSchema", "true").csv("departure_delays.csv")
# df_vuelos.printSchema()
# df_vuelos.count() # 1391578
df_aeropuertos = spark.read.option("sep","\t").option("header", "true").option("inferSchema", "true").csv("airport-codes-na.tsv")
# df_aeropuertos.printSchema()
Mediante SQL¶
Si queremos hacer un join mediante SQL, sólo tenemos que emplear la misma sintaxis que con cualquier sistema relacional, de manera que primero crearemos las vistas temporales:
df_vuelos.createOrReplaceTempView("vuelos")
df_aeropuertos.createOrReplaceTempView("aeropuertos")
Y a continuación realizamos la consulta:
df_join = spark.sql("select v.origin, a.city from vuelos v join aeropuertos a on v.origin == a.IATA")
df_join.show(3)
# +------+---------+
# |origin| city|
# +------+---------+
# | ABE|Allentown|
# | ABE|Allentown|
# | ABE|Allentown|
# +------+---------+
# only showing top 3 rows
Si quisiéramos obtener el nombre de los dos aeropuertos, necesitamos realizar dos veces el join:
df_join = spark.sql("select v.*, a.City as originCity, b.City as destinationCity from vuelos v JOIN aeropuertos a on v.origin == a.IATA join aeropuertos b on v.destination = b.IATA")
df_join.show(3)
# +-------+-----+--------+------+-----------+----------+---------------+
# | date|delay|distance|origin|destination|originCity|destinationCity|
# +-------+-----+--------+------+-----------+----------+---------------+
# |1011245| 6| 602| ABE| ATL| Allentown| Atlanta|
# |1020600| -8| 369| ABE| DTW| Allentown| Detroit|
# |1021245| -2| 602| ABE| ATL| Allentown| Atlanta|
# +-------+-----+--------+------+-----------+----------+---------------+
# only showing top 3 rows
SQL Pipe
Si utilizamos la sintaxis pipe, que permite encadenar operaciones siguiendo el flujo lógico de los datos en lugar del orden tradicional SELECT-FROM-WHERE. Nuestro join anterior se puede reescribir así:
FROM vuelos v
|> JOIN aeropuertos a ON v.origin = a.IATA
|> JOIN aeropuertos b ON v.destination = b.IATA
|> SELECT v.*, a.City AS originCity, b.City AS destinationCity
Cada operador |> recibe una tabla y produce otra, lo que facilita la lectura y depuración de consultas complejas. Para agregaciones existe el operador específico |> AGGREGATE ... GROUP BY ...:
FROM vuelos
|> AGGREGATE count(*) AS total, avg(delay) AS retraso_medio
GROUP BY origin
|> ORDER BY total DESC
Si existiera algún vuelo cuyos códigos de aeropuerto no tuviéramos disponible en el dataset de los códigos de aeropuertos, no nos aparecería. Por tanto, sería más conveniente realizar un left join:
df_left_join = spark.sql("select v.*, a.City as originCity, b.City as destinationCity from vuelos v LEFT JOIN aeropuertos a on v.origin == a.IATA LEFT JOIN aeropuertos b on v.destination = b.IATA")
df_left_join.show(3)
df_left_join.count() # 1391578
Todo tipo de joins
Además de los casos vistos, podemos realizar otros tipos de joins como cross, semi, full, outer, etc... Más información en la documentación oficial
Un caso particular que conviene conocer es el left anti join. Este tipo de join permite obtener aquellos registros de la izquierda que no aparecen en la parte derecha, de manera que si seguimos con el ejemplo, podemos recuperar aquellos vuelos cuyos aeropuertos no tenemos en el dataset con los códigos:
df_left_anti_join = spark.sql("select * from vuelos v LEFT ANTI JOIN aeropuertos a ON v.origin == a.IATA ")
df_left_anti_join.count() # 14416
Estrategias de join en Spark
Spark elige automáticamente la estrategia según el tamaño de los DataFrames, pero se puede forzar mediante hints:
| Estrategia | Cuándo | Cómo forzarla |
|---|---|---|
| Broadcast Hash Join | Un lado pequeño (< spark.sql.autoBroadcastJoinThreshold, por defecto 10 MB) |
df.join(broadcast(df_pequeño), ...) |
| Sort Merge Join | Ambos lados grandes; estrategia por defecto | Automático o /*+ MERGE */ |
| Shuffle Hash Join | Un lado moderado que cabe en memoria de un executor | /*+ SHUFFLE_HASH */ |
El Broadcast Join es la optimización más importante: evita el shuffle del lado grande enviando el lado pequeño a todos los nodos.
Mediante Python¶
Si no queremos utilizar SQL o ya tenemos fragmentos de código que interactúan con el DataFrame API, podemos utilizar el método join.
Este método une dos DataFrames, indicando la expresión de unión y opcionalmente el tipo:
expr_join1 = df_vuelos.origin == df_aeropuertos.IATA
df_joinp1 = df_vuelos.join(df_aeropuertos, expr_join1, "inner")
df_joinp1.count() # 1377162
Forma corta
Si las columnas que unen los DataFrames tienen el mismo nombre, podemos simplificar el código indicando únicamente su nombre:
df1.join(df2, "user_id")
Además, si queremos hacer un inner join, podemos no indicarlo ya que es el tipo por defecto.
En vez de pasarle inner, le podemos indicar el tipo de join: left, right, cross, left_anti, etc...
expr_join1 = df_vuelos.origin == df_aeropuertos.IATA
df_left_anti_join = df_vuelos.join(df_aeropuertos, expr_join1, "left_anti")
df_left_anti_join.count() # 14416
Finalmente, como en nuestro caso teníamos dos joins, tanto para los vuelos de origen como los de destino, necesitamos volver a unir:
from pyspark.sql.functions import col
# le indicamos alias a los campos para eliminar ambigüedades
expr_join2 = col("a.destination") == col("b.IATA")
df_joinp2 = (df_joinp1.alias("a")).join((df_aeropuertos.alias("b")), expr_join2, "inner")
df_joinp2.count() # 1361141
Funciones¶
Para dominar realmente Spark, hay que tener destreza en todas las funciones existente para el tratamiento de fechas, cadenas, operaciones matemáticas, para trabajar con colecciones, etc...
Además, siempre podemos crear nuestras propias funciones de usuario para ampliar el lenguaje.
Aunque ya hemos utilizado algunas a lo largo de los apuntes, a continuación vamos a repasar las funciones más empleadas.
Fechas¶
- Si necesitamos convertir de texto a fecha:
to_date,to_timestamp,unix_timestamp - Para formatear las fechas:
date_format,from_unixtime(patrones de fechas) - Para realizar cálculos sobre fechas:
datediff,months_between,last_day,date_add,date_sub,next_day - Extraer un valor de una fecha:
year,month,weekofyear,dayofmonth,dayofyear,hour,minute,second.
Más información en la documentación oficial
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
spark = SparkSession.builder.appName("s8a-dataframes-sql").getOrCreate()
df = spark.read.option("sep",";").option("header", "true").option("inferSchema", "true").csv("pdi_sales_small.csv")
# Cambiamos el tipo de dato a fecha
df = df.withColumn("Date", to_date(df.Date, "M/d/yyyy"))
import pyspark.sql.functions
df.select("Date", date_format("Date", "dd-MM-yyyy"),
next_day("Date", "Sun"), last_day("Date"),
dayofmonth("Date"), dayofyear("Date"),
month("Date"), year("Date")).show(2)
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# | Date|date_format(Date, dd-MM-yyyy)|next_day(Date, Sun)|last_day(Date)|dayofmonth(Date)|dayofyear(Date)|month(Date)|year(Date)|
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# |1999-01-15| 15-01-1999| 1999-01-17| 1999-01-31| 15| 15| 1| 1999|
# |2002-06-06| 06-06-2002| 2002-06-09| 2002-06-30| 6| 157| 6| 2002|
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# only showing top 2 rows
Cadenas¶
Por ejemplo, tenemos las funciones para quitar espacios (ltrim, rtrim, trim) y pasar a mayúsculas/minúsculas (lower, upper):
df.select("Zip", ltrim("Zip").alias("l"), rtrim("Zip").alias("r"),
lower("Zip"), upper("Zip")
).where(trim(df.Country)=="Canada").show(3)
# +---------------+---------------+---+---------------+---------------+
# | Zip| l| r| lower(Zip)| upper(Zip)|
# +---------------+---------------+---+---------------+---------------+
# |H1B |H1B |H1B|h1b |H1B |
# +---------------+---------------+---+---------------+---------------+
# only showing top 1 row
O funciones para poner la inicial en mayúsculas (initcap), darle la vuelta (reverse), obtener su tamaño (length) o reemplazar caracteres (translate):
df.select("Country", initcap("Country"), reverse("Country"),
length("Country"), translate("Country", "na", "pe")
).where(trim(df.Country)=="Canada").show(1)
# +-------+----------------+----------------+---------------+--------------------------+
# |Country|initcap(Country)|reverse(Country)|length(Country)|translate(Country, na, pe)|
# +-------+----------------+----------------+---------------+--------------------------+
# |Canada | Canada | adanaC| 7| Cepede |
# +-------+----------------+----------------+---------------+--------------------------+
# only showing top 1 row
También podemos trabajar con subcadenas (substring), encontrar ocurrencias (locate) o partir una cadena en trozos (split):
df.select("Country", split("Country", "a"), locate("a", "Country"),
substring("Country",3,2)
).where(trim(df.Country)=="Canada").show(1)
+-------+---------------------+---------------------+------------------------+
|Country|split(Country, a, -1)|locate(a, Country, 1)|substring(Country, 3, 2)|
+-------+---------------------+---------------------+------------------------+
|Canada | [C, n, d, ]| 2| na|
+-------+---------------------+---------------------+------------------------+
only showing top 1 row
Otras funciones que se suelen utilizar son concat y concat_ws para unir cadenas, levenshtein para calcular la distancia entre dos cadenas, lpad y rpad para completar con espacios, etc... Si necesitas trabajar con expresiones regulares puedes utilizar regexp_extract para extraer parte de una cadena como regexp_replace para sustituir.
Colecciones¶
Para probar las funciones que trabajan con colecciones, vamos a cambiar de dataset y trabajar con uno compartido por Kaggle con datos de negocios de Yelp que tenemos almacenados en una versión reducida en yelp_academic_dataset_business.json. Los negocios tienen una propiedad denominada categories que contiene un array con las categorías de los mismos:
{
"business_id":"O_X3PGhk3Y5JWVi866qlJg",
"full_address":"1501 W Bell Rd\nPhoenix, AZ 85023",
"hours":{
"Monday":{
"close":"18:00",
"open":"11:00"
},
"Tuesday":{
"close":"18:00",
"open":"11:00"
},
...
},
"open":true,
"categories":[
"Active Life",
"Arts & Entertainment",
"Stadiums & Arenas",
"Horse Racing"
],
"city":"Phoenix",
...
}
El primer paso es cargar el documento y ver el esquema inferido por Spark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("s8a-dataframes-arrays").getOrCreate()
df = spark.read.option("inferSchema", "true").option("multiline",True).json("yelp_academic_dataset_business.json")
df.printSchema()
Como podemos observar, sigue una estructura de elementos anidados:
root
|-- attributes: struct (nullable = true)
| |-- Accepts Credit Cards: boolean (nullable = true)
| |-- Alcohol: string (nullable = true)
| |-- Ambience: struct (nullable = true)
| | |-- casual: boolean (nullable = true)
| | |-- classy: boolean (nullable = true)
| | |-- divey: boolean (nullable = true)
| | |-- hipster: boolean (nullable = true)
| | |-- intimate: boolean (nullable = true)
| | |-- romantic: boolean (nullable = true)
| | |-- touristy: boolean (nullable = true)
| | |-- trendy: boolean (nullable = true)
| | |-- upscale: boolean (nullable = true)
| |-- Attire: string (nullable = true)
...
| |-- Wi-Fi: string (nullable = true)
|-- business_id: string (nullable = true)
|-- categories: array (nullable = true)
| |-- element: string (containsNull = true)
|-- city: string (nullable = true)
|-- full_address: string (nullable = true)
...
Por ejemplo, vamos a ver mediante un ejemplo las siguientes funciones:
size: devuelve el tamaño de la colecciónsort_array: ordena la colecciónarray_contains: comprueba si hay un elemento en la colección
from pyspark.sql.functions import *
df.select("name", "hours.Sunday", size("categories").alias("totalCategorias"),
sort_array("categories").alias("categorias"),
array_contains("categories", "Restaurants").alias("Restaurantes")).show(10, truncate=False)
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
# |name |Sunday |totalCategorias|categorias |Restaurantes|
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
# |Turf Paradise Race Course |{18:00, 11:00}|4 |[Active Life, Arts & Entertainment, Horse Racing, Stadiums & Arenas] |false |
# |Sam's Club Members Only |null |5 |[Automotive, Department Stores, Fashion, Shopping, Tires] |false |
# |Forever 21 |{18:00, 11:00}|5 |[Accessories, Fashion, Men's Clothing, Shopping, Women's Clothing] |false |
# |Loving Hands Pet Care |{19:00, 06:00}|3 |[Pet Boarding/Pet Sitting, Pet Services, Pets] |false |
# |Amec Mid-City Animal Hospital |null |2 |[Pets, Veterinarians] |false |
# |Los Armandos Asadero Y Mariscos|{03:00, 20:00}|2 |[Mexican, Restaurants] |true |
# |Clayton Companies |null |4 |[Home Services, Property Management, Real Estate, Real Estate Services] |false |
# |Bertha's Café |null |5 |[Bakeries, Breakfast & Brunch, Food, Restaurants, Sandwiches] |true |
# |Jerry's Artarama |{17:00, 11:00}|4 |[Art Supplies, Arts & Crafts, Framing, Shopping] |false |
# |Shauna Brown Fitness |null |5 |[Active Life, Fitness & Instruction, Health & Medical, Massage Therapy, Trainers]|false |
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
only showing top 10 rows
Tip
Recuerda que en el apartado Agrupando colecciones vimos como podemos crear colecciones al realizar una agrupación.
Así pues, además del nombre, hemos obtenido el horario de los domingos utilizando la notación . para acceder a los campos anidados, la cantidad de categorías de cada comercio, un listado ordenado con sus categorías y finalmente si es un restaurante.
Otro tipo de operación que podemos realizar es desenrollar una colección mediante la función explode y generar una fila nueva por cada elemento de la colección:
df.select("name", explode("categories")).show(10, truncate=False)
# +-------------------------+--------------------+
# |name |col |
# +-------------------------+--------------------+
# |Turf Paradise Race Course|Active Life |
# |Turf Paradise Race Course|Arts & Entertainment|
# |Turf Paradise Race Course|Stadiums & Arenas |
# |Turf Paradise Race Course|Horse Racing |
# |Sam's Club Members Only |Tires |
# |Sam's Club Members Only |Automotive |
# |Sam's Club Members Only |Fashion |
# |Sam's Club Members Only |Shopping |
# |Sam's Club Members Only |Department Stores |
# |Forever 21 |Women's Clothing |
# +-------------------------+--------------------+
# only showing top 10 rows
Funciones de orden superior¶
Además de las funciones básicas sobre colecciones, Spark permite aplicar higher-order functions que reciben una expresión lambda y operan sobre cada elemento del array sin necesidad de hacer explode. Las más útiles son filter, transform, exists, forall y aggregate.
Veamos un ejemplo de cada una de ellas:
from pyspark.sql.functions import filter, transform, exists, upper
df.select(
"name",
# Sólo categorías que empiezan por "R"
filter("categories", lambda c: c.startswith("R")).alias("categorias_R"),
# Todas las categorías en mayúsculas
transform("categories", lambda c: upper(c)).alias("cat_upper"),
# ¿Tiene al menos una categoría que contiene "Food"?
exists("categories", lambda c: c.contains("Food")).alias("tiene_food")
).show(3, truncate=False)
La ventaja frente a explode es que no multiplicamos filas, ya que la operación se mantiene a nivel de array dentro de la misma fila.
JSON¶
Es común que los datos que leemos desde un sistema externo estén en formato JSON pero que el proceso de ingesta los haya almacenado como una cadena de texto (por ejemplo, cuando provienen de un campo payload de Kafka o de una columna de una tabla relacional). En esos casos, necesitamos parsear la
cadena para poder acceder a sus campos.
Supongamos que partimos de un conjunto de tareas donde cada registro tiene un identificador y una cadena JSON con las tareas que realizamos cada día:
tareas = [
(1, '{"dia": "Lunes", "tareas": ["Corregir ejercicios", "Ir a nadar", "Comprar pan"]}'),
(2, '{"dia": "Martes", "tareas": ["Preparar clase", "Revisar correos"]}'),
(3, '{"dia": "Miércoles", "tareas": ["Reunión departamento", "Tutorías"]}'),
]
tareasDF = spark.createDataFrame(tareas, ["id", "json_str"])
tareasDF.printSchema()
# root
# |-- id: long (nullable = true)
# |-- json_str: string (nullable = true)
tareasDF.show(truncate=False)
# +---+--------------------------------------------------------------------------------+
# |id |json_str |
# +---+--------------------------------------------------------------------------------+
# |1 |{"dia": "Lunes", "tareas": ["Corregir ejercicios", "Ir a nadar", "Comprar pan"]}|
# |2 |{"dia": "Martes", "tareas": ["Preparar clase", "Revisar correos"]} |
# |3 |{"dia": "Miércoles", "tareas": ["Reunión departamento", "Tutorías"]} |
# +---+--------------------------------------------------------------------------------+
Para convertir la cadena en una estructura consultable, definimos un esquema que describa cómo es el documento JSON:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import from_json, col
esquemaTareas = StructType([
StructField("dia", StringType(), False),
StructField("tareas", ArrayType(StringType(), False), False)
])
Y a continuación ya podemos transformar el formato mediante la función from_json:
tareasParsedDF = tareasDF.select(
"id",
from_json("json_str", esquemaTareas).alias("datos")
)
tareasParsedDF.printSchema()
# root
# |-- id: long (nullable = true)
# |-- datos: struct (nullable = true)
# | |-- dia: string (nullable = true)
# | |-- tareas: array (nullable = true)
# | | |-- element: string (containsNull = true)
Una vez parseado, accedemos a los campos con la notación punto o con
getItem
para indexar arrays:
tareasParsedDF.select(
"id",
col("datos.dia").alias("dia"),
col("datos.tareas").alias("tareas"),
col("datos.tareas").getItem(0).alias("primera_tarea")
).show(truncate=False)
# +---+---------+----------------------------------------------+-------------------+
# |id |dia |tareas |primera_tarea |
# +---+---------+----------------------------------------------+-------------------+
# |1 |Lunes |[Corregir ejercicios, Ir a nadar, Comprar pan]|Corregir ejercicios|
# |2 |Martes |[Preparar clase, Revisar correos] |Preparar clase |
# |3 |Miércoles|[Reunión departamento, Tutorías] |Reunión departamento|
# +---+---------+----------------------------------------------+-------------------+
Para terminar, si necesitamos la operación inversa, y lo que queremos es crear una representación JSON de una columna, podemos utilizar la función to_json:
from pyspark.sql.functions import to_json
tareasParsedDF.select("id", to_json("datos").alias("json_out")).show(truncate=False)
# +---+-----------------------------------------------------------------------------+
# |id |json_out |
# +---+-----------------------------------------------------------------------------+
# |1 |{"dia":"Lunes","tareas":["Corregir ejercicios","Ir a nadar","Comprar pan"]} |
# |2 |{"dia":"Martes","tareas":["Preparar clase","Revisar correos"]} |
# |3 |{"dia":"Miércoles","tareas":["Reunión departamento","Tutorías"]} |
# +---+-----------------------------------------------------------------------------+
VARIANT como alternativa moderna
En la sesión anterior aprendimos que desde Spark 4.0, el tipo VARIANT permite trabajar con JSON heterogéneo sin declarar un StructType previo y con un rendimiento significativamente superior al almacenamiento como cadena.
Cuando el esquema de los datos es estable y conocido —como en el ejemplo de este apartado—, from_json con StructType sigue siendo una opción perfectamente válida, y además es la única disponible en clústeres Spark 3.x. VARIANT se vuelve la opción preferente cuando el esquema es heterogéneo, evolutivo o desconocido a priori.
UDF¶
Además de las funciones que ofrece Spark, en cualquier momento podemos crear nuestras funciones de usuario (User-Defined Functions) para ampliar la expresividad de Spark. Antes de utilizarlas, las hemos de definir y registrar.
Si volvemos al dataset de ventas, teníamos la siguiente información:
df.select("ProductID", "Revenue", "Units").sort("Units", ascending=False).show(5)
# +---------+-------+-----+
# |ProductID|Revenue|Units|
# +---------+-------+-----+
# | 495|43194.1| 77|
# | 2091| 6347.7| 41|
# | 2091| 6240.1| 41|
# | 2091| 3652.7| 24|
# | 2091| 3560.9| 23|
# +---------+-------+-----+
# only showing top 5 rows
Vamos a crear una función para que, si vende más de una unidad, se le asigne a cada producto un bonus de un 1%.
UDF con Python¶
Es la forma más sencilla pero también la más lenta en PySpark. Spark serializa cada fila con pickle, la envía a un proceso Python separado, ejecuta la función y devuelve el resultado por serialización inversa.
Para ello, primero definiremos la función mediante Python, y posteriormente, la registraremos mediante la función udf:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
@udf(returnType=DoubleType())
def bonus(unidades, ventas):
if unidades == 1:
return 0.0
return unidades * ventas / 100
Así pues, si realizamos una consulta, ya podemos utilizar la función recién creada como si fuera una propia de Spark:
df.select("ProductID", "Revenue", "Units", bonus(df.Units, df.Revenue)).sort("Units", ascending=False).show(5)
# +---------+-------+-----+---------------------+
# |ProductID|Revenue|Units|bonus(Units, Revenue)|
# +---------+-------+-----+---------------------+
# | 495|43194.1| 77| 33259.456999999995|
# | 2091| 6347.7| 41| 2602.557|
# | 2091| 6240.1| 41| 2558.4410000000003|
# | 2091| 3652.7| 24| 876.6479999999999|
# | 2091| 3560.9| 23| 819.007|
# +---------+-------+-----+---------------------+
# only showing top 5 rows
UDF con Arrow¶
Desde Spark 3.5 podemos activar la optimización Arrow en las UDF escalares de Python. Spark utiliza entonces Apache Arrow como formato columnar para la (de)serialización entre la JVM y Python, eliminando gran parte del overhead de pickle.
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
# Activación por UDF concreta (Spark 3.5+)
@udf(returnType=DoubleType(), useArrow=True)
def bonus(unidades, ventas):
if unidades == 1:
return 0.0
return unidades * ventas / 100
# O activación global para toda la sesión
spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
Default en Spark 4.2
En Spark 4.2 la optimización Arrow pasa a estar activada por defecto (spark.sql.execution.pythonUDF.arrow.enabled=true). Si necesitas el comportamiento clásico, tendrás que ponerla explícitamente a false.
UDF con Pandas¶
Cuando la lógica opera sobre bloques de datos (y no fila a fila), las Pandas UDFs (también llamadas vectorized UDFs) son la opción más rápida. Reciben y devuelven pandas.Series completas, lo que permite aprovechar las operaciones vectorizadas de NumPy/Pandas.
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf("double")
def bonus_pandas(unidades: pd.Series, ventas: pd.Series) -> pd.Series:
# Operación vectorizada sobre Series completas
return pd.Series([0.0 if u == 1 else u * v / 100
for u, v in zip(unidades, ventas)])
df.select("ProductID", "Revenue", "Units",
bonus_pandas("Units", "Revenue").alias("bonus")).show(5)
Las Pandas UDFs soportan varios patrones: Series -> Series (la más habitual), Series -> Scalar (para agregaciones) e Iterator -> Iterator (para operaciones con estado o inicialización costosa). También existe el equivalente a nivel de agrupación: groupBy().applyInPandas(fn, schema).
UDF en SQL¶
Si queremos registrar una UDF de Python para usarla también desde Spark SQL, utilizamos spark.udf.register, la cual recibe el nombre que le asignaremos a la función, el nombre de la función Python a invocar, y el tipo de dato que devuelve:
spark.udf.register("udfBonus", bonus, DoubleType())
spark.sql("select ProductID, Revenue, Units, udfBonus(Units, Revenue) as bonus from ventas order by Units desc").show(5)
Además, Spark 4.0 introduce una novedad muy útil la cual nos permite definir UDFs directamente en SQL mediante CREATE FUNCTION ... RETURN <expresión>. Estas funciones se ejecutan de forma nativa en el motor (sin overhead de Python) y el optimizador Catalyst las puede inlinear, por lo que su rendimiento es comparable al de las funciones nativas de Spark.
CREATE OR REPLACE TEMPORARY FUNCTION bonus4(unidades INT, ventas DOUBLE) RETURNS DOUBLE
RETURN CASE WHEN unidades = 1 THEN 0.0
ELSE unidades * ventas / 100 END;
SELECT ProductID, Revenue, Units, bonus4(Units, Revenue) AS bonus
FROM ventas
ORDER BY Units DESC
LIMIT 5;
Las SQL UDFs admiten también parámetros con valores por defecto, comentarios y marcar la función como determinista o no.
Rendimiento de las UDF en Python
El orden de preferencia, de más rápido a más lento, es:
- Funciones nativas de Spark (
col,when,regexp_extract…): compiladas por Catalyst/Tungsten, siempre que sea posible. - SQL UDFs (Spark 4+): nativas en la JVM, inlined por el optimizador.
- Pandas UDFs: vectorizadas sobre bloques con Arrow.
- UDFs Python con Arrow (
useArrow=True): fila a fila pero con (de)serialización columnar. - UDFs Python clásicas: fila a fila con
pickle, penalización significativa en datasets grandes.
Siempre que una lógica pueda expresarse con funciones nativas, hay que preferir esa vía. Solo cuando no sea posible, elegir el tipo de UDF según el patrón de la lógica.
Cacheando¶
Un DataFrame se puede persistir/cachear en memoria conforme necesitemos (también lo podemos hacer con los RDD). Su principal propósito es cuando vamos a acceder a un DataFrame una y otra vez y no necesitamos que se vuelvan a evaluar todas las operaciones (como pueden ser los algoritmos iterativos utilizados en Machine Learning).
Más información
Si estás interesado en optimizar el uso de memoria al trabajar con DataFrames, Brayan Buitrago tiene una serie de artículos sobre Spark Performace: Cache() & Persist() muy interesantes.
Cuando persistimos un dataset, cada nodo almacena sus datos particionados en memoria y/o disco y los reutiliza en otras operaciones sobre dicho dataset.
Para ello, se emplean los métodos cache / persist y unpersist para cachear y liberar los datos.
df.persist()
df.count() # forzamos la evaluación perezosa
Si queremos realizarlo con SparkSQL:
ventasCanada.createOrReplaceTempView("ventasCanada")
// Si queremos cachear la tabla mediante SQl
spark.catalog.cacheTable("ventasCanada")
Una vez persistidos los datos, si accedemos a http://localhost:4040 veremos en la pestaña Storage que se ha creado la tabla, su tipo de almacenamiento y particiones cacheadas:
Una diferencia fundamental a la hora de persistir un DataFrame en comparación con un RDD, es que como Spark SQL conoce el esquema de los datos en el DataFrame, puede organizarlos de forma columnar y aplicar compresión sobre éstos para minimizar el espacio necesario.
DataFrames y Pandas¶
En cualquier momento podemos pasar los datos de un DataFrame de PySpark a uno de Pandas para poder aprovechar su API.
Si seguimos con el dataset de Yelp, vamos a preparar una consulta de nos devuelva la cantidad de votos recibidos y puntuación media de cada ciudad:
from pyspark.sql.functions import count, avg, round
dfVotosCiudades = df.groupBy("city").agg(count("city").alias("votos"), round(avg("stars"), 3).alias("media")).orderBy("votos", ascending=False).limit(10)
dfVotosCiudades.show()
# +----------+-----+-----+
# | city|votos|media|
# +----------+-----+-----+
# | Phoenix| 5492|3.658|
# |Scottsdale| 2617|3.809|
# | Tempe| 1444| 3.64|
# | Mesa| 1348|3.644|
# | Chandler| 1178|3.677|
# | Glendale| 821|3.588|
# | Gilbert| 630|3.755|
# | Peoria| 385|3.614|
# | Surprise| 241|3.598|
# | Goodyear| 214|3.498|
# +----------+-----+-----+
Nos traemos esos datos a Pandas mediante el método .toPandas().:
pdVC = dfVotosCiudades.toPandas()
A partir de este momento pdVC es un DataFrame de Pandas:
Y con el DataFrame de Pandas, ya podemos generar gráficos:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
plt.figure(figsize=(10,6))
plt.ticklabel_format(useOffset=False, style="plain")
sns.set_theme(style="whitegrid")
sns.barplot(x="votos", y="city", data=pdVC).set_title("Votos por Ciudad")
plt.xlabel("Votos emitidos")
plt.ylabel("Ciudades")
plt.show()
O por ejemplo, si queremos unir dos gráficos:
plt.figure(figsize=(10,6))
sns.set_theme(style="white")
ax = sns.barplot(data = pdVC, y="votos", x="city")
ax2 = ax.twinx()
sns.lineplot(data = pdVC['media'], marker='o', color='crimson', ax=ax2)
plt.show()
Obteniendo:
Cuidado con toPandas()
Al llamar a toPandas() convertimos el Spark DataFrame distribuido en un pandas.DataFrame nativo, trayendo todos los datos al nodo driver. Esto provoca la pérdida de la distribución y puede causar un error de falta de memoria (OutOfMemoryError) si el DataFrame es grande.
Por ello, hay que evitar toPandas() para el procesamiento habitual de los datos: pandas solo puede usar los recursos del nodo driver, perdiendo toda la potencia del clúster. Únicamente lo
usaremos cuando vayamos a visualizar los datos con Matplotlib o Seaborn como requisito de estas librerías, y tras haber reducido previamente el volumen con agregaciones, filtros o limit.
Si realmente necesitas trabajar con la API de pandas manteniendo la ejecución distribuida, utiliza
Pandas API on Spark (pyspark.pandas), que imita la API de pandas pero ejecuta en el clúster.
Pandas API on Spark
Desde Spark 3.2, la librería Koalas se integró en Spark como Pandas API on Spark (módulo pyspark.pandas). El proyecto Koalas independiente está descontinuado; toda la funcionalidad vive ahora en el módulo oficial de PySpark.
Para importarlo:
import pyspark.pandas as ps
A partir de ese momento, ps se comporta como pd pero ejecuta de forma distribuida en el clúster. En Spark 4.1, Pandas API on Spark también respeta el modo ANSI por defecto, por lo que algunas operaciones que antes fallaban silenciosamente (divisiones por cero, overflow) ahora lanzan error.
Conversión bidireccional:
# De Spark DataFrame a pandas-on-Spark
psdf = ps.DataFrame(dfVotosCiudades)
# De pandas-on-Spark a Spark DataFrame
sdf = psdf.to_spark()
# De pandas-on-Spark a pandas "normal" (trae todo al driver)
pdf = psdf.to_pandas()
FAQ¶
A continuación se recogen preguntas habituales sobre los conceptos de esta sesión que suelen realizarse en entrevistas de trabajo para puestos de ingeniería o ciencia de datos.
Despliega cada pregunta para ver una respuesta orientativa; no hay una única respuesta correcta, pero sí aspectos clave que conviene mencionar.
¿Qué estrategias de join existen en Spark y cuándo se usa cada una?
Un join distribuido es una operación costosa porque en general requiere un shuffle: mover los datos entre nodos para que las filas con la misma clave acaben en la misma partición. Spark dispone de varias estrategias y elige automáticamente la más adecuada en función del tamaño de los DataFrames:
| Estrategia | Cuándo se usa |
|---|---|
| Broadcast Hash Join | Uno de los dos lados es pequeño (por defecto, menos de 10 MB, configurable con spark.sql.autoBroadcastJoinThreshold). Spark envía el lado pequeño a todos los nodos y evita el shuffle del lado grande. |
| Sort Merge Join | Ambos lados son grandes. Es la estrategia por defecto: ordena los datos por la clave en cada partición y los fusiona. |
| Shuffle Hash Join | Uno de los lados es moderado y cabe en la memoria de un executor. Menos habitual. |
La optimización más importante es el Broadcast Join: cuando una de las tablas es pequeña(dimensiones, catálogos, tablas de referencia), conviene forzarlo explícitamente con broadcast():
from pyspark.sql.functions import broadcast
dfVuelos.join(broadcast(dfAeropuertos), dfVuelos.origin == dfAeropuertos.IATA)
Puedes inspeccionar qué estrategia ha elegido Spark con explain().
¿Cuándo usar UDFs y cuándo usar funciones nativas de Spark SQL?
Las funciones nativas (col(), when(), regexp_extract(), to_date()…) se compilan mediante Catalyst y Tungsten, siendo muy eficientes. Las UDF escalares clásicas en Python rompen esta optimización porque Spark debe serializar cada fila con pickle, enviarla a un proceso Python separado, ejecutar la función y deserializar el resultado (Python overhead).
Desde Spark 3.5, activando useArrow=True en el decorador udf() (o bien spark.sql.execution.pythonUDF.arrow.enabled, que pasa a ser el default en Spark 4.2), Spark utiliza Apache Arrow para la (de)serialización columnar entre la JVM y Python, reduciendo drásticamente ese overhead.
Si se necesita una UDF personalizada, el orden de preferencia es:
- SQL UDF (Spark 4+) si la lógica se puede expresar con expresiones SQL: rendimiento prácticamente nativo.
- Pandas UDF vectorizada (
@pandas_udf) para lógica que opera sobre columnas enteras usando Arrow. - Python UDF con Arrow (
useArrow=True) para lógica fila a fila. - Python UDF clásica solo como último recurso.
¿Qué diferencia hay entre groupBy().agg() y groupBy().pivot()?
groupBy().agg() agrupa filas por una o más columnas y aplica funciones de agregación (sum, count, avg…) generando una fila por grupo. groupBy().pivot() hace lo mismo pero además transpone los valores únicos de una columna en nuevas columnas del resultado, creando una tabla pivote. Es equivalente a un PIVOT en SQL.
¿Qué ocurre con los nulos en las operaciones de join?
Por defecto, Spark trata los nulos como valores no iguales a ningún otro valor, incluido otro nulo. Esto significa que las filas con null en la clave de join no se emparejarán en un inner join. Si se necesita tratar los nulos como iguales entre sí, hay que usar df.join(df2, df["clave"].eqNullSafe(df2["clave"])). En outer joins, las filas sin pareja aparecen con null en las columnas del lado que no ha podido emparejar.
Referencias¶
- Documentación oficial sobre Spark SQL, DataFrames and Datasets Guide
- Beginning Apache Spark 3: With DataFrame, Spark SQL, Structured Streaming, and Spark Machine Learning Library
- Spark by Examples
- The Most Complete Guide to pySpark DataFrames
- Spark SQL Cheatsheet en PDF y en formato web
Actividades¶
(RABDA.1 / CEBDA.1d y CEBDA.1e) En las siguientes actividades vamos a realizar agregaciones mediante el uso del API de DataFrames de Spark.
-
(2p) Sobre las películas de la sesión anterior:
- ¿Cuantas películas diferentes hay?
- ¿En cuantas películas ha trabajado
Murphy, Eddie (I)? - ¿Cuáles son los actores que han aparecido en más de 30 películas?
- ¿En que película anterior a 1980 aparecen al menos 25 intérpretes?
- Muestra la cantidad de películas producidas cada año (solo debe mostrar el año y la cantidad), ordenando el listado por la cantidad de forma descendente.
- A partir de la consulta anterior, crea un gráfico de barras que muestre el año y la cantidad de películas, ordenados por fecha.
-
(2p) Nos han enviado un nuevo archivo llamado
movie-ratings.tsvque contiene las calificaciones de las películas.- Crea un DataFrame que contenga los datos de ambos datasets (usando algún tipo de join).
- Persiste el DataFrame y comprueba que aparece en el Spark UI. ¿Cuánto ocupa?
- Muestra para cada año, la película con mayor puntuación (año, título de la película, puntuación)
- Sobre los datos anteriores, obtén también una lista con los nombres de los intérpretes.
- Averigua las tres parejas de intérpretes han trabajado juntos en más ocasiones. La salida debe tener tres columnas:
interprete1,interprete2ycantidad. (necesitas utilizar un self-join) - Opcional (Spark 4+): reescribe la consulta anterior usando sintaxis pipe (
|>) y compara la legibilidad con la versión SQL tradicional.
Las siguientes dos actividades se plantean como un proyecto opcional a realizar en casa:
-
Hemos recibido un dataset con las ventas de 2019 de una tienda americana de productos de tecnología, mediante un conjunto de ficheros en formato CSV comprimidos en salesdata.zip.
- Una vez descomprimidos los datos, crea un DataFrame con todos los datos, infiriendo el esquema.
-
Vuelve a realizar la lectura de los datos pero con el siguiente esquema:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType esquema = StructType([ StructField("Order ID", IntegerType(), False), StructField("Product", StringType(), False), StructField("Quantity Ordered", IntegerType(), True), StructField("Price Each", DoubleType(), False), StructField("Order Date", StringType(), False), StructField("Purchase Address", StringType(), False) ]) -
Tras la lectura, vamos a realizar la limpieza de datos. El primer paso será renombrar la columnas para eliminar los espacios en blanco.
- Elimina las filas que contengan algún campo nulo.
- Comprueba si las cabeceras de los archivos aparecen como datos del dataset (por ejemplo, un producto cuyo nombre sea
Product). Si fuera el caso, elimina dichas filas. - A partir del campo dirección, crea dos nuevas columnas para almacenar la ciudad (
City) y el estado (State). Por ejemplo, para la dirección136 Church St, New York City, NY 10001, la ciudad esNew York Cityy el estado esNY. - Modifica el campo con la fecha del pedido para que su formato sea timestamp.
- Sobre el campo anterior, crea dos nuevas columnas, con el mes (
Month) y el año (Year) del pedido.
-
Una vez realizada la transformación de los datos, vamos a realizar su carga y extraer información, utilizando Spark SQL siempre que sea posible:
- Almacena los datos en formato Parquet en la carpeta
salesoutputparticionando los datos por año y mes. Tras ejecutar esta operación, comprueba en disco la estructura de archivos creada. - Sobre los datos almacenados, realiza una nueva lectura pero solo leyendo los datos de 2019 los cuales deberían estar almacenados en
./salesdataoutput/Year=2019. -
Averigua cual ha sido el mes que ha recaudado más. Para ello, deberás multiplicar el precio por la cantidad de unidades, y posteriormente, realizar alguna agregación. Sobre el resultado, crea un gráfico similar al siguiente:
Ventas por mes Matplotlib y Seaborn
En la máquina virtual no están instaladas las librerías gráficas. De hecho ,si instalamos Seaborn, MatplotLib se instalará automáticamente por ser una librería dependiente, por lo que puedes instalarlas mediante:
pip install seaborn -
Obtén un gráfico con las 10 ciudades que más unidades han vendido.
Ciudades con más unidades vendidas -
Cantidad de pedidos por Horas en las que se ha realizado un pedido que contenía al menos dos productos:
Pedidos de al menos dos productos por horas -
Listado con los productos del estado de
NYque se han comprado a la vez, obteniendo un resultado similar a:+------------------------------------------------------------+-----+ |Productos |count| +------------------------------------------------------------+-----+ |[iPhone, Lightning Charging Cable] |126 | |[Google Phone, USB-C Charging Cable] |124 | |[Google Phone, Wired Headphones] |52 | ...
- Almacena los datos en formato Parquet en la carpeta