Saltar a contenido

Spark DataFrames / SQL

Apuntes sin actualizar

Estos apuntes pertenecen al curso 21/22 y, por lo tanto, ya no se actualizan.

Puedes acceder a la última versión de esta sesión en https://aitor-medrano.github.io/iabd2223/spark/02dataframeAPI.html.

En la sesión anterior hemos introducido Spark y el uso de RDD para interactuar con los datos. Tal como comentamos, los RDD permiten trabajar a bajo nivel, siendo más cómodo y eficiente hacer uso de DataFrames y el lenguaje SQL.

DataFrames

Un DataFrame es una estructura equivalente a una tabla de base de datos relacional, con un motor bien optimizado para el trabajo en un clúster. Los datos se almacenan en filas y columnas y ofrece un conjunto de operaciones para manipular los datos.

El trabajo con DataFrames es más sencillo y eficiente que el procesamiento con RDD, por eso su uso es predominante en los nuevos desarrollos con Spark.

A continuación veremos cómo podemos obtener y persistir DataFrames desde diferentes fuentes y formatos de datos

Creando Dataframes

El caso más básico es crear un DataFrame a partir de una SparkSession pasándole un RDD:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() # SparkSession de forma programativa
# Creamos un RDD
datos = [("Aitor", 182), ("Pedro", 178), ("Marina", 161)]
rdd = spark.sparkContext.parallelize(datos)
# Creamos un DataFrame y mostramos su esquema
dfRDD = rdd.toDF()
dfRDD.printSchema()

Y obtenemos un resumen del esquema del DataFrame, donde para cada columna se indica el nombre, el tipo y si admite valores nulos:

root
 |-- _1: string (nullable = true)
 |-- _2: long (nullable = true)

Podemos ver como los nombres de las columnas son _1 y _2. Para asignarle un nombre adecuado podemos pasarle una lista con los nombres a la hora de crear el DataFrame:

columnas = ["nombre","altura"]
dfRDD = rdd.toDF(columnas)
dfRDD.printSchema()

Y ahora obtenemos:

root
 |-- nombre: string (nullable = true)
 |-- altura: long (nullable = true)

Si queremos mostrar sus datos, haremos uso del método show:

df.show()

Obteniendo una vista de los datos en forma de tabla:

+------+------+
|nombre|altura|
+------+------+
| Aitor|   182|
| Pedro|   178|
|Marina|   161|
+------+------+

También podemos crear un DataFrame directamente desde una SparkSession sin crear un RDD previamente:

# También podemos crear un DF desde SparkSession
dfDesdeDatos = spark.createDataFrame(datos).toDF(*columnas)
dfDesdeDatos.printSchema()

Mostrando los datos

Para los siguientes apartados, supongamos que queremos almacenar ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000), 
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000)
]
df = spark.createDataFrame(data=clientes)

Para mostrar los datos, ya hemos visto que podemos utilizar el método show, al cual le podemos indicar o no la cantidad de registros a recuperar, así como si queremos que los datos se trunquen o no, o si los queremos mostrar en vertical:

df.show(2)
# +------+---------+------+------+
# |nombre|apellidos|ciudad|sueldo|
# +------+---------+------+------+
# | Aitor|  Medrano| Elche|  3000|
# | Pedro|    Casas| Elche|  4000|
# +------+---------+------+------+
# only showing top 2 rows
df.show(truncate=False)
# +------+---------+----------+------+
# |nombre|apellidos|ciudad    |sueldo|
# +------+---------+----------+------+
# |Aitor |Medrano  |Elche     |3000  |
# |Pedro |Casas    |Elche     |4000  |
# |Laura |García   |Elche     |5000  |
# |Miguel|Ruiz     |Torrellano|6000  |
# |Isabel|Guillén  |Alicante  |7000  |
# +------+---------+----------+------+
df.show(3, vertical=True)
# -RECORD 0------------
#  nombre    | Aitor   
#  apellidos | Medrano 
#  ciudad    | Elche   
#  sueldo    | 3000    
# -RECORD 1------------
#  nombre    | Pedro   
#  apellidos | Casas   
#  ciudad    | Elche   
#  sueldo    | 4000    
# -RECORD 2------------
#  nombre    | Laura   
#  apellidos | García  
#  ciudad    | Elche   
#  sueldo    | 5000    
# only showing top 3 rows

Si sólo queremos recuperar unos pocos datos, podemos hacer uso de head o first los cuales devuelven objetos Row:

df.first()
# Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)
df.head()
# Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000)
df.head(3)
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
#  Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
#  Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000)]

Si queremos obtener un valor en concreto, una vez recuperada una fila, podemos acceder a sus columnas:

nom1 = df.first()[0]           # 'Aitor'
nom2 = df.first()["nombre"]    # 'Aitor'

También podemos obtener un sumario de los datos (igual que con Pandas) mediante describe:

df.describe().show()
# +-------+------+---------+----------+------------------+
# |summary|nombre|apellidos|    ciudad|            sueldo|
# +-------+------+---------+----------+------------------+
# |  count|     5|        5|         5|                 5|
# |   mean|  null|     null|      null|            5000.0|
# | stddev|  null|     null|      null|1581.1388300841897|
# |    min| Aitor|    Casas|  Alicante|              3000|
# |    max| Pedro|     Ruiz|Torrellano|              7000|
# +-------+------+---------+----------+------------------+

Si únicamente nos interesa saber cuantas filas tiene nuestro DataFrame, podemos hacer uso de count:

df.count()  # 5

Por último, como un DataFrame por debajo es un RDD, podemos usar collect y take conforme necesitemos y recuperar objetos de tipo Row:

df.collect()
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
#  Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000),
#  Row(nombre='Laura', apellidos='García', ciudad='Elche', sueldo=5000),
#  Row(nombre='Miguel', apellidos='Ruiz', ciudad='Torrellano', sueldo=6000),
#  Row(nombre='Isabel', apellidos='Guillén', ciudad='Alicante', sueldo=7000)]
df.take(2)
# [Row(nombre='Aitor', apellidos='Medrano', ciudad='Elche', sueldo=3000),
#  Row(nombre='Pedro', apellidos='Casas', ciudad='Elche', sueldo=4000)]
nom = df.collect()[0][0]        # 'Aitor'

Cargando diferentes formatos

Lo más usual es cargar los datos desde una archivo externo. Para ello, mediante el API de DataFrameReader cargaremos los datos directamente en un Dataframe mediante diferentes métodos dependiendo del formato (admite tanto el nombre de un recurso como una ruta de una carpeta).

Para cada formato, existe un método corto que se llama como el formato en sí, y un método general que finaliza en con el método load, siempre dentro de spark.read:

dfCSV = spark.read.csv("datos.csv")
dfCSV = spark.read.csv("datos/*.csv")   # Una carpeta entera
dfCSV = spark.read.option("sep", ";").csv("datos.csv")
dfCSV = spark.read.option("header", "true").csv("datos.csv")
dfCSV = spark.read.option("header", True).option("inferSchema", True).csv("datos.csv")
dfCSV = spark.read.options(sep=";", header=True, inferSchema=True).csv("pdi_sales.csv")
dfCSV = spark.read.format("csv").load("datos.csv") 
dfCSV = spark.read.load(format="csv", header="true", inferSchema="true").csv("datos.csv")

Más información en la documentación oficial

dfTXT = spark.read.text("datos.txt")
# cada fichero se lee entero como un registro
dfTXT = spark.read.option("wholetext", true).text("datos/")

dfTXT = spark.read.format("txt").load("datos.txt")

Más información en la documentación oficial

dfJSON = spark.read.json("datos.json")
dfJSON = spark.read.format("json").load("datos.json")

Más información en la documentación oficial

DataFrames desde JSON

Spark espera que cada documento JSON ocupe una única línea. Si cada documento ocupa más de una línea, se lo indicamos mediante la opción multiline:

df = spark.read.option("multiline",True).option("inferSchema", "true").json("datos.json")
dfParquet = spark.read.parquet("datos.parquet")
dfParquet = spark.read.format("parquet").load("datos.parquet")

Más información en la documentación oficial

Si lo que queremos es persistir los datos, en vez de read, utilizaremos write y si usamos la forma general usaremos el método save:

dfCSV.write.csv("datos.csv")
dfCSV.write.format("csv").save("datos.csv")
dfCSV.write.format("csv").mode("overwrite").save("datos.csv")

Más información en la documentación oficial

dfTXT.write.text("datos.txt")
dfTXT.write.option("lineSep",";").text("datos.txt")
dfTXT.write.format("txt").save("datos.txt")

Más información en la documentación oficial

dfJSON.write.json("datos.json")
dfJSON.write.format("json").save("datos.json")

Más información en la documentación oficial

dfParquet.write.parquet("datos.parquet")
dfParquet.write.format("json").save("datos.parquet")

Más información en la documentación oficial

Un único archivo de salida

Por cada partición, Spark generará un archivo de salida. Recuerda que podemos reducir el número de particiones mediante coalesce o repartition.

Una vez vista la sintaxis, vamos a ver un ejemplo completo de lectura de un archivo CSV (el archivo pdi_sales.csv que hemos utilizado durante todo el curso) que está almacenado en HDFS y que tras leerlo, lo guardamos como JSON de nuevo en HDFS:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframe-csv").getOrCreate()

# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("delimiter",";").option("header", "true").csv("hdfs://iabd-virtualbox:9000/user/iabd/pdi_sales.csv")

# df.printSchema()

df.write.json("hdfs://iabd-virtualbox:9000/user/iabd/pdi_sales_json")

Es conveniente destacar que para acceder a HDFS, únicamente hemos de indicar la URL del recurso con el prefijo hdfs:// más el host del namenode.

Comprimiendo los datos

Para configurar el algoritmo de compresión, si los datos está en Parquet o Avro, a nivel de la sesión de Spark, podemos realizar su configuración:

spark.setConf("spark.sql.parquet.compression.codec","snappy")
spark.setConf("spark.sql.parquet.compression.codec","none")
spark.setConf("spark.sql.avro.compression.codec","snappy")

Si sólo queremos hacerlo para una operación en particular, para cada lectura/escritura le añadimos .option("compression", "algoritmo"). Por ejemplo:

dfVentas = spark.read.option("compression", "snappy").option("delimiter",";").option("header", "true").csv("pdi_sales.csv")
dfClientes = spark.read.option("compression", "snappy").parquet("clientes.parquet")
dfVentas.write.option("compression", "snappy").format("avro").save("ventas.avro")

Datos y Esquemas

El esquema completo de un DataFrame se modela mediante un StructType, el cual contiene una colección de objetos StructField. Así pues, cada columna de un DataFrame de Spark se modela mediante un objeto StructField indicando su nombre, tipo y gestión de los nulos.

Hemos visto que al crear un DataFrame desde un archivo externo, podemos inferir el esquema. Si queremos crear un DataFrame desde un esquema propio utilizaremos los tipos StructType, StructField, así como StringType, IntegerType o el tipo necesario para cada columna. Para ello, primero hemos de importarlos (como puedes observar, estas clases pertenecen a las librerías SQL de PySpark):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

Tipos

Además de cadenas y enteros, flotantes (FloatType) o dobles (DoubleType), tenemos tipos booleanos (BooleanType), de fecha (DateType y TimestampType), así como tipos complejos como ArrayType, MapType y StructType. Para más información, consultar la documentación oficial.

Volvamos al ejemplo anterior donde tenemos ciertos datos de clientes, como son su nombre y apellidos, ciudad y sueldo:

clientes = [
    ("Aitor", "Medrano", "Elche", 3000),
    ("Pedro", "Casas", "Elche", 4000),
    ("Laura", "García", "Elche", 5000), 
    ("Miguel", "Ruiz", "Torrellano", 6000),
    ("Isabel", "Guillén", "Alicante", 7000)
]

Para esta estructura, definiremos un esquema con los campos, indicando para cada uno de ellos su nombre, tipo y si admite valores nulos:

esquema = StructType([
    StructField("nombre", StringType(), False),
    StructField("apellidos", StringType(), False),
    StructField("ciudad", StringType(), True),
    StructField("sueldo", IntegerType(), False)
])

A continuación ya podemos crear un DataFrame con datos propios que cumplen un esquema haciendo uso del método createDataFrame:

df = spark.createDataFrame(data=clientes, schema=esquema)
df.printSchema()
# root
#  |-- nombre: string (nullable = false)
#  |-- apellidos: string (nullable = false)
#  |-- ciudad: string (nullable = true)
#  |-- sueldo: integer (nullable = false)
df.show(truncate=False)
# +------+---------+----------+------+
# |nombre|apellidos|ciudad    |sueldo|
# +------+---------+----------+------+
# |Aitor |Medrano  |Elche     |3000  |
# |Pedro |Casas    |Elche     |4000  |
# |Laura |García   |Elche     |5000  |
# |Miguel|Ruiz     |Torrellano|6000  |
# |Isabel|Guillén  |Alicante  |7000  |
# +------+---------+----------+------+

Si lo que queremos es asignarle un esquema a un DataFrame que vamos a leer desde una fuente de datos externa, hemos de emplear el método schema:

dfClientes = spark.read.option("header", True).schema(esquema).csv("clientes.csv")

Rendimiento y esquema

La inferencia de los tipos de los datos es un proceso computacionalmente costoso. Por ello, si nuestro conjunto de datos es grande, es muy recomendable crear el esquema de forma programativa y configurarlo en la carga de datos.

Se recomienda la lectura del artículo Using schemas to speed up reading into Spark DataFrames.

Respecto al esquema, tenemos diferentes propiedades como columns, dtypes y schema con las que obtener su información:

df.columns
# ['nombre', 'apellidos', 'ciudad', 'sueldo']
df.dtypes
# [('nombre', 'string'),
#  ('apellidos', 'string'),
#  ('ciudad', 'string'),
#  ('sueldo', 'int')]
df.schema
# StructType(List(StructField(nombre,StringType,false),StructField(apellidos,StringType,false),StructField(ciudad,StringType,true),StructField(sueldo,IntegerType,false)))

Si una vez hemos cargado un DataFrame queremos cambiar el tipo de una de sus columnas, podemos hacer uso del método withColumn:

# Forma larga
from pyspark.sql.types import DoubleType
df = df.withColumn("sueldo", df.sueldo.cast(DoubleType())
# Forma corta
df = df.withColumn("sueldo", df.sueldo.cast("double"))

# df = df.withColumn("fnac", to_date(df.Date, "M/d/yyy"))

DataFrame API

Una vez tenemos un DataFrame podemos trabajar con los datos mediante un conjunto de operaciones estructuradas, muy similares al lenguaje relacional. Estas operaciones también se clasifican en transformaciones y acciones, recordando que las transformaciones utilizan una evaluación perezosa.

Es muy importante tener en cuenta que todas las operaciones que vamos a realizar a continuación son immutables, es decir, nunca van a modificar el DataFrame sobre el que realizamos la transformación. Así pues, realizaremos encadenamiento de transformaciones (transformation chaining) o asignaremos el resultado a un nuevo DataFrame.

Preparación

Para los siguientes apartados, vamos a trabajar sobre el siguiente DataFrame con el fichero de ventas que hemos utilizado a lo largo del curso:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframes-api").getOrCreate()
# Lectura de CSV con el ; como separador de columnas y con encabezado
df = spark.read.option("sep",";").option("header", "true").option("inferSchema", "true").csv("pdi_sales_small.csv")
df.printSchema()

El esquema generado es:

root
|-- ProductID: integer (nullable = true)
|-- Date: string (nullable = true)
|-- Zip: string (nullable = true)
|-- Units: integer (nullable = true)
|-- Revenue: double (nullable = true)
|-- Country: string (nullable = true)

Proyectando

La operación select permite indicar las columnas a recuperar pasándolas como parámetros:

df.select("ProductID","Revenue").show(3)
+---------+-------+
|ProductID|Revenue|
+---------+-------+
|      725|  115.5|
|      787|  314.9|
|      788|  314.9|
+---------+-------+
only showing top 3 rows

También podemos realizar cálculos (referenciando a los campos con nombreDataframe.nombreColumna) sobre las columnas y crear un alias (operación asociada a un campo):

df.select(df.ProductID,(df.Revenue+10).alias("VentasMas10")).show(3)
+---------+-----------+
|ProductID|VentasMas10|
+---------+-----------+
|      725|      125.5|
|      787|      324.9|
|      788|      324.9|
+---------+-----------+
only showing top 3 rows

Si tenemos un DataFrame con un gran número de columnas y queremos recuperarlas todas a excepción de unas pocas, es más cómodo utilizar la transformación drop, la cual funciona de manera opuesta a select, es decir, indicando las columnas que queremos quitar del resultado:

# Obtenemos el mismo resultado
df.select("ProductID", "Date", "Zip")
df.drop("Units", "Revenue", "Country")

Trabajando con columnas

Para acceder a las columnas, debemos crear objetos Column. Para ello, podemos seleccionarlos a partir de un DataFrame como una propiedad o mediante la función col:

nomCliente = df.nombre
nomCliente = df["ProductID"]
nomCliente = col("ProductID")

Así pues, podemos recuperar ciertas columnas de un DataFrame con cualquier de las siguientes expresiones:

from pyspark.sql.functions import col

df.select("ProductID", "Revenue").show()
df.select(df.ProductID, df.Revenue).show()
df.select(df["ProductID"], df["Revenue"]).show()
df.select(col("ProductID"), col("Revenue")).show()

col vs expr

En ocasiones se confunde el uso de la función col con expr. Aunque podemos referenciar a una columna haciendo uso de expr, su uso provoca que se parseé la cadena recibida para interpretarla.

Para el siguiente ejemplo, supongamos que tenemos un DataFrame con datos de clientes. Utilizaremos también la función concat_ws para concatenar textos utilizado un separador.

from pyspark.sql.functions import col,expr

df.select(concat_ws(" ", col("nombre"), col("apellidos")).alias("nombreCompleto"),
          "sueldo",
          (col("sueldo")*1.1).alias("nuevoSueldo")).show() 
df.select(concat_ws(" ", col("nombre"), col("apellidos")).alias("nombreCompleto"),
          "sueldo",
          expr("sueldo*1.1").alias("nuevoSueldo")).show() 

Añadiendo columnas

Una vez tenemos un DataFrame, podemos añadir columnas mediante el método withColumn:

dfNuevo = df.withColumn("total", df.Units * df.Revenue)
dfNuevo.show()
# +---------+----------+---------------+-----+-------+-------+------+
# |ProductID|      Date|            Zip|Units|Revenue|Country| total|
# +---------+----------+---------------+-----+-------+-------+------+
# |      725|1999-01-15|41540          |    1|  115.5|Germany| 115.5|
# |      787|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
# |      788|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
# |      901|1999-02-15|13587          |    2|  818.9|Germany|1637.8|
# ...

withColumn

Anteriormente hemos utilizado el método withColumn para cambiarle el tipo a un campo ya existente. Así pues, si referenciamos a una columna que ya existe, en vez de crearla, la sustituirá.

Otra forma de añadir una columna con una expresión es mediante la transformación selectExpr. Por ejemplo, podemos conseguir el mismo resultado que en el ejemplo anterior de la siguiente manera:

df.selectExpr("*", "Units * Revenue as total").show()`
# +---------+----------+---------------+-----+-------+-------+------+
# |ProductID|      Date|            Zip|Units|Revenue|Country| total|
# +---------+----------+---------------+-----+-------+-------+------+
# |      725|1999-01-15|41540          |    1|  115.5|Germany| 115.5|
# |      787|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
# |      788|2002-06-06|41540          |    1|  314.9|Germany| 314.9|
# |      901|1999-02-15|13587          |    2|  818.9|Germany|1637.8|
# ...

Aunque más adelante veremos como realizar transformaciones con agregaciones, mediante selectExpr también podemos realizar analítica de datos aprovechando la potencia de SQL:

df.selectExpr("count(distinct(ProductID)) as productos","count(distinct(Country)) as paises").show()
# +---------+------+
# |productos|paises|
# +---------+------+
# |      799|     4|
# +---------+------+

Cambiando el nombre

Si por algún extraño motivo necesitamos cambiarle el nombre a una columna (por ejemplo, vamos a unir dos DataFrames que tienen columnas con el mismo nombre pero en posiciones diferentes, o que al inferir el esquema tenga un nombre críptico o demasiado largo y queremos que sea más legible) podemos utilizar la transformación withColumnRenamed:

df.withColumnRenamed("Zip", "PostalCode").show(5)
# +---------+----------+---------------+-----+-------+-------+
# |ProductID|      Date|     PostalCode|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# |      725|1999-01-15|41540          |    1|  115.5|Germany|
# |      787|2002-06-06|41540          |    1|  314.9|Germany|
# |      788|2002-06-06|41540          |    1|  314.9|Germany|
# |      940|1999-01-15|22587          |    1|  687.7|Germany|
# |      396|1999-01-15|22587          |    1|  857.1|Germany|
# +---------+----------+---------------+-----+-------+-------+
# only showing top 5 rows

Filtrando

Si queremos eliminar filas, usaremos el método filter:

df.filter(df.Country=="Germany").show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID|      Date|            Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# |      725|1999-01-15|41540          |    1|  115.5|Germany|
# |      787|2002-06-06|41540          |    1|  314.9|Germany|
# |      788|2002-06-06|41540          |    1|  314.9|Germany|
# |      940|1999-01-15|22587          |    1|  687.7|Germany|

Por similitud con SQL, podemos utilizar también where como un alias de filter:

df.where(df.Units>20).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID|      Date|            Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# |      495|1999-03-15|75213 CEDEX 16 |   77|43194.1| France|
# |     2091|1999-05-15|9739           |   24| 3652.7| Mexico|
# |     2091|1999-06-15|40213          |   41| 6240.1|Germany|
# |     2091|1999-10-15|40213          |   41| 6347.7|Germany|
# |     2091|1999-12-15|40213          |   23| 3560.9|Germany|
# +---------+----------+---------------+-----+-------+-------+

Podemos utilizar los operadores lógicos (& para conjunción y | para la disyunción) para crear condiciones compuestas (recordad rodear cada condición entre paréntesis):

df.filter((df.Country=="Germany") & (df.Units>20)).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID|      Date|            Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# |     2091|1999-06-15|40213          |   41| 6240.1|Germany|
# |     2091|1999-10-15|40213          |   41| 6347.7|Germany|
# |     2091|1999-12-15|40213          |   23| 3560.9|Germany|
# +---------+----------+---------------+-----+-------+-------+
df.filter((df.ProductID==2314) | (df.ProductID==1322)).show()
# +---------+----------+---------------+-----+-------+-------+
# |ProductID|      Date|            Zip|Units|Revenue|Country|
# +---------+----------+---------------+-----+-------+-------+
# |     2314|1999-05-15|46045          |    1|   13.9|Germany|
# |     1322|2000-01-06|75593 CEDEX 12 |    1|  254.5| France|
# +---------+----------+---------------+-----+-------+-------+

Un caso particular de filtrado es la eliminación de los registros repetidos, lo cual lo podemos hacer de dos maneras:

  • Haciendo uso del método distinct tras haber realizado alguna transformación
  • Utilizando dropDuplicates sobre un DataFrame:
df.select("Country").distinct().show()
df.dropDuplicates(["Country"]).select("Country").show()
# +-------+
# |Country|
# +-------+
# |Germany|
# | France|
# | Mexico|
# | Canada|
# +-------+

Ordenando

Una vez recuperados los datos deseados, podemos ordenarlos mediante sort u orderBy (son operaciones totalmente equivalentes):

df.select("ProductID","Revenue").sort("Revenue").show(5)
df.sort("Revenue").show(5)
df.sort("Revenue", ascending=True).show(5)
df.sort(df.Revenue.asc()).show(5)

# Ordenación descendiente
df.sort(df.Revenue.desc()).show(5)
df.sort("Revenue", ascending=False).show(5)
from pyspark.sql.functions import desc

# Ordenación diferente en cada columna
df.sort(df.Revenue.desc(), df.Units.asc()).show(5)
df.sort(["Revenue","Units"], ascending=[0,1]).show(5)

Por ejemplo, con la última operación obtendríamos:

+---------+----------+---------------+-----+-------+-------+
|ProductID|      Date|            Zip|Units|Revenue|Country|
+---------+----------+---------------+-----+-------+-------+
|      495|1999-03-15|75213 CEDEX 16 |   77|43194.1| France|
|      495|2000-03-01|75391 CEDEX 08 |   18|10395.0| France|
|      464|2003-06-11|75213 CEDEX 16 |   16|10075.8| France|
|      464|2000-08-01|22397          |   17| 9817.5|Germany|
|      495|2000-03-01|06175 CEDEX 2  |   16| 9240.0| France|
+---------+----------+---------------+-----+-------+-------+
only showing top 5 rows

Normalmente, tras realizar una ordenación, es habitual quedarse con un subconjunto de los datos. Para ello, podemos utilizar la transformación limit.

Por ejemplo, la siguiente transformación es similar al ejemplo anterior, sólo que ahora al driver únicamente le llegan 5 registros, en vez de traerlos todos y sólo mostrar 5:

df.sort(df.Revenue.desc(), df.Units.asc()).limit(5).show()

Añadiendo filas

La única manera de añadir filas a un DataFrame es creando uno nuevo que sea el resultado de unir dos DataFrames que compartan el mismo esquema (mismo nombres de columnas y en el mismo orden). Para ello, utilizaremos la transformación union que realiza la unión por el orden de las columnas:

nuevasVenta = [
    (6666, "2022-03-24", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-25", "03206", 22, 2222.22, "Spain"),
]
# Creamos un nuevo DataFrame con las nuevas Ventas
nvDF = spark.createDataFrame(nuevasVenta)
# Unimos los dos DataFrames
dfUpdated = df.union(nvDF)

Cogiendo muestras

Si necesitamos recoger un subconjunto de los datos, ya sea para preparar los datos para algún modelo de machine learning como para una muestra aleatoria de los mismos, podemos utilizar las siguientes transformaciones:

  • sample permite obtener una muestra a partir de un porcentaje (no tiene porqué obtener una cantidad exacta). También admite un semilla e indicar si queremos que pueda repetir los datos.

    df.count()                  # 120239
    muestra = df.sample(0.10)
    muestra.count()             # 11876
    muestraConRepetidos = df.sample(True, 0.10)
    muestraConRepetidos.count() # 11923
    
  • randomSplit recupera diferentes DataFrames cuyos tamaños en porcentaje se indican como parámetros (si no suman uno, los parámetros se normalizan):

    dfs = df.randomSplit([0.8, 0.2])
    dfEntrenamiento = dfs[0]
    dfPrueba = dfs[1]
    dfEntrenamiento.count()     # 96194
    dfPrueba.count()            # 24045
    

Trabajando con datos sucios

Hay tres formas de gestionar la suciedad de los datos o la omisión completa de los mismos:

  1. Eliminar las filas que tienen valores vacíos en una o más columnas.
  2. Rellenar los valores nulos con valores que definimos nosotros.
  3. Sustituir los datos erróneos por algún valor que sepamos como gestionarlo.

Vamos a ver cada uno de estos casos a partir del siguiente dataset:

malasVentas = [
    (6666, "2022-03-22", "03206", 33, 3333.33, "Spain"),
    (6666, "2022-03-22", None, 33, 3333.33, "Spain"),
    (6666, "2022-03-23", "03206", None, 2222.22, "Spain"),
    (6666, "2022-03-24", "03206", None, None, "Espain"),
    (None, None, None, None, None, None)
]
malDF = spark.createDataFrame(malasVentas, ["ProductID", "Date", "Zip", "Units", "Revenue" , "Country"])
malDF.show()
# +---------+----------+-----+-----+-------+-------+
# |ProductID|      Date|  Zip|Units|Revenue|Country|
# +---------+----------+-----+-----+-------+-------+
# |     6666|2022-03-22|03206|   33|3333.33|  Spain|
# |     6666|2022-03-22| null|   33|3333.33|  Spain|
# |     6666|2022-03-23|03206| null|2222.22|  Spain|
# |     6666|2022-03-24|03206| null|   null| Espain|
# |     6666|2022-03-25|03206| null|2222.22|   null|
# +---------+----------+-----+-----+-------+-------+

Si queremos saber si una columna contiene nulos, podemos hacer un filtrado utilizando el método isNull sobre los campos deseados (también podemos utilizar isNotNull si queremos el caso contrario):

malDF.filter(malDF.Zip.isNull()).show()
# +---------+----------+----+-----+-------+-------+
# |ProductID|      Date| Zip|Units|Revenue|Country|
# +---------+----------+----+-----+-------+-------+
# |     6666|2022-03-22|null|   33|3333.33|  Spain|
# +---------+----------+----+-----+-------+-------+

Para trabajar con las filas que contengan algún dato nulo, podemos acceder a la propiedad na, la cual devuelve un DataFrameNaFunctions sobre la que podemos indicarle:

  • que la elimine mediante el método drop / dropna. Puede recibir "any" (borrará las filas que contengan algún nulo) o "all" (borrará las filas que todas sus columnas contengan nulos) y una lista con las columnas a considerar:

    # Elimina todos los nulos
    malDF.na.drop().show()
    # +---------+----------+-----+-----+-------+-------+
    # |ProductID|      Date|  Zip|Units|Revenue|Country|
    # +---------+----------+-----+-----+-------+-------+
    # |     6666|2022-03-22|03206|   33|3333.33|  Spain|
    # +---------+----------+-----+-----+-------+-------+
    
    # Elimina las filas que todas sus columnas son nulas
    malDF.na.drop("all").show()
    # +---------+----------+-----+-----+-------+-------+
    # |ProductID|      Date|  Zip|Units|Revenue|Country|
    # +---------+----------+-----+-----+-------+-------+
    # |     6666|2022-03-22|03206|   33|3333.33|  Spain|
    # |     6666|2022-03-22| null|   33|3333.33|  Spain|
    # |     6666|2022-03-23|03206| null|2222.22|  Spain|
    # |     6666|2022-03-24|03206| null|   null| Espain|
    # +---------+----------+-----+-----+-------+-------+
    
    # Elimina las filas que tienen el Zip nulo
    malDF.na.drop(subset=["Zip"]).show()
    # +---------+----------+-----+-----+-------+-------+
    # |ProductID|      Date|  Zip|Units|Revenue|Country|
    # +---------+----------+-----+-----+-------+-------+
    # |     6666|2022-03-22|03206|   33|3333.33|  Spain|
    # |     6666|2022-03-23|03206| null|2222.22|  Spain|
    # |     6666|2022-03-24|03206| null|   null| Espain|
    # +---------+----------+-----+-----+-------+-------+
    

    También podemos indicar la cantidad de nulos que ha de contener cada fila para eliminarla mediante el parámetro thresh:

    # Elimina las filas con 3 o más nulos
    malDF.dropna(thresh = 3)
    
  • que la rellene mediante el método fill / fillna, indicando el valor y si queremos, sobre qué columnas aplicar la modificación:

    # Rellenamos los zips vacíos por 99999
    malDF.na.fill("99999", subset=["Zip"]).show()
    # malDF.na.fill("99999", ["Zip"]).show()
    # malDF.fillna({"Zip": "99999"})
    # +---------+----------+-----+-----+-------+-------+
    # |ProductID|      Date|  Zip|Units|Revenue|Country|
    # +---------+----------+-----+-----+-------+-------+
    # |     6666|2022-03-22|03206|   33|3333.33|  Spain|
    # |     6666|2022-03-22|99999|   33|3333.33|  Spain|
    # |     6666|2022-03-23|03206| null|2222.22|  Spain|
    # |     6666|2022-03-24|03206| null|   null| Espain|
    # |     null|      null|99999| null|   null|   null|
    # +---------+----------+-----+-----+-------+-------+
    
  • que la sustituya mediante el método replace

    # Cambiamos Espain por Spain
    malDF.na.replace("Espain", "Spain").show()
    # +---------+----------+-----+-----+-------+-------+
    # |ProductID|      Date|  Zip|Units|Revenue|Country|
    # +---------+----------+-----+-----+-------+-------+
    # |     6666|2022-03-22|03206|   33|3333.33|  Spain|
    # |     6666|2022-03-22| null|   33|3333.33|  Spain|
    # |     6666|2022-03-23|03206| null|2222.22|  Spain|
    # |     6666|2022-03-24|03206| null|   null|  Spain|
    # |     null|      null| null| null|   null|   null|
    # +---------+----------+-----+-----+-------+-------+
    

Otro caso muy común es realizar una operación sobre una columna para transformar su valor, por ejemplo, pasar todo el texto a minúsculas o dividir una columna entre 100 para cambiar la escala.

En nuestro caso, vamos a modificar las columnas Zip y Country para realizar un trim y borrar los espacios en blanco:

from pyspark.sql.functions import col, trim
df = df.withColumn("Country", trim(col("Country"))).withColumn("Zip", trim(col("Zip")))

Usando SQL

En la era del big data, SQL es la lengua franca, permitiendo a perfiles con pocos conocimientos de programación trabajar de forma eficiente con los datos (siempre poniendo el foco en la analítica de datos, no en el procesamiento transaccional).

Spark soporta el ANSI SQL 2003, ampliamente establecido en el mundo de las bases de datos.

Para correr SQL en Spark podemos hacerlo a través de:

  • El cliente SQL, es cual se ofrece como un comando en ./bin/spark-sql
  • Mediante un servidor ODBC/JDBC
  • De forma programativa mediante aplicaciones Spark.

Las dos primeras opciones se integran con Apache Hive para utilizar su metastore. Ahora nos vamos a centrar en la última.

Vistas temporales

Ya hemos visto que los DataFrames tienen una estructura similar a una tabla de una base de datos relacional. Para poder realizar consultas, necesitaremos crear vistas temporales mediante el método createOrReplaceTempView para posteriormente realizar una consulta sobre la vista creada a través de spark.sql:

# 1. definimos la vista
df.createOrReplaceTempView("ventas")
# 2. realizamos la consulta
ventasCanada = spark.sql("select * from ventas where trim(Country)='Canada'")
ventasCanada.show(3)
# +---------+---------+---------------+-----+-------+-------+
# |ProductID|     Date|            Zip|Units|Revenue|Country|
# +---------+---------+---------------+-----+-------+-------+
# |      725|1/15/1999|H1B            |    1|  115.4|Canada |
# |     2235|1/15/1999|H1B            |    2|  131.1|Canada |
# |      713|1/15/1999|H1B            |    1|  160.1|Canada |
# +---------+---------+---------------+-----+-------+-------+
# only showing top 3 rows

Vistas globales

Las vistas temporales tienen un alcance de SparkSession, de manera que desaparecen una vez finalice la sesión que ha creado la vista. Si necesitamos tener una vista que se comparta entre todas las sesiones y que permanezca viva hasta que la aplicación Spark finalice, podemos crear una vista temporal global.

Estas vistas se almacenan en la base de datos global_temp y en las consultas es necesario poner el prefijo global_temp para acceder a la sus vistas.

# 1. definimos la vista global
df.createOrReplaceGlobalTempView("ventasg")
# 2. realizamos la consulta
ventasCanadaG = spark.sql("select * from global_temp.ventasg where trim(Country)='Canada'")
ventasCanadaG.show(3)
# +---------+---------+---------------+-----+-------+-------+
# |ProductID|     Date|            Zip|Units|Revenue|Country|
# +---------+---------+---------------+-----+-------+-------+
# |      725|1/15/1999|H1B            |    1|  115.4|Canada |
# |     2235|1/15/1999|H1B            |    2|  131.1|Canada |
# |      713|1/15/1999|H1B            |    1|  160.1|Canada |
# +---------+---------+---------------+-----+-------+-------+
# only showing top 3 rows

# Creamos otra sesión y vemos como funciona
spark.newSession().sql("select count(*) from global_temp.ventasg where trim(Country)='Canada'").show()

Tablas

Spark permite crear bases de datos y tablas y almacenar los datos y metadatos en estructuras similares a como lo realiza Hive en un metastore central. De esta manera, vamos a poder crear tablas gestionadas y no gestionadas.

Para organizar las tablas y las vistas, es recomendable crear una base de datos. De la misma manera que hemos creado sentencias SQL, podemos generar sentencias DML:

spark.sql("create database if not exists s8a")
spark.sql("use database s8a")

Si queremos consultar las bases de datos existentes podemos:

spark.catalog.listDatabases()
# [Database(name='default', description='Default Hive database', locationUri="file:...spark-warehouse"),
#  Database(name='s8a', description='', locationUri="file:.../spark-warehouse/s8a.db")]
spark.sql("show databases").show()
# +---------+
# |namespace|
# +---------+
# |  default|
# |      s8a|
# +---------+

A continuación, podemos crear una tabla gestionada mediante saveAsTable:

df.write.format("parquet").mode("overwrite").saveAsTable("ventast")

De manera que podemos comprobar su estructura con SQL:

spark.sql("describe table ventast").show()
# +---------+---------+-------+
# | col_name|data_type|comment|
# +---------+---------+-------+
# |ProductID|      int|   null|
# |     Date|   string|   null|
# |      Zip|   string|   null|
# |    Units|      int|   null|
# |  Revenue|   double|   null|
# |  Country|   string|   null|
# +---------+---------+-------+

Si queremos crear una tabla no gestionada, también conocida como tabla externa, necesitamos indicar la ruta de los datos en el momento de creación:

spark.sql("""CREATE TABLE ventase(ProductID INT, Date STRING, 
  Zip STRING, Units INT, Revenue DOUBLE, Country STRING) 
  USING csv OPTIONS (PATH 
  '/pdi_sales_small.csv')""")

Para ello, necesitamos colocar el archivo de datos dentro del almacén de metastore, que en nuestro caso es spark-warehouse/s8a.db/

También podemos crear la tabla indicando la opción path:

df.write.option("path", "/tmp/datos/ventas").saveAsTable("ventaset")

Internamente, podemos configurar Spark para trabajar con Hive como motor de almacenamiento. Más información en la documentación oficial.

Trabajando con Databricks

En la sesión anterior ya vimos como crear RDDs con Databricks. En esta ocasión, vamos a trabajar mediante DataFrames y SQL para ver toda la flexibilidad que nos aporta.

Una vez creado de nuevo el cluster, vamos a cargar los datos mediante la opción Data, subiendo el archivo pdi_sales_small.csv:

Subiendo datos a Databricks

Una vez cargado el archivo, pulsamos sobre el botón Create table in notebook de manera que nos crea un cuaderno Jupyter donde podemos consultar los datos y crear una vista temporal:

Cargados los datos en un DataFrame

Para que funcione correctamente con nuestro datos, vamos a modificar el código:

infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"

Y tras cargar el dataset, antes de crear la vista, vamos a limpiar los países:

from pyspark.sql.functions import trim
df = df.withColumn("Country", trim(df.Country))

Datos visuales

Si volvemos a ejecutar el cuaderno, ahora sí que cargará correctamente los datos. Si nos vamos a la celda que realiza una consulta sobre todos los datos, podemos ver en la parte superior derecha como el lenguaje empleado en la celda es SQL, por ello la primera línea comienza con %sql, y a continuación ya podemos introducir directamente código SQL, teniendo la opción de visualizar los datos tanto en modo texto como mediante gráficos:

Datos y gráficos mediante SQL

Cuadro de mandos

Además, con las tablas y/o gráficos que generamos dentro de Databricks, podemos generar un sencillo cuadro de mandos.

Vamos a crear un par de consultas, una para obtener las ventas medias por país:

%sql
select Country, avg(Revenue) as ventas
from pdi_sales_small_csv
group by Country
order by ventas desc

Y otra para las unidas pedidas por cada país:

%sql
select Country, sum(Units) as pedidos
from pdi_sales_small_csv
group by Country
order by pedidos desc

Si pulsamos sobre el icono del gráfico de barras de la esquina superior derecha de una celda SQL, podemos añadir el resultado de la celda a un dashboard:

Creando un cuadro de mandos

Una vez creado, sólo tenemos que seleccionar las celdas que queramos, e ir añadiéndolas al cuadro de mandos creado. Posteriormente, podemos abrirlo, resituar los elementos y visualizarlo:

Añadiendo elementos a un cuadro de mandos

Agregaciones

Una vez tenemos un DataFrame, podemos realizar analítica de datos sobre el dataset entero, o sobre una o más columnas y aplicar una función de agregación que permita sumar, contar o calcular la media de cualquier grupo, entre otras opciones.

Para ello, PySpark ofrece un amplio conjunto de funciones. En nuestro caso, vamos a realizar algunos ejemplos para practicar con las funciones más empleadas.

Contando

count: Devuelve la cantidad de elementos no nulos:

from pyspark.sql.functions import count
df.select(count("Country")).show()
# +--------------+
# |count(Country)|
# +--------------+
# |        120239|
# +--------------+

count_distinct / countDistinct: Devuelve la cantidad de elementos no nulos diferentes:

from pyspark.sql.functions import count_distinct
df.select(count_distinct("Country"), count_distinct("Zip")).show()
# +-----------------------+-------------------+
# |count(DISTINCT Country)|count(DISTINCT Zip)|
# +-----------------------+-------------------+
# |                      4|               2585|
# +-----------------------+-------------------+

approx_count_distinct / approxCountDistinct: Devuelve aproximadamente la cantidad de elementos no nulos diferentes (puede recibir un segundo parámetro la máximo desviación estándar admitida). Este método es mucho más rápido que contar exactamente el número de resultado, y para datasets muy grandes, en ocasiones puede ser útil:

from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("Country"), approx_count_distinct("Zip")).show()
# +------------------------------+--------------------------+
# |approx_count_distinct(Country)|approx_count_distinct(Zip)|
# +------------------------------+--------------------------+
# |                             4|                      2737|
# +------------------------------+--------------------------+

Calculando

min y max permiten obtener el menor y el mayor valor respectivamente:

from pyspark.sql.functions import min, max
df.select(min("Units"), max("Units")).show()
# +----------+----------+
# |min(Units)|max(Units)|
# +----------+----------+
# |         1|        77|
# +----------+----------+

sum permite sumar todos los valores de una columna:

from pyspark.sql.functions import sum
df.select(sum("Units"), sum("Revenue")).show()
# +----------+--------------------+
# |sum(Units)|        sum(Revenue)|
# +----------+--------------------+
# |    125728|5.0107274999986745E7|
# +----------+--------------------+

sum_distinct / sumDistinct suma los valores diferentes de una columna:

from pyspark.sql.functions import sum_distinct
df.select(sum_distinct("Units"), sum_distinct("Revenue")).show()
# +-------------------+---------------------+
# |sum(DISTINCT Units)|sum(DISTINCT Revenue)|
# +-------------------+---------------------+
# |                308|   1189127.0999999985|
# +-------------------+---------------------+

avg calcula la media aritmética:

from pyspark.sql.functions import sum, count, avg
df.select(avg("Revenue"), sum("Revenue")/count("Revenue")).show()
# +-----------------+-------------------------------+
# |     avg(Revenue)|(sum(Revenue) / count(Revenue))|
# +-----------------+-------------------------------+
# |416.7306364822291|              416.7306364822291|
# +-----------------+-------------------------------+

Asimetría, varianza y desviación estándar

Si nos interesa obtener información estadística sobre los datos, también disponemos de las funciones skewness, kurtosis, variance, var_pop, stddev y stddev_pop.

Agrupando

Si agrupamos varias columnas de tipo categóricas (con una cardinalidad baja), podemos realizar cálculos sobre el resto de columnas.

Sobre un DataFrame, podemos agrupar los datos por la columna que queramos utilizando el método groupBy, el cual nos devuelve un GroupedData, sobre el que posteriormente realizar operaciones como avg(cols), count(), mean(cols), min(cols), max(cols) o sum(cols):

from pyspark.sql.functions import sum
df.groupBy("Country").count().show()
# +-------+-----+
# |Country|count|
# +-------+-----+
# |Germany|30059|
# | France|30060|
# | Mexico|30060|
# | Canada|30060|
# +-------+-----+
from pyspark.sql.functions import sum
df.groupBy("Country").sum("Revenue").show()
# +-------+--------------------+
# |Country|        sum(Revenue)|
# +-------+--------------------+
# |Germany|1.4982119999999512E7|
# | France|1.2087942100000832E7|
# | Mexico| 1.139459870000116E7|
# | Canada|1.1642614200001905E7|
# +-------+--------------------+

Si necesitamos realizar más de un agregación sobre el mismo grupo, mediante agg podemos indicar una o más expresiones de columnas:

df.groupBy("Country").agg(sum("Revenue"), count("Revenue")).show()
# +-------+--------------------+--------------+
# |Country|        sum(Revenue)|count(Revenue)|
# +-------+--------------------+--------------+
# |Germany|1.4982119999999512E7|         30059|
# | France|1.2087942100000832E7|         30060|
# | Mexico| 1.139459870000116E7|         30060|
# | Canada|1.1642614200001905E7|         30060|
# +-------+--------------------+--------------+

También podemos indicar los elementos a calcular mediante un diccionario donde las claves son los campos y los valores la función a calcular:

df.groupBy("Country").agg({"Zip":"count", "Revenue":"avg"}).show()
# +-------+----------+------------------+
# |Country|count(Zip)|      avg(Revenue)|
# +-------+----------+------------------+
# |Germany|     30059| 498.4237665923521|
# | France|     30060| 402.1271490352905|
# | Mexico|     30060| 379.0618330007039|
# | Canada|     30060|387.31251497012323|
# +-------+----------+------------------+

Agrupando colecciones

En ocasiones necesitamos agrupar en una colección todos los valores para un grupo en particular. Para ello, podemos usar collect_list (con repetidos) o collect_set (sin repeticiones):

Por ejemplo, para cada país, vamos a recuperar un listado con los códigos postales de aquellos pedidos que hayan superado las 5 unidades:

from pyspark.sql.functions import collect_list, collect_set
df.where("Units > 5").groupBy("Country").agg(collect_list("Zip"), collect_set("Zip")).show()
# +-------+--------------------+--------------------+
# |Country|   collect_list(Zip)|    collect_set(Zip)|
# +-------+--------------------+--------------------+
# |Germany|[22397, 22111, 40...|[22111, 12589, 22...|
# | France|[75213 CEDEX 16, ...|[06082 CEDEX 1, 0...|
# | Mexico|[7100, 7810, 9739...|[9739, 10300, 781...|
# | Canada|[T2X, V6G, V6G, T6V]|     [V6G, T2X, T6V]|
# +-------+--------------------+--------------------+

Tablas pivote

Las tablas pivote permite obtener un resumen de los datos a partir de columnas categóricas sobre la que realizar cálculos, tal como se hace en las hojas de cálculo con las tablas dinámicas.

Por ejemplo, vamos a obtener la cantidad recaudada por las ventas de cada año por cada pais:

df.groupBy(year("Date")).pivot("Country").sum("Revenue").show()
# +----------+------------------+------------------+------------------+------------------+
# |year(Date)|            Canada|            France|           Germany|            Mexico|
# +----------+------------------+------------------+------------------+------------------+
# |      2003| 2360085.999999947|1105230.9000000046|1407120.0000000007|         1049457.5|
# |      2004| 1539140.499999946|              null|              null|              null|
# |      2001| 2193437.799999908|              null|              null|233419.20000000004|
# |      2000|1806678.3999999042|1108846.8999999764| 4510606.799999941| 4240448.399999928|
# |      1999|1382756.6999999764| 7594921.200000435| 5928459.100000297|3419368.2000001906|
# |      2002|2360514.7999998857| 2278943.099999957| 3135934.099999964|2451905.3999999263|
# +----------+------------------+------------------+------------------+------------------+

También podemos hacer más de un cálculo sobre la tabla pivote:

df.groupBy(year("Date")).pivot("Country").agg(sum("Revenue").alias("total"), sum("Units").alias("cantidad")).show()
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+
# |year(Date)|      Canada_total|Canada_cantidad|      France_total|France_cantidad|     Germany_total|Germany_cantidad|      Mexico_total|Mexico_cantidad|
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+
# |      2003| 2360085.999999947|           6375|1105230.9000000046|           2794|1407120.0000000007|            3099|         1049457.5|           2510|
# |      2004| 1539140.499999946|           3636|              null|           null|              null|            null|              null|           null|
# |      2001| 2193437.799999908|           5976|              null|           null|              null|            null|233419.20000000004|            583|
# |      2000|1806678.3999999042|           5049|1108846.8999999764|           2456| 4510606.799999941|            9738| 4240448.399999928|          11935|
# |      1999|1382756.6999999764|           3964| 7594921.200000435|          20432| 5928459.100000297|           12266|3419368.2000001906|           9895|
# |      2002|2360514.7999998857|           6148| 2278943.099999957|           6057| 3135934.099999964|            6643|2451905.3999999263|           6172|
# +----------+------------------+---------------+------------------+---------------+------------------+----------------+------------------+---------------+

Joins

Hasta ahora todo la analítica la hemos realizado sobre un único DataFrame. Aunque si seguimos un proceso ELT es probable que tengamos todos los datos en un único lugar, en ocasiones necesitamos cruzar la información de dos datasets.

Si nos basamos en el planteamiento de una base de datos relacional, Para unir dos DataFrames necesitamos unir la clave ajena de uno con la clave primaria del otro.

Para estos ejemplos, vamos a cambiar de datasets y utilizar datos de vuelos de avión que han tenido algún tipo de retraso (departure_delays.csv) y otro con los códigos de los aeropuertos (airport-codes-na.tsv).

Fichero CSV con la coma como separador de campos.

departure_delays.csv
date,delay,distance,origin,destination
01011245,6,602,ABE,ATL
01020600,-8,369,ABE,DTW
01021245,-2,602,ABE,ATL
01020605,-4,602,ABE,ATL

Fichero TSV con el tabulador como separador campos

airport-codes-na.tsv
City State Country IATA
Abbotsford BC Canada YXX
Aberdeen SD USA ABR
Abilene TX USA ABI
Akron OH USA CAK
Alamosa CO USA ALS
Albany GA USA ABY

Así pues, lo primero que vamos a hacer es cargar ambos DataFrames:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframes-joins").getOrCreate()

df_vuelos = spark.read.option("sep",",").option("header", "true").option("inferSchema", "true").csv("departure_delays.csv")
# df_vuelos.printSchema()
# df_vuelos.count()   # 1391578
df_aeropuertos = spark.read.option("sep","\t").option("header", "true").option("inferSchema", "true").csv("airport-codes-na.tsv")
# df_aeropuertos.printSchema()

Mediante SQL

Si queremos hacer un join mediante SQL, sólo tenemos que emplear la misma sintaxis que con cualquier sistema relacional, de manera que primero crearemos las vistas temporales y luego realizaremos la consulta:

df_vuelos.createOrReplaceTempView("vuelos")
df_aeropuertos.createOrReplaceTempView("aeropuertos")

df_join = spark.sql("select v.*, a.City as originCity, b.City as destinationCity from vuelos v JOIN aeropuertos a on v.origin == a.IATA join aeropuertos b on v.destination = b.IATA")
# df_join.count()   # 1361141
df_join.show(3)


# +-------+-----+--------+------+-----------+----------+---------------+
# |   date|delay|distance|origin|destination|originCity|destinationCity|
# +-------+-----+--------+------+-----------+----------+---------------+
# |1011245|    6|     602|   ABE|        ATL| Allentown|        Atlanta|
# |1020600|   -8|     369|   ABE|        DTW| Allentown|        Detroit|
# |1021245|   -2|     602|   ABE|        ATL| Allentown|        Atlanta|
# +-------+-----+--------+------+-----------+----------+---------------+
# only showing top 3 rows

Si tuviéramos algún vuelo con algún código que no tuviéramos disponible en el dataset con los códigos de aeropuertos no nos aparecería. Por tanto, sería más conveniente realizar un left join:

df_left_join = spark.sql("select v.*, a.City as originCity, b.City as destinationCity from vuelos v LEFT JOIN aeropuertos a on v.origin == a.IATA LEFT JOIN aeropuertos b on v.destination = b.IATA")
df_left_join.show(3)
df_left_join.count()    # 1391578

Un caso particular que conviene conocer es el left anti join. Este tipo de join permite obtener aquellos registros de la izquierda que no aparecen en la parte derecha, de manera que si seguimos con el ejemplo, podemos recuperar aquellos vuelos cuyos aeropuertos no tenemos en el dataset con los códigos:

df_left_anti_join = spark.sql("select * from vuelos v LEFT ANTI JOIN aeropuertos a ON v.origin == a.IATA ")
df_left_anti_join.count()   # 14416

Mediante Python

Si no queremos utilizar SQL o ya tenemos fragmentos de código que interactuar con el DataFrame API, podemos utilizar el método join.

Este método une dos DataFrames, indicando la expresión de unión y opcionalmente el tipo:

exprJoin1 = df_vuelos.origin == df_aeropuertos.IATA
df_joinp1 = df_vuelos.join(df_aeropuertos, exprJoin1, "inner")
df_joinp1.count()    # 1377162

Forma corta

Si las columnas que unen los DataFrames tienen el mismo nombre, podemos simplificar el código indicando únicamente su nombre:

df1.join(df2, "user_id")

Además, si queremos hacer un inner join, podemos no indicarlo ya que es el tipo por defecto.

Como en nuestro caso, teníamos dos joins, tanto para los vuelos de origen como los de destino, necesitamos volver a unir:

from pyspark.sql.functions import col
# le indicamos alias a los campos para eliminar ambigüedades
df_joinp2 = (df_joinp1.alias("a")).join((df_aeropuertos.alias("b")), col("a.destination") == col("b.IATA"), "inner")
df_joinp2.count()    # 1361141

En vez de pasarle inner, le podemos indicar el tipo de join: left, right, cross, left_anti, etc...

exprJoin1 = df_vuelos.origin == df_aeropuertos.IATA
df_left_anti_join = df_vuelos.join(df_aeropuertos, exprJoin1, "left_anti")
df_left_anti_join.count()   # 14416

Todo tipo de joins

Además de los casos vistos, podemos realizar otros tipos de joins como cross, semi, full, outer, etc... Más información en la documentación oficial

Funciones

Para dominar realmente Spark, hay que tener destreza en todas las funciones existente para el tratamiento de fechas, cadenas, operaciones matemáticas, para trabajar con colecciones, etc...

Además, siempre podemos crear nuestras propias funciones de usuario para ampliar el lenguaje.

Aunque ya hemos utilizado algunas a lo largo de los apuntes, a continuación vamos a repasar las funciones más empleadas.

Fechas

Más información en la documentación oficial

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

spark = SparkSession.builder.appName("s8a-dataframes-sql").getOrCreate()

df = spark.read.option("sep",";").option("header", "true").option("inferSchema", "true").csv("pdi_sales_small.csv")
# Cambiamos el tipo de dato a fecha
df = df.withColumn("Date", to_date(df.Date, "M/d/yyy"))

import pyspark.sql.functions
df.select("Date", date_format("Date", "dd-MM-yyy"),
            next_day("Date", "Sun"), last_day("Date"),
            dayofmonth("Date"), dayofyear("Date"),
            month("Date"), year("Date")).show(2)
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# |      Date|date_format(Date, dd-MM-yyy)|next_day(Date, Sun)|last_day(Date)|dayofmonth(Date)|dayofyear(Date)|month(Date)|year(Date)|
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# |1999-01-15|                  15-01-1999|         1999-01-17|    1999-01-31|              15|             15|          1|      1999|
# |2002-06-06|                  06-06-2002|         2002-06-09|    2002-06-30|               6|            157|          6|      2002|
# +----------+----------------------------+-------------------+--------------+----------------+---------------+-----------+----------+
# only showing top 2 rows

Cadenas

Por ejemplo, tenemos las funciones para quitar espacios (ltrim, rtrim, trim) y pasar a mayúsculas/minúsculas (lower, upper):

df.select("Zip", ltrim("Zip").alias("l"), rtrim("Zip").alias("r"), 
         lower("Zip"), upper("Zip")
         ).where(trim(df.Country)=="Canada").show(3)
# +---------------+---------------+---+---------------+---------------+
# |            Zip|              l|  r|     lower(Zip)|     upper(Zip)|
# +---------------+---------------+---+---------------+---------------+
# |H1B            |H1B            |H1B|h1b            |H1B            |
# +---------------+---------------+---+---------------+---------------+
# only showing top 1 row

O funciones para poner la inicial en mayúsculas (initcap), darle la vuelta (reverse), obtener su tamaño o reemplazar caracteres (translate):

df.select("Country", initcap("Country"), reverse("Country"),
          length("Country"), translate("Country", "na", "pe")
         ).where(trim(df.Country)=="Canada").show(1)
# +-------+----------------+----------------+---------------+--------------------------+
# |Country|initcap(Country)|reverse(Country)|length(Country)|translate(Country, na, pe)|
# +-------+----------------+----------------+---------------+--------------------------+
# |Canada |         Canada |          adanaC|              7|                   Cepede |
# +-------+----------------+----------------+---------------+--------------------------+
# only showing top 1 row

También podemos trabajar con subcadenas (substring), encontrar ocurrencias (locate) o partir una cadena en trozos (split):

df.select("Country", split("Country", "a"), locate("a", "Country"),
          substring("Country",3,2)
         ).where(trim(df.Country)=="Canada").show(1)
+-------+---------------------+---------------------+------------------------+
|Country|split(Country, a, -1)|locate(a, Country, 1)|substring(Country, 3, 2)|
+-------+---------------------+---------------------+------------------------+
|Canada |         [C, n, d,  ]|                    2|                      na|
+-------+---------------------+---------------------+------------------------+
only showing top 1 row

Otras funciones que se suelen utilizar son concat y concat_ws para unir cadenas, levenshtein para calcular la distancia entre dos cadenas, lpad y rpad para completar con espacios, etc... Si necesitas trabajar con expresiones regulares puedes utilizar regexp_extract para extraer parte de una cadena como regexp_replace para sustituir.

Colecciones

Para probar las funciones que trabajan con colecciones, vamos a cambiar de dataset y trabajar con uno compartido por Kaggle con datos de negocios de Yelp que tenemos almacenados en una versión reducida en yelp_academic_dataset_business.json. Los negocios tienen una propiedad denominada categories que contiene un array con las categorías de los mismos:

persons.json
{
   "business_id":"O_X3PGhk3Y5JWVi866qlJg",
   "full_address":"1501 W Bell Rd\nPhoenix, AZ 85023",
   "hours":{
      "Monday":{
         "close":"18:00",
         "open":"11:00"
      },
      "Tuesday":{
         "close":"18:00",
         "open":"11:00"
      },
        ...
   },
   "open":true,
   "categories":[
      "Active Life",
      "Arts & Entertainment",
      "Stadiums & Arenas",
      "Horse Racing"
   ],
   "city":"Phoenix",
   ...
}

El primer paso es cargar el documento y ver el esquema leído por Spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s8a-dataframes-arrays").getOrCreate()

df = spark.read.option("inferSchema", "true").option("multiline",True).json("yelp_academic_dataset_business.json")

df.printSchema()

Como podemos observar, sigue una estructura de elementos anidados:

root
 |-- attributes: struct (nullable = true)
 |    |-- Accepts Credit Cards: boolean (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: struct (nullable = true)
 |    |    |-- casual: boolean (nullable = true)
 |    |    |-- classy: boolean (nullable = true)
 |    |    |-- divey: boolean (nullable = true)
 |    |    |-- hipster: boolean (nullable = true)
 |    |    |-- intimate: boolean (nullable = true)
 |    |    |-- romantic: boolean (nullable = true)
 |    |    |-- touristy: boolean (nullable = true)
 |    |    |-- trendy: boolean (nullable = true)
 |    |    |-- upscale: boolean (nullable = true)
 |    |-- Attire: string (nullable = true)
 ...
 |    |-- Wi-Fi: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- city: string (nullable = true)
 |-- full_address: string (nullable = true)
 ...

Por ejemplo, vamos a ver mediante un ejemplo las siguientes funciones:

  • size: devuelve el tamaño de la colección
  • sort_array: ordena la colección
  • array_contains: comprueba si hay un elemento en la colección
from pyspark.sql.functions import *
df.select("name", "hours.Sunday", size("categories").alias("totalCategorias"),
               sort_array("categories").alias("categorias"),
               array_contains("categories", "Restaurants").alias("Restaurantes")).show(10, truncate=False)
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
# |name                           |Sunday        |totalCategorias|categorias                                                                       |Restaurantes|
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
# |Turf Paradise Race Course      |{18:00, 11:00}|4              |[Active Life, Arts & Entertainment, Horse Racing, Stadiums & Arenas]             |false       |
# |Sam's Club Members Only        |null          |5              |[Automotive, Department Stores, Fashion, Shopping, Tires]                        |false       |
# |Forever 21                     |{18:00, 11:00}|5              |[Accessories, Fashion, Men's Clothing, Shopping, Women's Clothing]               |false       |
# |Loving Hands Pet Care          |{19:00, 06:00}|3              |[Pet Boarding/Pet Sitting, Pet Services, Pets]                                   |false       |
# |Amec Mid-City Animal Hospital  |null          |2              |[Pets, Veterinarians]                                                            |false       |
# |Los Armandos Asadero Y Mariscos|{03:00, 20:00}|2              |[Mexican, Restaurants]                                                           |true        |
# |Clayton Companies              |null          |4              |[Home Services, Property Management, Real Estate, Real Estate Services]          |false       |
# |Bertha's Café                  |null          |5              |[Bakeries, Breakfast & Brunch, Food, Restaurants, Sandwiches]                    |true        |
# |Jerry's Artarama               |{17:00, 11:00}|4              |[Art Supplies, Arts & Crafts, Framing, Shopping]                                 |false       |
# |Shauna Brown Fitness           |null          |5              |[Active Life, Fitness & Instruction, Health & Medical, Massage Therapy, Trainers]|false       |
# +-------------------------------+--------------+---------------+---------------------------------------------------------------------------------+------------+
only showing top 10 rows

Tip

Recuerda que en el apartado Agrupando colecciones vimos como podemos crear colecciones al realizar una agrupación.

Así pues, además del nombre, hemos obtenido el horario de los domingos utilizando la notación . para acceder a los campos anidados, la cantidad de categorías de cada comercio, un listado ordenado con sus categorías y finalmente si es un restaurante.

Otro tipo de operación que podemos realizar es desenrollar una colección mediante la función explode y generar una fila nueva por cada elemento de la colección:

df.select("name", explode("categories")).show(10, truncate=False)
# +-------------------------+--------------------+
# |name                     |col                 |
# +-------------------------+--------------------+
# |Turf Paradise Race Course|Active Life         |
# |Turf Paradise Race Course|Arts & Entertainment|
# |Turf Paradise Race Course|Stadiums & Arenas   |
# |Turf Paradise Race Course|Horse Racing        |
# |Sam's Club Members Only  |Tires               |
# |Sam's Club Members Only  |Automotive          |
# |Sam's Club Members Only  |Fashion             |
# |Sam's Club Members Only  |Shopping            |
# |Sam's Club Members Only  |Department Stores   |
# |Forever 21               |Women's Clothing    |
# +-------------------------+--------------------+
# only showing top 10 rows

JSON

Es común que se de el caso de que los datos que leemos desde un sistema externo estén en formato JSON pero que el proceso de ingesta lo haya realizado como si fuera una cadena de texto.

Supongamos que tenemos la siguiente cadena y generados un DataFrame a partir de un RDD:

tareas = ["""{"dia": "Lunes", "tareas": ["Corregir ejercicios", "Ir a nadar", "Comprar pan"]}"""]
# ['{"dia": "Lunes", "tareas": ["Corregir ejercicios", "Ir a nadar", "Comprar pan"]}']
tareasRDD = spark.sparkContext.parallelize(tareas)
tareasStrDF = tareasRDD.toDF("string")
# tareasStrDF es un DF con una columna con nombre value de tipo string
tareasStrDF.printSchema()
# root
#  |-- value: string (nullable = true)
tareasStrDF.show()
# +--------------------+
# |               value|
# +--------------------+
# |{"dia": "Lunes", ...|
# +--------------------+

Para pasarlo a JSON, necesitamos definir un esquema con la estructura del documento JSON:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

esquemaTareas = StructType([
    StructField("dia", StringType(), False),
    StructField("tareas", ArrayType(StringType(), False), False)
])

Y a continuación ya podemos transformar el formato mediante la función from_json:

todosDF = tareasStrDF.select(from_json("value", esquemaTareas).alias("datos"))
todosDF.printSchema()
# root
#  |-- datos: struct (nullable = true)
#  |    |-- dia: string (nullable = true)
#  |    |-- tareas: array (nullable = true)
#  |    |    |-- element: string (containsNull = true)

Y ahora ya podemos acceder a los datos (en el siguiente ejemplo empleamos la función getItem para acceder a un elemento de una columna):

todosDF.select(col("datos").getItem("dia"),
     "datos.tareas",
     (todosDF.datos.getItem("tareas")[0]).alias("tarea1")).show(truncate=False)
+---------+----------------------------------------------+-------------------+
|datos.dia|tareas                                        |tarea1             |
+---------+----------------------------------------------+-------------------+
|Lunes    |[Corregir ejercicios, Ir a nadar, Comprar pan]|Corregir ejercicios|
+---------+----------------------------------------------+-------------------+

Para terminar, si necesitamos la operación inversa, y lo que queremos es crear una representación JSON de una columna, podemos utilizar la función getItem:

todosDF.select(to_json("datos")).show(truncate=False)
# +---------------------------------------------------------------------------+
# |to_json(datos)                                                             |
# +---------------------------------------------------------------------------+
# |{"dia":"Lunes","tareas":["Corregir ejercicios","Ir a nadar","Comprar pan"]}|
# +---------------------------------------------------------------------------+

UDF

Además de las funciones que ofrece Spark, en cualquier momento podemos crear nuestras funciones de usuario (User-Defined Functions) para ampliar la expresividad de Spark. Antes de utilizarlas, las hemos de definir y registrar.

Si volvemos al dataset de ventas, teníamos la siguiente información:

df.select("ProductID", "Revenue", "Units").sort("Units", ascending=False).show(5)
# +---------+-------+-----+
# |ProductID|Revenue|Units|
# +---------+-------+-----+
# |      495|43194.1|   77|
# |     2091| 6347.7|   41|
# |     2091| 6240.1|   41|
# |     2091| 3652.7|   24|
# |     2091| 3560.9|   23|
# +---------+-------+-----+
# only showing top 5 rows

Vamos a crear una función para que, si vende más de una unidad, se le asigne a cada producto un bonus de un 1%. Para ello, primero definiremos la función mediante Python, y posteriormente, la registraremos mediante la función udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def bonus(unidades, ventas):
    if unidades == 1 :
        return 0.0
    else:
        return unidades * ventas / 100

udfBonus = udf(bonus, DoubleType())

Así pues, si realizamos una consulta, ya podemos utilizar la función recién creada como si fuera una propia de Spark:

df.select("ProductID", "Revenue", "Units", udfBonus(df.Units, df.Revenue)).sort("Units", ascending=False).show(5)
# +---------+-------+-----+---------------------+
# |ProductID|Revenue|Units|bonus(Units, Revenue)|
# +---------+-------+-----+---------------------+
# |      495|43194.1|   77|   33259.456999999995|
# |     2091| 6347.7|   41|             2602.557|
# |     2091| 6240.1|   41|   2558.4410000000003|
# |     2091| 3652.7|   24|    876.6479999999999|
# |     2091| 3560.9|   23|              819.007|
# +---------+-------+-----+---------------------+
# only showing top 5 rows

Si queremos definir la función para poder utilizarla dentro de Spark SQL y obtener el mismo resultado, hemos de registrar la función mediante spark.udf.register, la cual recibe el nombre que le asignaremos a la función, el nombre de la función Python a invocar, y el tipo de dato que devuelve:

spark.udf.register("udfBonus", bonus, DoubleType())
spark.sql("select ProductID, Revenue, Units,  udfBonus(Units, Revenue) as bonus from ventas order by Units desc").show(5)

UDF y Python

En un principio, se desaconseja la creación de UDF mediante Python, ya que su uso penalizar de forma significativa el rendimiento. Los ejecutores son procesos en máquinas virtuales de Java que están escritos en Java, y por ello, ejecutan código Java o Scala de forma nativa. En cambio, para Python tiene que ejecutar un proceso separado para ejecutar la UDF, lo que implica un coste extra para serializar y volver a deserializar los datos para cada fila del dataset.

Persistencia

Un DataFrame se puede persistir/cachear en memoria conforme necesitemos (también lo podemos hacer con los RDD). Su principal propósito es cuando vamos a acceder a un DataFrame una y otra vez y no necesitamos que se vuelvan a evaluar todas las operaciones (como pueden ser los algoritmos iterativos utilizados en Machine Learning).

Cuando persistimos un dataset, cada nodo almacena sus datos particionados en memoria y/o disco y los reutiliza en otras operaciones sobre dicho dataset.

Para ello, se emplean los métodos cache / persist y unpersist para cachear y liberar los datos.

df.persist()
df.count()  # forzamos la evaluación perezosa

Si queremos realizarlo con SparkSQL:

ventasCanada.createOrReplaceTempView("ventasCanada")
// Si queremos cachear la tabla mediante SQl
spark.catalog.cacheTable("ventasCanada")

Una vez persistidos los datos, si accedemos a http://localhost:4040 veremos en la pestaña Storage que se ha creado la tabla, su tipo de almacenamiento y particiones cacheadas:

Elementos cacheados con Spark UI

Una diferencia fundamental a la hora de persistir un DataFrame en comparación con un RDD, es que como Spark SQL conoce el esquema de los datos en el DataFrame, puede organizarlos de forma columnar y aplicar compresión sobre éstos para minimizar el espacio necesario.

DataFrames y Pandas

En cualquier momento podemos pasar los datos de un DataFrame de PySpark a uno de Pandas para poder aprovechar su API.

Si seguimos con el dataset de Yelp, vamos a preparar una consulta de nos devuelva la cantidad de votos recibidos y puntuación media de cada ciudad:

from pyspark.sql.functions import count, avg, round
dfVotosCiudades = df.groupBy("city").agg(count("city").alias("votos"), round(avg("stars"), 3).alias("media")).orderBy("votos", ascending=False).limit(10)
dfVotosCiudades.show()
# +----------+-----+-----+
# |      city|votos|media|
# +----------+-----+-----+
# |   Phoenix| 5492|3.658|
# |Scottsdale| 2617|3.809|
# |     Tempe| 1444| 3.64|
# |      Mesa| 1348|3.644|
# |  Chandler| 1178|3.677|
# |  Glendale|  821|3.588|
# |   Gilbert|  630|3.755|
# |    Peoria|  385|3.614|
# |  Surprise|  241|3.598|
# |  Goodyear|  214|3.498|
# +----------+-----+-----+

Nos traemos esos datos a Pandas mediante el método .toPandas().:

pdVC = dfVotosCiudades.toPandas()

A partir de este momento pdVC es un DataFrame de Pandas:

Conversión a un DataFrame de Pandas

Y con el DataFrame de Pandas, ya podemos generar gráficos:

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(10,6))
plt.ticklabel_format(useOffset=False, style="plain")
sns.set_theme(style="whitegrid")
sns.barplot(x="votos", y="city", data=pdVC).set_title("Votos por Ciudad")
plt.xlabel("Votos emitidos")
plt.ylabel("Ciudades")

plt.show()
Gráfico generado mediante Pandas y Spark

O por ejemplo, si queremos unir dos gráficos:

plt.figure(figsize=(10,6))
sns.set_theme(style="white")
ax = sns.barplot(data = pdVC, y="votos", x="city")

ax2 = ax.twinx()
sns.lineplot(data = pdVC['media'], marker='o', color='crimson', ax=ax2)
plt.show()

Obteniendo:

Gráfico generado mediante Pandas y Spark

Out of Memory

Mucho cuidado al utilizar Pandas, ya que al convertir el DataFrame nos vamos a traer todos los datos al driver, perdiendo la distribución de los datos y pudiendo provocar un error de falta de memoria.

Así pues, hay que evitar a toda costa utilizar Pandas para tratar los datos, ya que perdemos toda la potencia de trabajo en clúster (Pandas sólo puede utilizar los recursos del nodo principal). Únicamente lo utilizaremos cuando vayamos a visualizar los datos mediante Matplotlib / Seaborn como requisito de estas librerías.

Referencias

Actividades

  1. A partir del archivo nombres.json, crea un DataFrame y realiza las siguientes operaciones:

    1. Crea una nueva columna (columna Mayor30) que indique si la persona es mayor de 30 años.
    2. Crea una nueva columna (columna FaltanJubilación) que calcule cuantos años le faltan para jubilarse (supongamos que se jubila a los 67 años)
    3. Crea una nueva columna (columna Apellidos) que contenga XYZ (puedes utilizar la función lit)
    4. Elimina las columna Mayor30 y Apellidos?
    5. Crea una nueva columna (columna AnyoNac) con el año de nacimiento de cada persona (puedes utilizar la función current_date).
    6. Añade un id incremental para cada fila (campo Id) y haz que al hacer un show se vea en primer lugar (puedes utilizar la función monotonically_increasing_id) seguidos del Nombre, Edad, AnyoNac, FaltaJubilacion y Ciudad

    Al realizar los seis pasos, el resultado del DataFrame será similar a :

    +---+------+----+-------+----------------+--------+
    | Id|Nombre|Edad|AnyoNac|FaltanJubilacion|  Ciudad|
    +---+------+----+-------+----------------+--------+
    |  0| Aitor|  45|   1977|              22|   Elche|
    |  1| María|  23|   1999|              44|Alicante|
    |  2| Laura|  19|   2003|              48|   Elche|
    |  3| Sonia|  45|   1977|              22|    Aspe|
    |  4| Pedro|null|   null|            null|   Elche|
    +---+------+----+-------+----------------+--------+
    
  2. (opcional) A partir del archivo VentasNulos.csv:

    1. Elimina las filas que tengan al menos 4 nulos.

    2. Con las filas restantes, sustituye:

      1. Los nombres nulos por Empleado
      2. Las ventas nulas por la media de las ventas de los compañeros(redondeado a entero).
      3. Los euros nulos por el valor del compañero que menos € ha ganado
      4. La ciudad nula por C.V.
      5. El identificador nulo por XYZ

      Para los pasos ii) y iii) puedes crear un DataFrame que obtenga el valor a asignar y luego pasarlo como parámetro al método para rellenar los nulos.

  3. A partir del archivo movies.tsv, crea una esquema de forma declarativa con los campos:

    • interprete de tipo string
    • pelicula de tipo string
    • anyo de tipo int

    Cada fila del fichero implica que el actor/actriz ha trabajado en dicha película en el año indicado.

    1. Una vez creado el esquema, carga los datos en un DataFrame.
    2. ¿Cuantas películas diferentes hay?
    3. ¿En cuantas películas ha trabajado Murphy, Eddie (I)?
    4. Muestra los intérpretes que aparecen tanto en Superman como en Superman II.
    5. ¿Cuáles son los actores que han aparecido en más de 30 películas?
    6. ¿En que película anterior a 1980 aparecen al menos 25 intérpretes?
    7. Muestra la cantidad de películas producidas cada año (solo debe mostrar el año y la cantidad), ordenando el listado por la cantidad de forma descendente.
    8. A partir de la consulta anterior, crea un gráfico de barras que muestre el año y la cantidad de películas, ordenados por fecha.
  4. Vamos a seguir realizando analíticas de datos sobre las películas, ya que nos han enviado un nuevo archivo llamado movie-ratings.tsv que contiene las calificaciones de las películas.

    1. Crea un DataFrame que contenga los datos de ambos datasets.
    2. Muestra para cada año, la película con mayor puntuación (año, título de la película, puntuación)
    3. Sobre los datos anteriores, obtén también una lista con los nombres de los intérpretes.
    4. Averigua las tres parejas de intérpretes han trabajado juntos en más ocasiones. La salida debe tener tres columnas: interprete1, interprete2 y cantidad. (necesitas utilizar un self-join)
  5. Hemos recibido un dataset con las ventas de 2019 de una tienda americana de productos de tecnología, mediante un conjunto de ficheros en formato CSV comprimidos en salesdata.zip.

    1. Una vez descomprimidos los datos, crea un DataFrame con todos los datos, infiriendo el esquema.
    2. Vuelve a realizar la lectura de los datos pero con el siguiente esquema:

      from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
      esquema = StructType([
          StructField("Order ID", IntegerType(), False),
          StructField("Product", StringType(), False),
          StructField("Quantity Ordered", IntegerType(), True),
          StructField("Price Each", DoubleType(), False),
          StructField("Order Date", StringType(), False),
          StructField("Purchase Address", StringType(), False)
      ])
      
    3. Tras la lectura, vamos a realizar la limpieza de datos. El primer paso será renombrar la columnas para eliminar los espacios en blanco.

    4. Elimina las filas que contengan algún campo nulo.
    5. Comprueba si las cabeceras de los archivos aparecen como datos del dataset (por ejemplo, un producto cuyo nombre sea Product). Si fuera el caso, elimina dichas filas.
    6. A partir del campo dirección, crea dos nuevas columnas para almacenar la ciudad (City) y el estado (State). Por ejemplo, para la dirección 136 Church St, New York City, NY 10001, la ciudad es New York City y el estado es NY.
    7. Modifica el campo con la fecha del pedido para que su formato sea timestamp.
    8. Sobre el campo anterior, crea dos nuevas columnas, con el mes (Month) y el año (Year) del pedido.
    9. Almacena los datos en formato Parquet en la carpeta salesoutput particionando los datos por año y mes. Tras ejecutar esta operación, comprueba en disco la estructura de archivos creada.
    10. Sobre los datos almacenados, realiza una nueva lectura pero solo leyendo los datos de 2019 los cuales deberían estar almacenados en ./salesdataoutput/Year=2019.
    11. Averigua cual ha sido el mes que ha recaudado más. Para ello, deberás multiplicar el precio por la cantidad de unidades, y posteriormente, realizar alguna agregación. Sobre el resultado, crea un gráfico similar al siguiente:

      Ventas por mes

    12. Obtén un gráfico con las 10 ciudades que más unidades han vendido.

      Ciudades con más unidades vendidas

    13. Cantidad de pedidos por Horas en las que se ha realizado un pedido que contenía al menos dos productos:

      Pedidos de al menos dos productos por horas

    14. Listado con los productos del estado de NY que se han comprado a la vez, obteniendo un resultado similar a:

      +------------------------------------------------------------+-----+
      |Productos                                                   |count|
      +------------------------------------------------------------+-----+
      |[iPhone, Lightning Charging Cable]                          |126  |
      |[Google Phone, USB-C Charging Cable]                        |124  |
      |[Google Phone, Wired Headphones]                            |52   |
      ...
      
  6. (opcional) Vuelve a realizar todo el ejercicio anterior pero utilizando únicamente Spark SQL.