Saltar a contenido

Spark

Apuntes sin actualizar

Estos apuntes pertenecen al curso 21/22 y, por lo tanto, ya no se actualizan.

Puedes acceder a la última versión de esta sesión en https://aitor-medrano.github.io/iabd2223/spark/01spark.html.

La analítica de datos es el proceso de inspeccionar, limpiar, transformar y modelar los datos con el objetivo de descubrir información útil, obtener conclusiones sobre los datos y ayudar en la toma de decisiones.

Para ello, el uso de Spark de la mano de Python, NumPy y Pandas como interfaz de la analítica es clave en el día a día de un científico/ingeniero de datos.

La version 3.0 de Apache Spark se lanzó en 2020, diez años después de su nacimiento. Esta versión incluye mejoras de rendimiento (el doble en consultas adaptativas), facilidad en el uso del API de Pandas, un nuevo interfaz gráfico para el streaming que facilita el seguimiento y depuración de las consultas y ajustes de rendimiento.

Introducción

Logo de Apache Spark

Spark es un framework de computación distribuido en paralelo similar a Hadoop-MapReduce (así pues, Spark no es un lenguaje de programación), pero que en vez de almacenar los datos en un sistema de ficheros distribuidos o utilizar un sistema de gestión de recursos, lo hace en memoria. El hecho de almacenar en memoria los cálculos intermedios implica que sea mucho más eficiente que Hadoop MapReduce.

En el caso de tener la necesidad de almacenar los datos o gestionar los recursos, se apoya en sistemas ya existentes como HDFS, YARN o Apache Mesos. Por lo tanto, Hadoop y Spark son sistemas complementarios.

El diseño de Spark se basa principalmente en cuatro características:

  • Velocidad: enfocado al uso en un clúster de commodity hardware con una gestión eficiente del multihilo y procesamiento paralelo. Spark construye sus consultas mediante un grafo dirigido acíclico (DAG) y utiliza un planificador para descomponer el grafo en tareas que se ejecutan en paralelo en los nodos de los clústers. Finalmente, utiliza un motor de ejecución (Tungsten) que genera código compacto para optimizar la ejecución. Todo ello teniendo en cuenta que los resultados intermedios se almacenan en memoria.
  • Facilidad de uso: Spark ofrece varias capas de abstracción sobre los datos, como son los RDD, DataFrames y Dataset. Al ofrecer un conjunto de transformaciones y acciones como operaciones de su API, Spark facilita el desarrollo de aplicaciones Big data.
  • Modularidad: soporte para todo tipo de cargas mediante cualquiera de los lenguajes de programación soportados: Scala, Java, Python, SQL y R, así como los módulos de Spark SQL para consultas interactivas, Spark Structured Streaming para procesamiento de datos en streaming, Spark MLlib para machine learning y GraphX para trabajar con grafos. De esta manera, mediante una única aplicación Spark se puede hacer todo sin necesidad de utilizar APIs separadas.
  • Extensibilidad: Al centrarse unicamente en el procesamiento, la gestión de los datos se puede realizar a partir de Hadoop, Cassandra, HBase, MongoDB, Hive o cualquier SGBD relacional, haciendo todo en memoria. Además, se puede extender el API para utilizar otras fuentes de datos, como Apache Kafka, Amazon S3 o Azure Storage.

En términos de flexibilidad, Spark ofrece un stack unificado que permite resolver múltiples tipos de procesamiento de datos, tanto aplicaciones batch como consultas interactivas, algoritmos de machine learning que quieren muchas iteraciones, aplicaciones de ingesta en streaming con rendimiento cercado al tiempo real, etc... Antes de Spark, para cada uno de estos tipos de procesamiento necesitábamos una herramienta diferente, ahora con Spark tenemos una bala de plata que reduce los costes y recursos necesarios.

Spark vs Hadoop

La principal diferencia es que la computación se realiza en memoria, lo que puede implicar un mejora de hasta 100 veces mejor rendimiento. Para ello, se realiza una evaluación perezosa de las operaciones, de manera, que hasta que no se realiza una operación, los datos realmente no se cargan.

Para solucionar los problemas asociados a MapReduce, Spark crea un espacio de memoria RAM compartida entre los ordenadores del clúster. Este permite que los NodeManager/WorkerNode compartan variables (y su estado), eliminando la necesidad de escribir los resultados intermedios en disco. Esta zona de memoria compartida se traduce en el uso de RDD, DataFrames y DataSets, permitiendo realizar procesamiento en memoria a lo largo de un clúster con tolerancia a fallos.

Stack unificado

El elemento principal es Spark Core el cual aporta toda la funcionalidad necesaria para preparar y ejecutar las aplicaciones distribuidas, gestionando la planificación y tolerancia a fallos de las diferentes tareas. Para ello, el núcleo ofrece un entorno NoSQL idóneo para el análisis exploratorio e interactivo de los datos. Spark se puede ejecutar en batch o en modo interactivo y tiene soporte para Python. Independientemente del lenguaje utilizado (ya sea Python, Java, Scala, R o SQL) el código se despliega entre todos los nodos a lo largo del clúster.

Además, contiene otros 4 grandes componentes construidos sobre el core:

Componentes de Spark
  1. Spark Streaming es una herramienta para la creación de aplicaciones que procesamiento en streaming que ofrece un gran rendimiento con soporte para la tolerancia a fallos. Los datos pueden venir desde fuentes de datos tan diversas como Kafka, Flume, Twitter y tratarse en tiempo real.
  2. Spark SQL ofrece un interfaz SQL para trabajar con Spark, permitiendo la lectura de datos tanto de una tabla de cualquier base de datos relacional como de ficheros con formatos estructurados (CSV, texto, JSON, Avro, ORC, Parquet, etc...) y construir tablas permanentes o temporales en Spark. Tras la lectura, permite combinar sentencias SQL para trabajar con los datos y cargar los resultados en un DataFrame de Spark.

    Por ejemplo, con este fragmento leemos un fichero JSON desde S3, creamos una tabla temporal y mediante una consulta SQL cargamos los datos en un DataFrame de Spark:

    df = spark.read.json("s3://apache_spark/data/committers.json")
    df.createOrReplaceTempView("committers")
    resultado = spark.sql("""SELECT name, org, module, release, num_commits
        FROM committers WHERE module = 'mllib' AND num_commits > 10
        ORDER BY num_commits DESC""")
    
  3. Spark MLlib es un módulo de machine learning que ofrece la gran mayoría de algoritmos de ML y permite construir pipelines para el entrenamiento y evaluación de los modelos IA.

  4. GraphX permite procesar estructuras de datos en grafo, siendo muy útiles para recorrer las relaciones de una red social u ofrecer recomendaciones sobre gustos/afinidades. En este curso no vamos a entrar en detalle en este módulo.

Además, la comunidad de Spark dispone de un gran número de conectores para diferentes fuentes de datos, herramientas de monitorización, etc... que conforman su propio ecosistema:

Ecosistema de Spark

Puesta en Marcha

En nuestra máquina virtual, únicamente necesitamos ejecutar el comando pyspark el cual arrancará directamente un cuaderno Jupyter:

iabd@iabd-virtualbox:~/Spark$ pyspark
[I 16:50:57.168 NotebookApp] Serving notebooks from local directory: /home/iabd/Spark
[I 16:50:57.168 NotebookApp] The Jupyter Notebook is running at:
[I 16:50:57.168 NotebookApp] http://localhost:8888/?token=b7b4c7232e5d9d3f7c7fdd51d75e5fe314c3f2c637e90652
[I 16:50:57.168 NotebookApp]  or http://127.0.0.1:8888/?token=b7b4c7232e5d9d3f7c7fdd51d75e5fe314c3f2c637e90652
[I 16:50:57.168 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 16:50:57.968 NotebookApp] 

    To access the notebook, open this file in a browser:
        file:///home/iabd/.local/share/jupyter/runtime/nbserver-9654-open.html
    Or copy and paste one of these URLs:
        http://localhost:8888/?token=b7b4c7232e5d9d3f7c7fdd51d75e5fe314c3f2c637e90652
     or http://127.0.0.1:8888/?token=b7b4c7232e5d9d3f7c7fdd51d75e5fe314c3f2c637e90652
[W 16:51:02.666 NotebookApp] 404 GET /api/kernels/a8119b9f-91ce-4eee-b32b-9be48a0d281e/channels?session_id=5860cf5e65fa481d9110c9ff9904d3f7 (127.0.0.1): Kernel does not exist: a8119b9f-91ce-4eee-b32b-9be48a0d281e
[W 16:51:02.676 NotebookApp] 404 GET /api/kernels/a8119b9f-91ce-4eee-b32b-9be48a0d281e/channels?session_id=5860cf5e65fa481d9110c9ff9904d3f7 (127.0.0.1) 12.30ms referer=None

Jupyter Notebook

Si instalamos PySpark según las instrucciones de la propia web, al ejecutar pyspark, se lanzara un shell. Para que se abra automáticamente Jupyter Lab, necesitamos exportar las siguientes variables de entorno:

~/.bashrc
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

Más información en https://www.sicara.ai/blog/2017-05-02-get-started-pyspark-jupyter-notebook-3-minutes

Así pues, automáticamente se abrirá una ventana en el navegador web donde crear/trabajar con los cuadernos Jupyter:

Cuadernos Jupyter con PySpark

Otra posibilidad es utilizar alguna de las imágenes Docker disponibles que facilitan su uso. En nuestro caso, recomendamos las imágenes disponibles en https://github.com/jupyter/docker-stacks.

Para lanzar la imagen de PySpark con cuadernos Jupyter utilizaremos:

docker run -d -p 8888:8888 -p 4040:4040 -p 4041:4041 jupyter/pyspark-notebook

O si queremos crear un volumen con la carpeta actual:

docker run -d -v ${PWD}:/home/jovyan/work -p 
8888:8888 -p 4040:4040 -p 4041:4041 --name pyspark jupyter/pyspark-notebook

Clúster de Spark

Si queremos montar nosotros mismo un clúster de Spark, una vez tenemos todas las máquinas instaladas con Java, Python y Spark, debemos distinguir entre:

  • Nodo maestro/driver - el cual deberemos arrancar con:

    $SPARK_HOME/sbin/start-master.sh -h 0.0.0.0
    
  • Workers (esclavos) - los cuales arrancaremos con:

    $SPARK_HOME/sbin/start-worker.sh spark://<ip-servidor-driver>:7077
    

    Sobre los workers, le podemos indicar la cantidad de CPUs mediante la opción -c y la cantidad de RAM con -m. Por ejemplo, si quisiéramos lanzar un worker con 8 núcleos y 16GB de RAM haríamos:

    $SPARK_HOME/sbin/start-slave.sh spark://<ip-servidor-driver>:7077 -c 8 -m 16G
    

Una vez arrancado, si accedemos a http://ip-servidor-driver:8080 veremos el IU de Spark con los workers arrancados.

Más información en la documentación oficial.

Uso en la nube

Para trabajar con Spark desde la nube disponemos de varias alternativas, ya sean herramientas que permiten trabajar con cuadernos Jupyter como pueden ser Google Colab o Databricks, o montar un clúster mediante AWS EMR (Elastic MapReduce) o Azure HDInsight.

Google Colab

Primero nos vamos a centrar en Google Colab. A lo largo del curso, ya hemos empleado esta herramienta tanto en sistemas de aprendizaje como en el análisis exploratorio de los datos.

Para que funcione Spark dentro de Google Colab, únicamente hemos de instalar las librerías. Se adjunta un cuaderno con ejemplo de código:

# 1. Instalar las dependencias
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xvf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q pyspark

# 2. Configurar el entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-3.2.1-bin-hadoop3.2"

# 3. Cargar Pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("s8a").master("local[*]").getOrCreate()
spark

El cual podemos observar cómo se ejecuta en Colab:

Ejemplo de cuaderno en Google Colab

Databricks

Databricks es una plataforma analítica de datos basada en Apache Spark desarrollada por la compañía con el mismo nombre. La empresa, creada en el 2013 por los desarrolladores principales de Spark, permite realizar analítica Big Data e Inteligencia Artificial con Spark de una forma sencilla y colaborativa.

Databricks se integra de forma transparente con AWS, Azure y Google Cloud. En una entrada del blog de la empresa de noviembre de 2021 anuncian un nuevo record de procesamiento que implica que su rendimiento es 3 veces superior a la competencia y con un coste menor.

Para poder trabajar con Databricks de forma gratuita, podemos hacer uso de Databricks Community Edition, donde podemos crear nuestros propios cuadernos Jupyter y trabajar con Spark sin necesidad de instalar nada.

El único paso inicial tras registrarnos, es crear un clúster básico (con 15GB de memoria y dos núcleos) desde la opción Create del menú de la izquierda:

Creación de un clúster en Databricks

Tras un par de minutos se habrá creado y lanzado el clúster, ya estaremos listos para crear un nuevo notebook y tener acceso a Spark directamente desde el objeto spark:

Ejemplo de cuaderno en Databricks

Si queremos, podemos hacer público el cuaderno y compartirlo con la comunidad.

SparkContext vs SparkSession

SparkContext es el punto de entrada a Spark desde las versiones 1.x y se utiliza para crear de forma programativa RDD, acumuladores y variables broadcast en el clúster. Desde Spark 2.0, la mayoría de funcionalidades (métodos) disponibles en SparkContext también los están en SparkSession. Su objeto sc está disponible en el spark-shell y se puede crear de forma programativa mediante la clase SparkContext.

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

SparkSession se introdujo en la versión 2.0 y es el punto de entrada para crear RDD, DataFrames y DataSets. El objeto spark se encuentra disponible por defecto en el spark-shell y se puede crear de forma programativa mediante el patrón builder de SparkSession.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 

Además, desde una sesión de Spark podemos obtener un contexto a través de la propiedad sparkContext:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

Hola Spark

Lo primero que debemos hacer siempre es conectarnos a la sesión de Spark, el cual le indica a Spark como acceder al clúster. Si utilizamos la imagen de Docker, debemos obtener siempre la sesión a partir de la clase SparkSession:

ejemploDockerSpark.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() # SparkSession de forma programativa
sc = spark.sparkContext                    # SparkContext a partir de la sesión

# Suma de los 100 primeros números
rdd = sc.parallelize(range(100 + 1))
rdd.sum()

En cambio, si utilizamos la instalación de PySpark (o la solución de Databricks) que tenemos en la máquina virtual, directamente podemos acceder a la instancia de SparkSession a través del objeto global spark:

ejemploPySpark.py
sc = spark.sparkContext     # spark es una instancia de la clase SparkSession

rdd = sc.parallelize(range(100 + 1))
rdd.sum()

En ambos casos, si mostramos el contenido del contexto obtendremos algo similar a:

Version
    v3.2.0
Master
    local[*]
AppName
    PySparkShell

A continuación podemos ver el resultado completo en su ejecución dentro de un cuaderno Jupyter:

Hola Spark

Nombre de la aplicación

Si queremos darle nombre a la aplicación Spark, lo podemos hacer al obtener la SparkSession:

spark = SparkSession.builder.appName("spark-s8a").getOrCreate()

Spark Submit

De la misma manera que mediante Hadoop podíamos lanzar un proceso al clúster para su ejecución, Spark ofrece el comando spark-submit para enviar un script al driver para su ejecución de forma distribuida.

Así pues, si colocamos nuestro código en un archivo de Python:

holaSpark.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Suma de los 100 primeros números
rdd = sc.parallelize(range(100 + 1))
suma = rdd.sum()
print("--------------")
print(suma)
print("--------------")

Lo podemos ejecutar mediante (en nuestra máquina virtual antes debemos resetear una variable de entorno para que no ejecute automáticamente el cuaderno jupyter: unset PYSPARK_DRIVER_PYTHON):

spark-submit holaMundo.py

Si nuestro servidor estuviera en otra dirección IP, deberíamos indicarle donde encontrar el master:

spark-submit --master spark://<ip-servidor-driver>:7077 holaMundo.py

Más información en la documentación oficial

Arquitectura

Ya hemos comentado que Spark es un sistema distribuido diseñado para procesar grandes volúmenes de datos de forma rápida y eficiente. Este sistema normalmente se despliega en un conjunto de máquinas que se conocen como un clúster Spark, pudiendo estar compuesta de unas pocas máquinas o miles de ellas. Según el FAQ de Spark, el clúster más grande de Spark está compuesto por más de 8000 nodos.

Normalmente se utiliza un sistema de gestión de recursos como YARN para gestionar de forma inteligente y eficiente el clúster. Los dos componentes principales del clúster zon:

  • el gestor de cluster: nodo maestro que sabe donde se localizan los esclavos, cuanta memoria disponen y el número de cores CPU de cada nodo. Su mayor responsabilidad es orquestar el trabajo asignándolo a los diferentes nodos.
  • los nodos trabajadores (workers): cada nodo ofrece recursos (memoria, CPU, etc...) al gestor del clúster y realiza las tareas que se le asignen.

Aplicaciones Spark

Una aplicación Spark se compone de dos partes:

  1. La lógica de procesamiento de los datos, la cual realizamos mediante alguna de las API que ofrece Spark (Java, Scala, Python, etc...), desde algo sencillo que realice una ETL sobre los datos a problemas más complejos que requieran múltiples iteraciones y tarden varias horas como entrenar un modelo de machine learning.
  2. Driver: es el coordinador central encargado de interactuar con el clúster Spark y averiguar qué máquinas deben ejecutar la lógica de procesamiento. Para cada una de esas máquinas, el driver realiza una petición al clúster para lanzar un proceso conocido como ejecutor (executor). Además, el driver Spark es responsable de gestionar y distribuir las tareas a cada ejecutor, y si es necesario, recoger y fusionar los datos resultantes para presentarlos al usuario. Estas tareas se realizan a través de la SparkSession.

Cada ejecutor es un proceso JVM (Java Virtual Machine) dedicado para una aplicación Spark específica. Un ejecutor vivirá tanto como dure la aplicación Spark, lo cual puede ser segundos, minutos o días, dependiendo de la complejidad de la aplicación. Conviene destacar que los ejecutores son elementos aislados que no se comparten entre aplicaciones Spark, por lo que la única manera de compartir información entre diferente ejecutores es mediante un sistema de almacenamiento externo como HDFS.

Arquitectura entre una aplicación Spark y el gestor del clúster

Así pues, Spark utiliza una arquitectura maestro/esclavo, donde el driver es el maestro, y los ejecutores los esclavos. Cada uno de estos componentes se ejecutan como un proceso independiente en el clúster Spark. Por lo tanto, una aplicación Spark se compone de un driver y múltiples ejecutores. Cada ejecutor realiza lo que se le pide en forma de tareas, ejecutando cada una de ellas en un núcleo CPU separado. Así es como el procesamiento paralelo acelera el tratamiento de los datos. Además, cada ejecutor, bajo petición de la lógica de la aplicación, se responsabiliza de cachear un fragmento de los datos en memoria y/o disco.

Al lanzar una aplicación Spark, podemos indicar el número de ejecutores que necesita la aplicación, así como la cantidad de memoria y número de núcleos que debería tener cada ejecutor.

Clúster compuesto por un driver y tres ejecutores

Job, Stage y Task

Cuando creamos una aplicación Spark, por debajo, se distinguen los siguientes elementos:

  • Job (trabajo): computación paralela compuesta de múltiples tareas que se crean tras una acción de Spark (save, collect, etc...). Al codificar nuestro código mediante PySpark, el driver convierte la aplicación Spark en uno o más jobs, y a continuación, estos jobs los transforma en un DAG (grafo). Este grafo, en esencia, es el plan de ejecución, donde cada elemento dentro del DAG puede implicar una o varias stages (escenas).
  • Stage (escena): cada job se divide en pequeños conjuntos de tareas que forman un escenario. Como parte del grafo, las stages se crean a partir de si las operaciones se pueden realizar de forma paralela o de forma secuencial. Como no todas las operaciones pueden realizarse en una única stage, en ocasiones se dividen en varias, normalmente debido a los límites computacionales de los diferentes ejecutores.
  • Task (tarea): unidad de trabajo más pequeña que se envía a los ejecutores Spark. Cada escenarios se compone de varias tareas. Cada una de las tareas se asigna a un único núcleo y trabaja con una única partición de los datos. Por ello, un ejecutor con 16 núcleos puede tener 16 o más tareas trabajando en 16 o más particiones en paralelo.
Driver → Job → Stage → Task

DataFrame

La principal abstracción de los datos en Spark es el Dataset. Se pueden crear desde las fuentes de entrada de Hadoop (como ficheros que provienen de HDFS o S3) o mediante transformaciones de otros Datasets. Dado el cariz de Python, no necesitamos que los Dataset estén fuertemente tipados, por eso, todos los Dataset que usemos serán Dataset[Row] (si trabajásemos mediante Java o Scala sí deberíamos indicar el tipo de sus datos), y por consistencia con el concepto de Pandas y R, los llamaremos DataFrame.

Por ejemplo, veamos cómo podemos crear un DataFrame a partir de un fichero de texto:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate() 

quijoteTxt = spark.read.text("el_quijote.txt")
quijoteTxt.count()  # número de filas del DataFrame - 2186
quijoteTxt.first()  # primera fila - Row(value='DON QUIJOTE DE LA MANCHA')
# Transformamos un DataFrame en otro nuevo
lineasConQuijote = quijoteTxt.filter(quijoteTxt.value.contains("Quijote"))  # DataFrame con las líneas que contiene la palabra Quijote
lineasConQuijote.count()  # cantidad de líneas con la palabra Quijote - 584
# Las transformaciones se pueden encadenar
quijoteTxt.filter(quijoteTxt.value.contains("Quijote")).count()     # idem - 584

Estudiaremos los DataFrame en profundidad en la siguiente sesión.

RDD

Un RDD (Resilient Distributed Datasets) es una estructura de datos que abstrae los datos para su procesamiento en paralelo.

Antes de Spark 2.0, los RDD eran el interfaz principal para interactuar con los datos.

Se trata de una colección de elementos tolerantes a fallos que son immutables (una vez creados, no se pueden modificar) y diseñados para su procesamiento distribuido. Cada conjunto de datos en los RDD se divide en particiones lógicas, que se pueden calcular en diferentes nodos del clúster.

Hay dos formas de crear un RDD:

  • Paralelizando una colección ya existente en nuestra aplicación Spark.
  • Referenciando un dataset de un sistema externo como HDFS, HBase, etc...

Sobre los RDD se pueden realizar dos tipos de operaciones:

  • Acción: devuelven un valor tras ejecutar una computación sobre el conjunto de datos.
  • Transformación: es una operación perezosa que crea un nuevo conjunto de datos a partir de otro RDD/Dataset, tras realizar un filtrado, join, etc...

¿RDD obsoleto?

Antes de la versión 2.0, el principal interfaz para programar en Spark eran los RDD. Tras la versión 2.0, fueron sustituidos por los Dataset, que son RDD fuertemente tipados que además están optimizados a bajo nivel. Aunque el interfaz RDD todavía tiene soporte, sin embargo, se recomienda el uso de los Dataset por su mejor rendimiento. A lo largo de estas sesiones iremos combinando ambos interfaces para conocer las similitudes y diferencias.

Acciones

A continuación vamos a revisar las acciones más comunes. Puedes consultar todas las acciones disponibles en la documentación oficial:

Parallelize

Podemos crear RDD directamente desde cero sin necesidad de leer los datos desde un fichero. Para ello, a partir de un SparkContext podemos utilizar parallelize.

Esta acción divide una colección de elementos entre los nodos de nuestro clústers. Por ejemplo:

miRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])

lista  = ['Hola', 'Adiós', 'Hasta luego']
listaRDD = sc.parallelize(lista) # Creamos un RDD a partir de una lista
listaRDD4 = sc.parallelize(lista,4) # Creamos un RDD con 4 particiones

Take y Sample

Cuando queremos recuperar un número determinado de resultado, de forma similar a limit en SQL, tenemos la acción take:

miRDD.take(3)       # [1, 2, 3]
listaRDD.take(2)    # ['Hola', 'Adiós']

Otra opción es utilizar sample para obtener una muestra de los datos, aunque en este caso no es una acción sino una transformación, ya que va a obtener información posiblemente de varias particiones, teniendo que barajar los datos (shuffle):

miRDDmuestra = miRDD.sample(False, 0.5)
miRDDmuestra.collect()  # [2, 4, 6, 7, 8, 9] / [1, 2, 3, 4, 6] / [5, 8, 9]

Esta transformación recibe varios parámetros:

  • withReplacement: booleano para indicar si queremos elementos repetidos
  • fraction: valor entre 0 y 1 que expresa la probabilidad de elegir cada elemento
  • opcionalmente se le puede pasar un tercer valor con la semilla

Así pues, en el ejemplo anterior, cada llamada a sample ha generado un RDD diferente, sin valores repetidos, pero con un tamaño de RDD variable.

Muestra estratificada

Si necesitamos que nuestra muestra esté estratificada para que los datos no estén sesgados podemos utilizar sampleByKey:

datos = sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')])

# indicamos la probabilidad para cada clave
fracciones = {1: 0.1, 2: 0.6, 3: 0.3}

approxSample = datos.sampleByKey(False, fracciones)     # [(2, 'c'), (2, 'd')]

Para obtener una muestra mediante una acción, tenemos la opción takeSample que funciona de forma similar pero sin hacer shuffle y devuelve una lista:

miRDDmuestraT = miRDD.takeSample(False, 5)
print(miRDDmuestraT)  # [1, 8, 9, 7, 2]

El primer parámetro vuelve a indicar si hay repetidos, pero el segundo fija la cantidad de elementos a devolver.

Por último, mediante top obtenemos los primeros elementos una vez ordenado el RDD:

miRDD.top(3)    # [9, 8, 7]

De forma similar, tenemos takeOrdered que recupera la cantidad de registros necesarios pero ordenados ascendentemente (al contrario que top), con la opción de ordenarlos descendentemente (similar a top):

miRDD.takeOrdered(3)    # [1, 2, 3]
miRDD.takeOrdered(3, lambda x: -x)    # [9, 8, 7]

Hay que tener cuidado si el conjunto de datos es muy grande, porque tanto take como takeSample, takeOrdered y top llevarán todos los datos a memoria.

Collect

Un fallo muy posible a la hora de mostrar los datos de un RDD es utilizar rdd.foreach(print) o rdd.map(print).

En una única máquina, esta operación generaría la salida esperada mostrando todos los elementos del RDD. Sin embargo, al trabajar en un clúster, la salida a stdout la realizarían los diferentes nodos y no el nodo principal.

Así pues, para mostrar todos los elementos de un RDD/DataFrame/Dataset hemos de emplear el método collect, el cual primero mostrará los RDD del nodo principal (driver node), y luego para cada nodo del cluster mostrará sus datos.

rdd.collect()

Out of memory

Hay que tener mucho cuidado, ya que nos podemos quedar fácilmente sin memoria, ya que collect se trae los datos de todos los ejecutores a un único nodo, el que ésta ejecutando el código (driver).

Si sólo necesitamos mostrar unos pocos elementos, un enfoque más seguro es utilizar take:

rdd.take(100)

Transformaciones

En Spark, las estructuras de datos son inmutables, de manera que una vez creadas no se pueden modificar. Para poder modificar un RDD/DataFrame, hace falta realizar una transformación, siendo el modo de expresar la lógica de negocio mediante Spark.

Todas las transformaciones en Spark se evalúan de manera perezosa (lazy evaluation), de manera que los resultados no se computan inmediatamente, sino que se retrasa el cálculo hasta que el valor sea necesario. Para ello, se van almacenando los pasos necesarios y se ejecutan únicamente cuando una acción requiere devolver un resultado al driver. Este diseño facilita un mejor rendimiento (por ejemplo, imagina que tras una operación map se realiza un reduce y en vez de devolver todo el conjunto de datos tras el map, sólo le enviamos al driver el resultado de la reducción).

Así pues, las acciones provocan la evaluación de todas las transformaciones previas que se habían evaluado de forma perezosa y estaban a la espera.

Por defecto, cada transformación RDD/DataSet se puede recalcular cada vez que se ejecute una acción. Sin embargo, podemos persistir un RDD en memoria mediante los métodos persist (o cache), de manera que Spark mantendrá los datos para un posterior acceso más eficiente. También podemos persistir RDD en disco o replicarlo en múltiples nodos.

Tipos de transformaciones

Existen dos tipos de transformaciones, dependiendo de las dependencias entre las particiones de datos:

  • Transformaciones Narrow: consisten en dependencias estrechas en las que cada partición de entrada contribuye a una única partición de salida.

  • Transformaciones Wide: consisten en dependencias anchas de manera que varias particiones de entrada contribuyen a muchas otras particiones de salida, es decir, cada partición de salida depende de diferentes particiones de entrada. Este proceso también se conoce como shuffle, ya que Spark baraja los datos entre las particiones del clúster.

Transformaciones Narrow vs Wide

Con las transformaciones narrow, Spark realiza un pipeline de las dependencias, de manera que si especificamos múltiples filtros sobre DataFrames/RDD, se realizarán todos en memoria.

Esto no sucede con las transformaciones wide, ya que al realizar un shuffle los resultados se persisten en disco.

Cuidado con shuffle

Las operaciones shuffle son computacionalmente caras, ya que implican E/S en disco, serialización de datos y E/S en red. Para organizar los datos previos al shuffle, Spark genera un conjunto de tareas (tareas map para organizar los datos, y reduce para agregar los resultados).

Internamente, el resultado de las tareas map se mantienen en memoria hasta que no caben. Entonces, se ordenan en la partición destino y se persisten en un único archivo. En la fase de reducción, las tareas leen los bloques ordenados que son relevantes.

Las operaciones reduceByKey y aggregateByKey son de las que más memoria consumen, al tener que crear las estructuras de datos para organizar los registros en las tareas de map, y luego generar los resultados agregados en la de reduce. Si los datos no caben en memoria, Spark los lleva a disco, incurriendo en operaciones adicionales de E/S en disco y del recolector de basura.

A continuación veremos las diferentes transformaciones que podemos realizar con Spark.

Transformaciones Narrow

Para los siguientes ejemplo, utilizaremos el siguiente fichero de empleados.txt que ya utilizamos en la sesión de Hive:

empleados.txt
Michael|Montreal,Toronto|Male,30|DB:80|Product:DeveloperLead
Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead
Shelley|New York|Female,27|Python:80|Test:Lead,COE:Architect
Lucy|Vancouver|Female,57|Sales:89,HR:94|Sales:Lead

Map

La transformación map aplica la función recibida a cada elemento del RDD, de manera que vamos a poder añadir una nueva columna, modificar una existente, etc...

Por ejemplo, si la entrada es un RDD que contiene [1, 2, 3, 4, 5], al hacer rdd.map(x=>x*2) obtendríamos un nuevo RDD con [2, 4, 6, 8, 10]:

rdd = sc.parallelize([1, 2, 3, 4, 5])
resultRDD = rdd.map(lambda x: x*2)
resultRDD.collect()          # [2, 4, 6, 8, 10]

Mediante la función textFile podemos cargar un archivo. Supongamos que tenemos cargado en Hadoop el archivo empleados.txt:

rddLocal = sc.textFile("empleados.txt")
rdd = sc.textFile("hdfs://iabd-virtualbox:9000/user/iabd/datos/empleados.txt") 
rdd.count()                 # 4 - cantidad de líneas
resultRDD = rdd.map(len)    # obtenemos la cantidad de caracteres cada línea
resultRDD.collect()         # [61, 52, 60, 50]

Si quisiéramos mostrar los datos de los empleados, podríamos recoger los datos del RDD y recorrerlo:

empleados = rdd.collect()
for empleado in empleados:
    print(empleado)

FlatMap

La transformación flatMap es muy similar a la anterior, pero en vez de devolver una lista con un elemento por cada entrada, devuelve una única lista deshaciendo las colecciones en elementos individuales:

rdd = sc.textFile("empleados.txt") 
resultFM = rdd.flatMap(lambda x: x.split("|"))
resultFM.collect()

Obtendríamos cada atributo separado y todos dentro de la misma lista:

['Michael',
 'Montreal,Toronto',
 'Male,30',
 'DB:80',
 'Product:Developer\x04Lead',
 'Will',
 'Montreal',
 'Male,35',
 'Perl:85',
 'Product:Lead,Test:Lead',
 'Shelley',
 'New York',
 'Female,27',
 'Python:80',
 'Test:Lead,COE:Architect',
 'Lucy',
 'Vancouver',
 'Female,57',
 'Sales:89,HR:94',
 'Sales:Lead']

Filter

Permite filtrar los elementos que cumplen una condición mediante filter:

rdd = sc.parallelize([1, 2, 3, 4, 5])
resultRDD = rdd.filter(lambda x: x%2==0)
resultRDD.collect()     # [2, 4] 

También podemos anidar diferentes transformaciones. Para este ejemplo, vamos a crear tuplas formadas por un número y su cuadrado, luego quitar los que no coincide el número su potencia (el 0 y el 1), y luego aplanarlo en una lista:

rdd10 = sc.parallelize(range(10+1))
rddPares = rdd10.map(lambda x: (x, x**2)).filter(lambda x: (x[0] != x[1])).flatMap(lambda x: x)
rddPares.collect()      # [2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]

Veamos otro ejemplo. Retomamos los datos de los empleados y si queremos filtrar los empleados que son hombres, primero separamos por las | y nos quedamos con el tercer elemento que contiene el sexo y la edad. A continuación, separamos por la coma para quedarnos en el sexo en la posición 0 y la edad en el 1, y comparamos con el valor deseado:

rdd = sc.textFile("empleados.txt") 
hombres = rdd.filter(lambda x: x.split("|")[2].split(",")[0] == "Male")
resultFM.collect()

Obteniendo:

['Michael|Montreal,Toronto|Male,30|DB:80|Product:Developer\x04Lead',
 'Will|Montreal|Male,35|Perl:85|Product:Lead,Test:Lead']

Distinct

Si utilizamos distinct eliminaremos los elementos repetidos:

rdd = sc.parallelize([1,1,2,2,3,4,5])
resultRDD = rdd.distinct()
resultRDD.collect()     # [4, 1, 5, 2, 3]

Trabajando con conjuntos

Union

Mediante union unimos dos RDD en uno:

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([5,6,7,8])
resultRDD = rdd1.union(rdd2)
resultRDD.collect()     # [1, 2, 3, 4, 5, 6, 7, 8]

Intersection

Mediante intersection, obtendremos los elementos que tengan en común:

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])
resultRDD = rdd1.intersection(rdd2)
resultRDD.collect()     # [1, 2, 3, 4, 5, 6, 7, 8]

Subtract

Mediante subtract, obtendremos los elementos propios que no estén en el RDD recibido:

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize([3,4,5,6])
resultRDD = rdd1.subtract(rdd2)
resultRDD.collect()     # [1, 2]

Autoevaluación

Si tenemos dos RDD (A y B):

rddA = sc.parallelize([1,2,3,4])
rddB = sc.parallelize([3,4,5,6])

¿Cómo conseguimos los elementos que están en A y no B y los de B que no están en A? (es decir [1, 2, 5, 6])):

resultRDD = rddA.subtract(rddB).union(rddB.subtract(rddA))
resultRDD.collect()     # [1, 2, 5, 6]

RDD de Pares

Una técnica muy común a la hora de trabajar con RDD es hacerlo con elementos que tienen el formato (clave, valor), puediendo las claves y los valores pueden ser de cualquier tipo.

listaTuplas = [(1,'a'), (2,'b'), (3,'c'), (4,'d')]
rddTuplas= sc.parallelize(listaTuplas)

Sobre estos RDD podemos realizar algoritmos MapReduce para muchas funciones de procesamiento de datos, como es la agrupación, ordenación, join, count, etc...

Para generar un RDD de pares, además de generarlo nosotros a partir de una lista, podemos emplear las siguientes operaciones:

  • zip: uniendo dos listas del mismo tamaño:

    lista1 = ['a','b','c','e','f','g','h']
    lista2 = [4, 5, 6, 7, 8, 9, 10]
    rddZip = sc.parallelize(lista1).zip(sc.parallelize(lista2)).collect()
    # [('a', 4), ('b', 5), ('c', 6), ('e', 7), ('f', 8), ('g', 9), ('h', 10)]
    
    rddZipSecuencia = sc.parallelize(zip(lista1,range(len(lista1)))) # usando el tamaño de la lista
    # [('a', 0), ('b', 1), ('c', 2), ('e', 3), ('f', 4), ('g', 5), ('h', 6)]
    

    Otros métodos relacionados son zipWithIndex y zipWithUniqueId.

  • map: asignando a cada elemento un valor o cálculo sobre él mismo:

    lista  = ['Hola', 'Adiós', 'Hasta luego']
    rddMap = sc.parallelize(lista).map(lambda x: (x, len(c))
    # [('Hola', 4), ('Adiós', 5), ('Hasta luego', 11)]
    
  • keyBy: permite crear las claves a partir de cada elemento:

    rddKeyBy = sc.parallelize(lista).keyBy(lambda x: x[0])  # creamos una clave con la primera letra
    # [('H', 'Hola'), ('A', 'Adiós'), ('H', 'Hasta luego')]
    

Autoevaluación

A partir de la lista "Perro Gato Loro Pez León Tortuga Gallina"

  1. Crea un RDD a partir de esta lista
  2. Convierte el RDD normal en un RDD de pares cuya clave sea la primera letra del animal
  3. Crea otro RDD de pares pero poniendo como clave un número incremental
  4. ¿Y si queremos que el índice incremental empiece en 100?
animales = "Perro Gato Loro Pez León Tortuga Gallina"
animalesLista = animales.split(" ")
rdd1 = sc.parallelize(animalesLista)
rdd2 = rdd1.keyBy(lambda x: x[0])
rdd3 = sc.parallelize(zip(range(len(animalesLista)), animalesLista))
rdd4 = sc.parallelize(zip(range(100,100+len(animalesLista)), animalesLista))

Sobre los RDD de pares, podemos realizar las siguientes transformaciones:

  • keys: devuelve las claves
  • values: devuelve los valores
  • mapValues: Aplica la función sobre los valores
  • flatMapValues Aplica la función sobre los valores y los aplana.

A continuación se muestra un fragmento de código para poner en práctica las transformaciones comentadas:

listaTuplas = [('a',1), ('z',3), ('b',4), ('c',3), ('a',4)]
rddTuplas= sc.parallelize(listaTuplas)

claves = rddTuplas.keys()       # ['a', 'z', 'b', 'c', 'a']
valores = rddTuplas.values()    # [1, 3, 4, 3, 4]

rddMapValues = rddTuplas.mapValues(lambda x: (x,x*2))
# [('a', (1, 2)), ('z', (3, 6)), ('b', (4, 8)), ('c', (3, 6)), ('a', (4, 8))]
rddFMV = rddTuplas.flatMapValues(lambda x: (x,x*2))
# [('a', 1),
#  ('a', 2),
#  ('z', 3),
#  ('z', 6),
#  ('b', 4),
# ...

Transformaciones Wide

Las siguientes transformaciones, además de trabajar con RDD de pares, mezclan los datos de las particiones mediante el shuffle de los elementos.

Para los siguientes ejemplos, utilizaremos el fichero de ventas que trabajamos durante las sesiones de Pentaho: pdi_sales.csv (versión extendida) / pdi_sales_small.csv (versión reducida) el cual tiene el siguiente formato:

pdi_sales.csv
ProductID;Date;Zip;Units;Revenue;Country
725;1/15/1999;41540          ;1;115.5;Germany
787;6/6/2002;41540          ;1;314.9;Germany
...

ReduceByKey

Mediante la transformación reducedByKey los datos se calculan utilizando una función de reducción a partir de la clave combinando en la misma máquina las parejas con la misma clave antes de que los datos se barajen.

Vamos a comenzar con un ejemplo sencillo, contando cuantas ventas se han realizado en cada país, o lo que es lo mismo, las veces que aparece cada palabra en el fichero:

rdd = sc.textFile("pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
parPais1 = rdd.map(lambda x: (x.split(";")[-1].strip(), 1))
# parPais1.collect()
# [('Country', 1),
#  ('Germany', 1),
#  ('Germany', 1),
#  ('Germany', 1), ...

Hemos creado un RDD de pares compuesto por el nombre del país y el número uno, para luego en la fase de reducción sumar estos valores. Pero si nos fijamos, el archivo csv contiene el encabezado con los datos, el cual debemos quitar:

header = parPais1.first()
parPais1SinHeader = parPais1.filter(lambda linea: linea != header)
# parPais1SinHeader.collect()
# [('Germany', 1),
#  ('Germany', 1),
#  ('Germany', 1), ...

Y finalmente, ya podemos reducir por la clave:

paisesTotal = parPais1SinHeader.reduceByKey(lambda a,b: a+b)
# paisesTotal.collect()
# [('Mexico', 30060), ('France', 30060), ('Germany', 30059), ('Canada', 30060)]

Funciones lambda en reduce

Al aplicar una transformación de tipo reduce, la función lambda recibirá dos parámetros, siendo el primero el valor acumulado y el segundo el valor del elemento a operar.

Veamos otro ejemplo, en este caso vamos a calcular el total de unidades vendidas por país, de manera que vamos a coger el nombre del país (Country) y las unidades (Units) vendidas:

rdd = sc.textFile("pdi_sales_small.csv")
# Recogemos el país y las unidades de las ventas
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
# Le quitamos el encabezado
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
# Pasamos las unidades a un número entero
paisesUnidadesInt = paisesUnidadesSinHeader.map(lambda x: (x[0], int(x[1])))
# Reducimos por el país y sumamos las unidades
paisesTotalUnidades = paisesUnidadesInt.reduceByKey(lambda a,b: a+b)
paisesTotalUnidades.collect()

Y el resultado:

[('Mexico', 31095), ('France', 31739), ('Germany', 31746), ('Canada', 31148)]

Autoevaluación

Dada la siguiente lista de compra:

lista = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]

Calcula:

  1. El total que se ha gastado por cada producto
  2. Cuánto es lo máximo que se ha pagado por cada producto
lista = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
rdd = sc.parallelize(lista)
rdd1 = rdd.reduceByKey(lambda x,y: x+y)
rdd2 = rdd.reduceByKey(lambda x,y: max(x,y))

GroupByKey

Permite agrupar los datos a partir de una clave, repartiendo los resultados (shuffle) entre todos los nodos:

rdd = sc.textFile("pdi_sales_small.csv")
# Creamos un RDD de pares con el nombre del país como clave, y una lista con los valores
ventas = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")))
# Quitamos el primer elemento que es el encabezado del CSV
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
# Agrupamos las ventas por nombre del país
paisesAgrupados = ventas.groupByKey()
paisesAgrupados.collect()

Obtendremos para cada país, un iterable con todos sus datos:

[('Canada', <pyspark.resultiterable.ResultIterable at 0x7f814cd4b2e0>),
 ('France', <pyspark.resultiterable.ResultIterable at 0x7f816c3a9700>),
 ('Germany', <pyspark.resultiterable.ResultIterable at 0x7f814cd96eb0>),
 ('Mexico', <pyspark.resultiterable.ResultIterable at 0x7f814cd965e0>)]

Podemos transformar los iterables a una lista:

paisesAgrupadosLista = paisesAgrupados.map(lambda x: (x[0], list(x[1])))
paisesAgrupadosLista.collect()

Obteniendo:

[('Canada ',
  [['725', '1/15/1999', 'H1B            ', '1', '115.4', 'Canada '],
   ['2235', '1/15/1999', 'H1B            ', '2', '131.1', 'Canada '],
   ['713', '1/15/1999', 'H1B            ', '1', '160.1', 'Canada '],
   ...

Autoevaluación

Ahora tenemos las cuentas de las compras de 3 días:

  • día 1: pan 3€, agua 2€, azúcar 1€, leche 2€, pan 4€
  • día 2: pan 1€, cereales 3€, agua 0.5€, leche 2€, filetes 5€
  • día 3: filetes 2€, cereales 1€

Dada la siguiente lista de compra:

dia1 = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',4)]
dia2 = [('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
dia3 = [('filetes',2), ('cereales',1)]
  1. ¿Cómo obtenemos lo que hemos gastado en cada producto?
  2. ¿Y el gasto medio que hemos realizado en cada uno de ellos?
dia1 = [('pan',3), ('agua',2), ('azúcar',1), ('leche',2), ('pan',4)]
dia2 = [('pan',1), ('cereales',3), ('agua',0.5), ('leche',2), ('filetes',5)]
dia3 = [('filetes',2), ('cereales',1)]

rdd = sc.parallelize(dia1).union(sc.parallelize(dia2)).union(sc.parallelize(dia3))
rdd1=rdd.groupByKey()
# [('leche', <pyspark.resultiterable.ResultIterable at 0x7f3494aea130>),
#  ('agua', <pyspark.resultiterable.ResultIterable at 0x7f3494acd4c0>),
#  ('pan', <pyspark.resultiterable.ResultIterable at 0x7f3494acd2b0>),
#  ('cereales', <pyspark.resultiterable.ResultIterable at 0x7f3494acd850>),
#  ('filetes', <pyspark.resultiterable.ResultIterable at 0x7f34944e3be0>),
#  ('azúcar', <pyspark.resultiterable.ResultIterable at 0x7f34944e3fa0>)]

rdd1a = [(x,list(y)) for x,y in rdd1.collect()]
# rdd1a ya no es un RDD, es una lista que se ha cargado en el driver
rdd1b = rdd1.map(lambda x: (x[0], list(x[1])))
#rdd1b sigue siendo un RDD
# [('leche', [2, 2]),
#  ('agua', [2, 0.5]),
#  ('pan', [3, 4, 1]),
#  ('cereales', [3, 1]),
#  ('filetes', [5, 2]),
#  ('azúcar', [1])]

rdd2 = rdd1.map(lambda x: (x[0], sum(x[1])/len(x[1])))
# [('leche', 2.0),
#  ('agua', 1.25),
#  ('pan', 2.6666666666666665),
#  ('cereales', 2.0),
#  ('filetes', 3.5),
#  ('azúcar', 1.0)]

Mejor reduceByKey que groupByKey

Si el tipo de operación a realizar es posible mediante una operación de reduce, su rendimiento será una solución más eficiente. Más información en el artículo Avoid Group By

SortByKey

sortByKey permite ordenar los datos a partir de una clave. Los pares de la misma máquina se ordenan primero por la misma clave, y luego los datos de las diferentes particiones se barajan.

Para ello crearemos una tupla, siendo el primer elemento un valor numérico por el cual ordenar, y el segundo el dato asociado.

Vamos a partir del ejemplo anterior para ordenar los paises por la cantidad de ventas:

# Ejemplo anterior
rdd = sc.textFile("pdi_sales.csv")
paisesUnidades = rdd.map(lambda x: (x.split(";")[-1].strip(), x.split(";")[3]))
header = paisesUnidades.first()
paisesUnidadesSinHeader = paisesUnidades.filter(lambda linea: linea != header)
paisesTotalUnidades = paisesUnidadesSinHeader.reduceByKey(lambda a,b: int(a)+int(b))

# Le damos la vuelta a la lista
unidadesPaises = paisesTotalUnidades.map(lambda x: (x[1],x[0]))
unidadesPaises.collect()

Ahora tendríamos:

[(77609, 'Canada'),
 (327730, 'France'),
 (244265, 'Germany'),
 (223463, 'Mexico')]

Y a continuación los ordenamos:

unidadesPaisesOrdenadas = unidadesPaises.sortByKey()
unidadesPaisesOrdenadas.collect()

Y comprobamos el resultado:

[(77609, 'Canada'),
 (223463, 'Mexico'),
 (244265, 'Germany'),
 (327730, 'France')]

Si quisiéramos obtener los datos en orden descendente, le pasamos False a la transformación:

unidadesPaisesOrdenadasDesc = unidadesPaises.sortByKey(False)

SortBy

Mediante sortBy podemos ordenar los datos indicando nosotros la función de ordenación:

paisesTotalUnidades.sortBy(lambda x: x[1]).collect()

Obteniendo:

[('Canada', 77609),
 ('Mexico', 223463),
 ('Germany', 244265),
 ('France', 327730)]

Si queremos ordenar descendentemente, le pasamos un segundo parámetro con valor False (indica si la ordenación es ascendente):

paisesTotalUnidades.sortBy(lambda x: x[1], False).collect()

Join

Aunque los RDD permitan realizar operaciones join, realmente este tipo de operaciones se realizan mediante DataFrames, por lo que omitimos su explicación en esta sesión y la dejamos para la siguiente.

Particiones

Spark organiza los datos en particiones, considerándolas divisiones lógicas de los datos entre los nodos del clúster. Por ejemplo, si el almacenamiento se realiza en HDFS, cada partición se asigna a un bloque.

Cada una de las particiones va a llevar asociada una tarea de ejecución, de manera que a más particiones, mayor paralelización del proceso.

Veamos con código como podemos trabajar con las particiones:

rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.getNumPartitions() # 4
rdd = sc.parallelize([1,1,2,2,3,3,4,5], 2)
rdd.getNumPartitions() # 2

rddE = sc.textFile("empleados.txt")
rddE.getNumPartitions() # 2
rddE = sc.textFile("empleados.txt", 3)
rddE.getNumPartitions() # 3

La mayoría de operaciones / transformaciones / acciones que trabajan con los datos admiten un parámetro extra indicando la cantidad de particiones con las que queremos trabajar.

MapPartitions

A diferencia de la transformación map que se invoca por cada elemento del RDD/DataSet, mapPartitions se llama por cada partición.

La función que recibe como parámetro recogerá como entrada un iterador con los elementos de cada partición:

rdd = sc.parallelize([1,1,2,2,3,3,4,5], 2)

def f(iterator): yield sum(iterator)
resultadoRdd = rdd.mapPartitions(f)
resultadoRdd.collect()  # [6, 15]

resultadoRdd2 = rdd.mapPartitions(lambda iterator: [list(iterator)])
resultadoRdd2.collect() # [[1, 1, 2, 2], [3, 3, 4, 5]]

En el ejemplo, ha dividido los datos en dos particiones, la primera con [1, 1, 2, 2] y la otra con [3, 3, 4, 5], y de ahí el resultado de sumar sus elementos es [6, 15].

mapPartitionsWithIndex

De forma similar al caso anterior, pero ahora mapPartitionsWithIndex recibe una función cuyos parámetros son el índice de la partición y el iterador con los datos de la misma:

def mpwi(indice, iterador):
    return [(indice, list(iterador))]

resultadoRdd = rdd.mapPartitionsWithIndex(mpwi)
resultadoRdd.collect()
# [(0, [1, 1, 2, 2]), (1, [3, 3, 4, 5])]

Modificando las particiones

Podemos modificar la cantidad de particiones mediante dos transformaciones wide: coalesce y repartition.

Mediante coalesce podemos obtener un nuevo RDD con la cantidad de particiones a reducir:

rdd = sc.parallelize([1,1,2,2,3,3,4,5], 3)
rdd.getNumPartitions() # 3
rdd1p = rdd.coalesce(1)
rdd1p.getNumPartitions() # 2

En cambio, mediante repartition podemos obtener un nuevo RDD con la cantidad exacta de particiones deseadas (al reducir las particiones, repartition realiza un shuffle para redistribuir los datos, por lo tanto, si queremos reducir la cantidad de particiones, es más eficiente utilizar coalesce):

rdd = sc.parallelize([1,1,2,2,3,3,4,5], 3)
rdd.getNumPartitions() # 3
rdd2p = rdd.repartition(2)
rdd2p.getNumPartitions() # 2

Spark UI

Si accedemos a la dirección http://127.0.0.1:4040/, veremos un interfaz gráfico donde podemos monitorizar y analizar el código Spark ejecutado. La barra superior muestra un menú con las opciones para visualizar las jobs, stages, almacenamiento, el entorno y sus variables de configuración, y finalmente los ejecutores:

Spark Shell UI

Por ejemplo, si ejecutamos el ejemplo de groupByKey, obtenemos el siguiente DAG:

Ejemplo de DAG

Si pulsamos por ejemplo sobre la fase de groupBy obtendremos sus estadísticas de ejecución:

Estadísticas de una fase

AWS desde Spark

Para conectar a AWS desde Spark hace falta:

  1. Descargar dos librerías y configurarlas en spark-defaults.conf (igual que hicimos con las librerías de MariaDB / MySQL):
# https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle/1.11.901
spark.driver.extraClassPath = /opt/spark-3.2.0/conf/hadoop-aws-3.2.3.jar:/opt/spark-3.2.0/conf/aws-java-sdk-bundle-1.12.230.jar
spark.executor.extraClassPath = /opt/spark-3.2.0/conf/hadoop-aws-3.2.3.jar:/opt/spark-3.2.0/conf/aws-java-sdk-bundle-1.12.230.jar
  1. Configurar las credencias de AWS en .aws/credentials (esto lo hicimos varias veces en las sesiones de cloud)

  2. Tras crear la sesión de Spark, configurar el proveedor de credenciales:

spark = SparkSession.builder.getOrCreate()

spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")

df = spark.read.csv("s3a://s8a-spark-s3/departure_delays.csv")

Actividades

En las siguientes actividades vamos a familiarizarnos con el uso de Spark con RDD y las diferentes acciones y transformaciones para trabajar con los datos.

  1. A partir de la lista siguiente ['Alicante','Elche','Valencia','Madrid','Barcelona','Bilbao','Sevilla']:

    1. Almacena sólo las ciudades que tengan la letra e en su nombre y muéstralas.
    2. Muestra las ciudades que tienen la letra e y el número de veces que aparece en cada nombre. Por ejemplo ('Elche', 2).
    3. Averigua las ciudades que solo tengan una única e.
    4. Nos han enviado una nueva lista pero no han separado bien las ciudades. Reorganiza la lista y colocalas correctamente, y cuenta las apariciones de la letra e de cada ciudad. ciudades_mal = [['Alicante.Elche','Valencia','Madrid.Barcelona','Bilbao.Sevilla'],['Murcia','San Sebastián','Melilla.Aspe']]
  2. (opcional) A partir del fichero de El Quijote:

    1. Crear un RDD a partir del fichero y crea una lista con todas las palabras del documento.
    2. ¿Cuantas veces aparece la palabra Dulcinea (independientemente de si está en mayúsculas o minúsculas)? ¿Y Rocinante? (86 y 120 ocurrencias respectivamente)
    3. Devuelve una lista ordenada según el número de veces que sale cada palabra de más a menos (las primeras ocurrencias deben ser [('que', 10731), ('de', 9035), ('y', 8668), ('la', 5014), ...).
    4. Almacena el resultado en HDFS en /user/iabd/spark/wcQuijote.
  3. Dada una cadena que contiene una lista de nombres Juan, Jimena, Luis, Cristian, Laura, Lorena, Cristina, Jacobo, Jorge, una vez transformada la cadena en una lista y luego en un RDD:

    1. Agrúpalos según su inicial, de manera que tengamos tuplas formadas por la letra inicial y todos los nombres que comienzan por dicha letra:

      [('J', ['Juan', 'Jimena', 'Jacobo', 'Jorge']),
      ('L', ['Luis', 'Laura', 'Lorena']),
      ('C', ['Cristian', 'Cristina'])]
      
    2. De la lista original, obtén una muestra de 5 elementos sin repetir valores.

    3. Devuelve una muestra de datos de aproximadamente la mitad de registros que la lista original con datos que pudieran llegar a repetirse.
  4. A partir de las siguientes listas:

    • Inglés: hello, table, angel, cat, dog, animal, chocolate, dark, doctor, hospital, computer
    • Español: hola, mesa, angel, gato, perro, animal, chocolate, oscuro, doctor, hospital, ordenador

    Una vez creado un RDD con tuplas de palabras y su traducción (puedes usar zip para unir dos listas):

    [('hello', 'hola'),
     ('table', 'mesa'),
     ('angel', 'angel'),
     ('cat', 'gato')...
    

    Averigua:

    1. Palabras que se escriben igual en inglés y en español
    2. Palabras que en español son distintas que en inglés
    3. Obtén una única lista con las palabras en ambos idiomas que son distintas entre ellas (['hello', 'hola', 'table', ...)
    4. Haz dos grupos con todas las palabras, uno con las que empiezan por vocal y otro con las que empiecen por consonante.
  5. (opcional) Dada una lista de elementos desordenados y algunos repetidos, devolver una muestra de 5 elementos, que estén en la lista, sin repetir y ordenados descendentemente.

    lista = 4,6,34,7,9,2,3,4,4,21,4,6,8,9,7,8,5,4,3,22,34,56,98
    
    1. Selecciona el elemento mayor de la lista resultante.
    2. Muestra los dos elementos menores.
  6. En una red social sobre cine, tenemos un fichero ratings.txt compuesta por el código de la película, el código del usuario, la calificación asignada y el timestamp de la votación con el siguiente formato:

    1::1193::5::978300760
    1::661::3::978302109
    1::914::3::978301968
    

    Se pide crear dos script en Python, así como los comandos necesarios para ejecutarlos (mediante spark-submit) para:

    1. Obtener para cada película, la nota media de todas sus votaciones.
    2. Películas cuya nota media sea superior a 3.

Proyectos

A continuación planteamos dos proyectos para realizar en clase:

  1. Tenemos las calificaciones de las asignaturas de matemáticas, inglés y física de los alumnos del instituto en 3 documentos de texto. A partir de estos ficheros:

    1. Crea 3 RDD de pares, uno para cada asignatura, con los alumnos y sus notas
    2. Crea un solo RDD con todas las notas
    3. ¿Cuál es la nota más baja que ha tenido cada alumno?
    4. ¿Cuál es la nota media de cada alumno?
    5. ¿Cuantos estudiantes suspende cada asignatura?
      [('Mates', 7), ('Física', 8), ('Inglés', 7)]
    6. ¿En qué asignatura suspende más gente?
    7. Total de notables o sobresalientes por alumno, es decir, cantidad de notas superiores o igual a 7.
    8. ¿Qué alumno no se ha presentado a inglés?
    9. ¿A cuántas asignaturas se ha presentado cada alumno?
    10. Obten un RDD con cada alumno con sus notas
  2. Dada la carpeta weblog, tenemos un archivo comprimido con un conjunto de ficheros de log de una web con el el formato Common Log Format:

    • <ipUsuario> - <idUsuario> - [<fecha>] "<peticionHTTP>" <codigoHTTP> <bytesTransmitidos> <dominio> <browser>

    Un fragmento de un fichero sería similar a:

    116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /hive-00031.html HTTP/1.0" 200 1388 "http://www.cursoDeFormacionBigData.com"  "Opera 3.4"
    116.180.70.237 - 128 [15/Sep/2013:23:59:53 +0100] "GET /theme.css HTTP/1.0" 200 5531 "http://www.cursoDeFormacionBigData.com"  "Opera 3.4"
    
    1. Carga los logs de la carpeta weblogs
    2. Crea un RDD de pares a partir de estos ficheros con el usuario como clave y un valor 1
      [('128', 1), ('128', 1), ('94', 1),...)
    3. Crea un nuevo RDD con la suma de los valores para cada identificador de usuario [('54126', 32), ('54', 498), ('21443', 35), ...)
    4. Muestra los 10 usuarios que más peticiones han realizado.
      [('193', 667), ('13', 636), ('24', 620),...
    5. Crea un nuevo RDD de pares donde la clave sea el código del usuario y el valor una lista de IP desde donde se ha conectado el usuario (las IP se pueden repetir).

      [('54126',
          ['176.80.85.20',
          '176.80.85.20',
          '216.249.102.215',
          '216.249.102.215',
      

      ¿Y si no nos interesa que las IP estén repetidas?

    6. Necesitamos contar cuantas veces se han enviado datos en formato png, jpg, html, css y txt. Para ello, a partir de la <peticionHTTP> utiliza RDD de pares.

      [('js', 25212),
      ('html', 173049),
      ('png', 4357),
      ('jpg', 24743),
      ('txt', 6096),
      ('css', 180632)]
      

Referencias