Saltar a contenido

Apache Kafka

Introducción

Apache Kafka es, en pocas palabras, un middleware de mensajería entre sistemas heterogéneos, el cual, mediante un sistema de colas (topics, para ser concreto) facilita la comunicación asíncrona, desacoplando los flujos de datos de los sistemas que los producen o consumen. Funciona como un broker de mensajes, encargado de enrutar los mensajes entre los clientes de un modo muy rápido.

Supongamos que tenemos múltiples generadores de datos, ya sean servidores web, de bases de datos, un servidor de chat y que todos ellos tienen que almacenar sus datos en múltiples destinos, como pueden ser logs, métricas de rendimiento y monitorización, el carrito de la compra o los fallos ocurridos, lo que puede provocar una serie de dependencias de unos con otros. Para evitarlo, Kafka viene al rescate conectando todos los generadores de datos (productores) a Kafka y a su vez, a todos los consumidores de estos datos.

Kafka como middleware/broker de mensajes

En concreto, se trata de una plataforma open source distribuida de transmisión de eventos/mensajes en tiempo real con almacenamiento duradero y que proporciona de base un alto rendimiento (capaz de manejar billones de peticiones al día, con una latencia inferior a 10ms), tolerancia a fallos, disponibilidad y escalabilidad horizontal (mediante cientos de nodos).

Evento / Mensaje

Dentro del vocabulario asociado a arquitecturas asíncronas basadas en productor/consumidor o publicador/suscriptor, se utiliza el mensaje para indicar el dato que viaja desde un punto a otro. En Kafka, además de utilizar el concepto mensaje, se emplea el término evento.

Más del 80% de las 100 compañías más importantes de EEUU utilizan Kafka: Uber, Twitter, Netflix, Spotify, Blizzard, LinkedIn, Spotify, y PayPal procesan cada día sus mensajes con Kafka.

Como sistema de mensajes, sigue un modelo publicador-suscriptor. Su arquitectura tiene dos directivas claras:

  • No bloquear los productores (para poder gestionar la back pressure, la cual sucede cuando un publicador produce más elementos de los que un suscriptor puede consumir).
  • Aislar los productores y los consumidores, de manera que los productores y los consumidores no se conocen.

A día de hoy, Apache Kafka se utiliza, además de como un sistema de mensajería, para ingestar datos, realizar procesado de datos en streaming y analítica de datos en tiempo real, así como en arquitectura de microservicios y sistemas IOT.

Amazon Kinesis

Amazon Kinesis es un producto similar a Apache Kafka pero dentro de la plataforma AWS, por lo que no es un producto open source como tal. Su principal ventaja es la facilidad de escalabilidad a golpe de click e integración con el resto de servicios que ofrece AWS. Se trata de una herramienta muy utilizada que permite incorporar datos en tiempo real, como vídeos, audios, registros de aplicaciones, secuencias de clicks de sitios web y datos de sensores IoT para machine learning, analítica de datos en streaming, etc...

Confluent

Confluent es una solución PaaS que ofrece el despliegue y la monitorización de Kafka en un producto. Pese a existir una versión Community que podemos probar en local (incluso mediante Docker), los requisitos a nivel de RAM son bastante altos, ya que requiere un mínimo de 8GB de RAM sólo para Kafka, con lo que el ordenador host debe tener mínimo 12GB de RAM.

Publicador / Suscriptor

Antes de entrar en detalle sobre Kafka, hay que conocer el modelo publicador/suscriptor. Este patrón también se conoce como publish / subscribe o productor / consumidor.

Hay tres elementos que hay que tener realmente claros:

  • Publicador (publisher / productor / emisor): genera un dato y lo coloca en un topic como un mensaje.
  • topic (tema): almacén temporal/duradero que guarda los mensajes funcionando como una cola.
  • Suscriptor (subscriber / consumidor / receptor): recibe el mensaje.

Cabe destacar que un productor no se comunica nunca directamente con un consumidor, siempre lo hace a través de un topic:

Productor - Consumidor

Caso 0: Hola Kafka

Para arrancar Kafka, vamos a utilizar la instalación que tenemos creada en nuestra máquina virtual.

Kafka mediante Docker

Bitnami tiene una imagen para trabajar con Docker la cual permite probar todos los ejemplos de esta sesión. Para ello, se recomienda seguir los pasos de la página oficial: https://hub.docker.com/r/bitnami/kafka/

El primer paso, una vez dentro de la carpeta de instalación de Kafka (en nuestro caso /opt/kafka_2.13-3.3.1), es arrancar Zookeeper mediante el comando zookeeper-server-start.sh, el cual se encarga de gestionar la comunicación entre los diferentes brokers:

zookeeper-server-start.sh ./config/zookeeper.properties

zookeeper.properties

Del archivo de configuración de Zookeeper conviene destacar dos propiedades:

  • clientPort: puerto por defecto (2181)
  • dataDir: indica donde está el directorio de datos de Zookeeper (por defecto es tmp/zookeeper, pero si queremos que dicha carpeta no se elimine es mejor que apunte a una ruta propia, por ejemplo /opt/zookeeper-data)

Para comprobar que Zookeeper está arrancado, podemos ejecutar el comando lsof -i :2181, el cual escanea el puerto 2181 donde está corriendo Zookeeper.

Una vez comprobado, en un nuevo terminal, arrancamos el servidor de Kafka mediante el comando kafka-server-start.sh (de manera que tenemos corriendo a la vez Zookeeper y Kafka):

kafka-server-start.sh ./config/server.properties

Instalación de pruebas

En nuestra máquina virtual, en los archivos de configuración, hemos puesto que todas las rutas utilicen /tmp para no llenar el disco duro de datos, en concreto la propiedad log.dirs.

Por ello, entre cada reinicio de la máquina virtual, tendremos que volver a crear los diferentes topics, habiendo perdido los mensajes que no se hubieran consumido.

Creando un topic

A continuación, en un tercer terminal, vamos a crear un topic mediante el comando kafka-topics.sh, utilizando el parámetro --create:

kafka-topics.sh --create --topic iabd-topic --bootstrap-server iabd-virtualbox:9092

Si quisiéramos comprobar los topics que hemos creado, podemos obtener un listado mediante el parámetro --list:

kafka-topics.sh --list --bootstrap-server iabd-virtualbox:9092

Si queremos obtener la descripción del topic creado con la cantidad de particiones le pasamos el parámetro --describe:

kafka-topics.sh --describe --topic iabd-topic --bootstrap-server iabd-virtualbox:9092

Obteniendo la siguiente información:

Topic: iabd-topic       TopicId: ogKnRpOFS7mfOhspLcuB4A PartitionCount: 1       ReplicationFactor: 1      Configs: segment.bytes=1073741824
        Topic: iabd-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Produciendo mensajes

Para enviar un mensaje a un topic, ejecutaremos en un cuarto terminal un productor mediante el comando kafka-console-producer.sh. Por defecto, cada línea que introduzcamos resultará en un evento separado que escribirá un mensaje en el topic (podemos pulsar CTRL+C en cualquier momento para cancelar):

kafka-console-producer.sh --topic iabd-topic --bootstrap-server iabd-virtualbox:9092

Así pues, escribimos los mensajes que queramos:

>Este es un mensaje
>Y este es otro
>Y el tercero

Consumiendo mensajes

Y finalmente, en otro terminal, vamos a consumir los mensajes:

kafka-console-consumer.sh --topic iabd-topic --from-beginning --bootstrap-server iabd-virtualbox:9092

Al ejecutarlo veremos los mensajes que habíamos introducido antes (ya que hemos indicado la opción --from-beginning). Si ahora volvemos a escribir en el productor, casi instantáneamente, aparecerá en el consumidor el mismo mensaje.

Tras esto, paramos todos los procesos que se están ejecutando mediante CTRL+C y hemos finalizado nuestro primer contacto con Kafka.

Elementos

Dentro de una arquitectura con Kafka, existen múltiples elementos que interactúan entre sí.

Topic y Particiones

Un topic (¿tema?) es un flujo particular de datos que funciona como una cola almacenando de forma temporal o duradera los datos que se colocan en él.

Podemos crear tantos topics como queramos y cada uno de ellos tendrá un nombre unívoco.

Un topic se divide en particiones, las cuales se numeran, siendo la primera la 0. Al crear un topic podemos indicar la cantidad de particiones inicial, la cual podemos modificar a posteriori (en nuestra máquina virtual, en el archivo server.properties tenemos configurado que, por defecto, cada topic tenga una sola partición mediante la propiedad num.partitions=1).

Al crear un topic, si queremos indicar la cantidad de particiones, hemos de pasarle el parámetro --partitions y el número de particiones deseadas:

kafka-topics.sh --create --topic iabd-topic-p3 --partitions 3 --bootstrap-server iabd-virtualbox:9092

Cada partición está ordenada, de manera que cada mensaje dentro de una partición tendrá un identificador incremental, llamado offset (desplazamiento). Cada partición funciona como un commit log almacenando los mensajes que recibe.

Offset dentro de las particiones de un topic

Como podemos observar en la imagen, cada partición tiene sus propios offset (el offset 3 de la partición 0 no representa el mismo dato que el offset 3 de la partición 1).

Habíamos comentado que las particiones están ordenadas, pero el orden sólo se garantiza dentro de una partición (no entre particiones), es decir, el mensaje 7 de la partición 0 puede haber llegado antes, a la vez, o después que el mensaje 5 de la partición 1.

Los datos de una partición tiene un tiempo de vida limitado (retention period) que indica el tiempo que se mantendrán los mensajes antes de eliminarlos. Por defecto es de una semana. Además, una vez que los datos se escriben en una partición, no se pueden modificar (las mensajes son inmutables).

Finalmente, por defecto, los datos se asignan de manera aleatoria a una partición. Sin embargo, existe la posibilidad de indicar una clave de particionado.

Borrando un topic

Para eliminar un topic, le pasaremos el parámetro --delete:

kafka-topics.sh --delete --topic iabd-topic --bootstrap-server iabd-virtualbox:9092

Al borrar un topic, sus índices con el histórico no se eliminan, de manera que volver a crear un topic con el mismo nombre es una mala idea ya que podemos obtener datos incongruentes.

Brokers

Un clúster de Kafka está compuesto de múltiples nodos conocidos como Brokers, donde cada broker es un servidor de Kafka. Cada broker se identifica con un id, el cual debe ser un número entero.

Cada broker contiene un conjunto de particiones, de manera que un broker contiene parte de los datos, nunca los datos completos ya que Kafka es un sistema distribuido. Al conectarse a un broker del clúster (bootstrap broker), automáticamente nos conectaremos al clúster entero.

Para comenzar se recomienda una arquitectura de 3 brokers, aunque algunos clústers lo forman cerca de un centenar de brokers.

Por ejemplo, el siguiente gráfico muestra un clúster con tres brokers, de manera que el topic A está dividido en tres particiones, cada una de ellas residiendo en un broker diferente (no hay ninguna relación entre el número de la partición y el nombre del broker), y el topic B está a su vez dividido en dos particiones:

Ejemplo de 3 brokers

En el caso de haber introducido un nuevo topic con 4 particiones, uno de los brokers contendría dos particiones.

Factor de replicación

Para soportar la tolerancia a fallos, los topics deben tener un factor de replicación mayor que uno (normalmente se configura entre 2 y 3).

En la siguiente imagen podemos ver como tenemos 3 brokers, y un topic A con dos particiones y un factor de replicación de 2, de manera que cada partición crea una réplica de sí misma:

Divisiones de un broker en particiones

Para crear la configuración del gráfico, debemos indicar el factor de replicación mediante --replication-factor:

kafka-topics.sh --create --topic TopicA --partitions 2 --replication-factor 2 --bootstrap-server iabd-virtualbox:9092

Si se cayera el broker 102, Kafka podría devolver los datos al estar disponibles en los nodos 101 y 103.

Réplica líder

Acabamos de ver que cada broker tiene múltiples particiones, y cada partición tiene múltiples réplicas, de manera que, si se cae un nodo/broker, Kafka puede utilizar otro broker para servir los datos.

En cualquier instante, una determinada partición tendrá una única réplica que será la líder, y esta réplica líder será la única que pueda recibir y servir los datos de una partición. La réplica líder es importante porque todas las lecturas y escrituras siempre van a esta réplica. El resto de brokers sincronizarán sus datos. En resumen, cada partición tendrá un líder y múltiples ISR (in-sync replica).

Réplicas de una partición

Si se cayera el Broker 101 , entonces la partición 0 del Broker 102 se convertiría en la líder. Y cuando vuelva a funcionar el Broker 101, intentará volver a ser la partición líder.

Productores

Los productores escriben datos en los topics, sabiendo automáticamente el broker y la partición en la cual deben escribir. En el caso de un fallo de un broker, los productores automáticamente se recuperan y se comunican con el broker adecuado.

Si el productor envía los datos sin una clave determinada, Kafka utiliza un algoritmo de Round Robin, de manera que cada mensaje se va alternando entre los diferentes brokers.

La carga se balancea entre los brokers

Podemos configurar los productores para que reciban un ACK de las escrituras de los datos con los siguientes valores:

  • ack=0: El productor no espera la confirmación (posible pérdida de datos), lo que se traduce en un envío asíncrono.
  • ack=1: El productor espera la confirmación del líder (limitación de la pérdida de datos), de manera que los envíos son síncronos.
  • ack=all: El productor espera la confirmación del líder y de todas las réplicas (sin pérdida de datos).

Clave de mensaje

Los productores pueden enviar una clave con el mensaje (de tipo cadena, numérico, etc...). Cuando la clave no se envía, ya hemos comentado que los datos se envían mediante Round Robin (primero Broker 101, luego el 102, el 103, etc... y vuelta al 101).

Kafka trabaja con pares clave-valor, de manera que si no indicamos la clave, de considerará por defecto como nula y por tanto, la partición se identifica mediante round robin. Si se envía la clave, todos los mensajes con la misma clave siempre irán a la misma partición. Por lo tanto, enviaremos una clave cuando necesitemos ordenar los mensajes por un campo específico (por ejemplo, el identificador de una operación).

Para enviar mensajes con la clave, desde el terminal, necesitamos indicar dos propiedades mediante --property:

  • parse.key: si es true, obligatoriamente enviaremos la clave (por defecto es false)
  • key.separator: carácter para separar la clave del valor, por ejemplo, :, ;, ....

Así pues, podemos enviar mensajes con clave mediante:

kafka-console-producer.sh --topic iabd-topic --property "parse.key=true" --property "key.separator=:" \
    --bootstrap-server iabd-virtualbox:9092

Y luego enviar mensajes del estilo:

clave1:valor1
clave2:valor2
clave1:valor3
customer_id: {"customer_id":"1", "customer_fname": "Aitor", "customer_lname": "Medrano", "customer_email": "a.medrano@edu.gva.es"}

Consumidores

Los consumidores obtienen los datos de los topics y las particiones, y saben de qué broker deben leer los datos. Igual que los productores, en el caso de un fallo de un broker, los consumidores automáticamente se recuperan y se comunican con el broker adecuado.

Los datos se leen en orden dentro de cada partición, de manera que el consumidor no podrá leer, por ejemplo, los datos del offset 6 hasta que no haya leído los del offset 5. Además, un consumidor puede leer de varias particiones (se realiza en paralelo), pero el orden sólo se respeta dentro de cada partición, no entre particiones:

Los consumidores leen en orden dentro de cada partición

Grupo de consumidores

Un consumidor puede pertenecer a un grupo de consumidores, de manera que cada uno de los consumidores del grupo obtendrán una parte de los datos, es decir, una partición de un topic.

Por ejemplo, tenemos una aplicación compuesta de dos consumidores, formando un grupo de consumidores. El consumidor 1 lo hará de dos particiones, y el consumidor 2 lo hará de la tercera partición. También tenemos otra aplicación compuesta de tres consumidores, de manera que cada consumidor lo hará de cada una de las particiones. Finalmente, tenemos un tercer grupo de consumidores formado por un único consumidor que leerá las tres particiones. En conclusión, cada grupo de consumidores funciona como un único consumidor de manera que accede a todas las particiones de un topic.

Grupos de consumidores

Coordinando los consumidores

Los consumidores, por sí solos, no saben con qué partición se deben comunicar. Para ello, se utiliza un GroupCoordinator y un Consumer Coordinator para asignar los consumidores a cada partición. Esta gestión la realiza Kafka.

Cabe destacar que los diferentes grupos de consumidores reciben el mismo dato de cada partición, es decir, el consumidor 1 del grupo 1 y el consumidor 1 del grupo 2 reciben la información que había en la partición 0. Este caso de uso es muy útil cuando tenemos dos aplicaciones que queremos que reciban los mismos datos (por ejemplo, uno encargado de realizar machine learning y otro analítica de datos).

En el caso de tener más consumidores que particiones, algunos consumidores no realizarán nada. Este caso de uso es atípico, ya que lo recomendable es tener tantos consumidores como el mayor número de particiones existentes.

Probando los grupos de consumidores

Vamos a simular el gráfico anterior mediante un ejemplo con el terminal. Primero crearemos un topic que contenga tres particiones:

kafka-topics.sh --create --topic iabd-topic-group \
    --bootstrap-server iabd-virtualbox:9092 --partitions 3

Si comprobamos el estado del topic mediante:

kafka-topics.sh --describe --topic iabd-topic-group \
    --bootstrap-server iabd-virtualbox:9092

Obtendremos la siguiente información:

Topic: iabd-topic-group TopicId: p1i3m4fMRximngLjAV5rsA PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: iabd-topic-group Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: iabd-topic-group Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: iabd-topic-group Partition: 2    Leader: 0       Replicas: 0     Isr: 0

A continuación, en dos pestañas diferentes, vamos a crear dos consumidores que pertenezcan al mismo grupo de consumidores:

kafka-console-consumer.sh --topic iabd-topic-group \
    --group iabd-app1 \
    --bootstrap-server iabd-virtualbox:9092

Y finalmente, creamos un nuevo productor sobre el topic:

kafka-console-producer.sh --topic iabd-topic-group \
    --bootstrap-server iabd-virtualbox:9092

Y si creamos varios mensajes en el productor, veremos cómo van llegando de manera alterna a los diferentes consumidores:

Ejemplo de grupo de consumidores

Autoevaluación

  • ¿Qué sucederá se creamos un nuevo consumidor que lo haga del mismo topic pero con un grupo de consumidores diferente (por ejemplo, iabd-app2) y le pedimos que lea los mensajes desde el principio (mediante --from-beginning) ?
    Que aparecerán todos los mensajes desde el principio.
  • ¿Y si lo detenemos y volvemos a crear el mismo consumidor (también con el grupo de consumidores iabd-app2 y los vuelva a leer desde el principio también)?
    En esta ocasión, ya no recibirá ningún mensaje, ya que el primer consumidor hace commit de la lectura y el segundo al hacerlo desde el mismo grupo de consumidores ya tiene los mensajes previos marcados como leídos.
  • ¿Y si detenemos todos los consumidores y seguimos creando mensajes en el productor?
    Los mensajes se almacenan en el topic.
  • ¿Y si arrancamos de nuevo un consumidor sobre el grupo de consumidores iabd-app2?
    Que consumirá los mensajes que acabamos de crear.

Mediante el comando kafka-consumer-groups.sh podemos obtener información sobre los diferentes grupos de consumidores que tenemos creados, así como eliminarlos o resetear sus offsets.

Por ejemplo, si queremos listar los grupos de consumidores existentes ejecutaremos:

kafka-consumer-groups.sh --list \
    --bootstrap-server iabd-virtualbox:9092

En cambio, si queremos obtener la información de un determinado grupo ejecutaremos:

kafka-consumer-groups.sh --describe --group iabd-app1 \
    --bootstrap-server iabd-virtualbox:9092

Obteniendo información a destacar como:

  • CURRENT-OFFSET: valor actual del offset
  • LOG-END-OFFSET: offset del último mensaje de la partición
  • LAG: cantidad de mensajes pendientes de leer
GROUP           TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                               HOST            CLIENT-ID
iabd-app1       iabd-topic-group 0          4               4               0               consumer-iabd-app1-1-405b2b39-2252-4e12-ba55-00a579441df2 /127.0.0.1      consumer-iabd-app1-1
iabd-app1       iabd-topic-group 1          2               2               0               consumer-iabd-app1-1-405b2b39-2252-4e12-ba55-00a579441df2 /127.0.0.1      consumer-iabd-app1-1
iabd-app1       iabd-topic-group 2          4               4               0               consumer-iabd-app1-1-8f09bc45-8e8c-46d2-9c9c-cf6bd3a5fdc7 /127.0.0.1      consumer-iabd-app1-1

Si por ejemplo, con todos los consumidores detenidos, mediante un productor lanzamos 5 mensajes nuevos, estos se quedarán en el topic a la espera de ser consumidos, y se habrán repartidos entre las diferentes particiones. Si volvemos a lanzar el comando anterior obtendríamos:

Consumer group 'iabd-app1' has no active members.

GROUP           TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
iabd-app1       iabd-topic-group 2          4               5               1               -               -               -
iabd-app1       iabd-topic-group 1          2               4               2               -               -               -
iabd-app1       iabd-topic-group 0          4               6               2               -               -               -

Offsets de Consumidor

Kafka almacena los offsets por el que va leyendo un grupo de consumidores, a modo de checkpoint, en un topic llamado __consumer_offsets.

Cuando un consumidor de un grupo ha procesado los datos que ha leído de Kafka, realizará un commit de sus offsets. Si el consumidor se cae, podrá volver a leer los mensajes desde el último offset sobre el que se realizó commit.

Por ejemplo, supongamos que tenemos un consumidor el cual ha hecho un commit tras el offset 4262. Tras el commit seguimos leyendo los siguientes mensajes: 4263, 4264, 4265 y de repente el consumidor se cae sin haber hecho commit de esos mensajes. Cuando el consumidor vuelva a funcionar, volverá a leer los mensajes desde el 4263, asegurándose que no se ha quedado ningún mensaje sin procesar.

Offsets de consumidor

El commit de los mensajes está muy relacionado con la semántica de la entrega. Los consumidores eligen cuando realizar el commit de los offsets:

  • As most once: se realiza el commit del mensaje tan pronto como se recibe el mensaje. Si falla su procesamiento, el mensaje se perderá (y no se volverá a leer).
  • At least once (opción más equilibrada): El commit se realiza una vez procesado el mensaje. Este enfoque puede resultar en un procesado duplicado de los mensajes, por lo que hemos de asegurarnos que son idempotentes (el volver a procesar un mensaje no tendrá un impacto en el sistema)
  • Exactly once: sólo se puede conseguir utilizando flujos de trabajo de Kafka con Kafka mediante el API de Kafka Streams. Si necesitamos la interacción de Kafka con un sistema externo, como una base de datos, se recomienda utilizar un consumidor idempotente que nos asegura que no habrá duplicados en la base de datos.

Descubrimiento de brokers

Cada broker de Kafka es un bootstrap server, lo que significa que dicho servidor contiene un listado con todos los nodos del clúster, de manera que al conectarnos a un broker, automáticamente nos conectaremos al clúster entero.

Mediante esta configuración, cada broker conoce todos los brokers, topics y particiones (metadatos del clúster).

Así pues, cuando un cliente se conecta a un broker, también realiza una petición de los metadatos, y obtiene un listado con todos los brokers. Tras ello, ya puede conectarse a cualquiera de los brokers que necesite:

Descubrimiento de brokers

Zookeeper

En la primera sesión de Hadoop ya vimos que ZooKeeper es un servicio para mantener la configuración, coordinación y aprovisionamiento de aplicaciones distribuidas dentro del ecosistema de Apache. No sólo se utiliza en Hadoop, pero es muy útil ya que elimina la complejidad de la gestión distribuida de la plataforma.

En el caso de Kafka, Zookeeper:

  • gestiona los brokers (manteniendo una lista de ellos).
  • ayuda en la elección de la partición líder
  • envía notificaciones a Kafka cuando hay algún cambio (por ejemplo, se crea un topic, se cae un broker, se recupera un broker, al eliminar un topic, etc...).

Por todo ello, Kafka no puede funcionar sin Zookeeper.

En un entorno real, se instalan un número impar de servidores Zookeeper (3, 5, 7). Para su gestión, Zookeeper define un líder (gestiona las escrituras) y el resto de los servidores funcionan como réplicas de lectura.

Kafka y Zookeeper

Pese a su dependencia, los productores y consumidores no interactúan nunca con Zookeeper, sólo lo hacen con Kafka.

Kraft

Aunque podemos decir que todavía no es la opción más recomendable en producción, también podemos utilizar una instalación de Kafka sin Zookeeper haciendo uso de Kraft, el cual ofrece un nuevo protocolo de consenso y evita tener una infraestructura extra para Zookeeper.

En Resumen

Kafka garantiza que...

  • Los mensajes se añaden a una partición/topic en el orden en el que se envían
  • Los consumidores leen los mensajes en el orden en que se almacenaron en la partición/topic
  • Con un factor de replicación N, los productores y consumidores pueden soportar que se caigan N-1 brokers.
    • Por ejemplo, con un factor de replicación de 3 (el cual es un valor muy apropiado), podemos tener un nodo detenido para mantenimiento y podemos permitirnos que otro de los nodos se caiga de forma inesperada.
  • Mientras el número de particiones de un topic permanezca constante (no se hayan creado nuevas particiones), la misma clave implicará que los mensajes vayan a la misma partición.

Caso 1: Kafka y Python

Para poder producir y consumir mensajes desde Python necesitamos instalar la librería Kafka-python:

pip install kafka-python

KafkaConsumer

Vamos a crear un consumidor, mediante un KafkaConsumer, que escuche de nuestro servidor de Kafka:

consumer.py
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'iabd-topic',
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='iabd-grupo-1',
    value_deserializer=lambda m: loads(m.decode('utf-8')),
    bootstrap_servers=['iabd-virtualbox:9092'])

for m in consumer:
    print(m.value)

Al crear el consumidor, configuramos los siguientes parámetros:

  • en el primer parámetro indicamos el topic desde el que vamos a consumir los mensajes
  • bootstrap_servers: listado de brokers de Kafka
  • auto_offset_reset: le indica al consumidor desde donde empezar a leer los mensajes si se cae: earliest se moverá hasta el mensaje más antiguo y latest al más reciente.
  • enable_auto_commit: si True, el offset del consumidor realizará periódicamente commit en segundo plano.
  • value_deserializer: método utilizado para deserializar los datos. En este caso, transforma los datos recibidos en JSON.

KafkaProducer

Y para el productor, mediante un KafkaProducer, vamos a enviar 10 mensajes en formato JSON mediante el método send, pasándole como primer parámetro el topic y luego el mensaje en sí:

producer.py
from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    value_serializer=lambda m: dumps(m).encode('utf-8'),
    bootstrap_servers=['iabd-virtualbox:9092'])

for i in range(10):
    producer.send("iabd-topic", value={"nombre": "producer " + str(i)})
# Como el envío es asíncrono, para que no se salga del programa antes de enviar el mensaje, esperamos 1 seg
time.sleep(1)
# producer.flush()

Tras ejecutar ambos programas en pestañas diferentes, en la salida del consumidor recibiremos:

{'nombre': 'producer 0'}
{'nombre': 'producer 1'}
{'nombre': 'producer 2'}
{'nombre': 'producer 3'}
{'nombre': 'producer 4'}
{'nombre': 'producer 5'}
{'nombre': 'producer 6'}
{'nombre': 'producer 7'}
{'nombre': 'producer 8'}
{'nombre': 'producer 9'}

Referencias

Actividades

  1. (RA5075.1 / CE5.1a / 0.5p) Realiza el caso de uso 0, adjuntando capturas de los comandos utilizados y, en alguno de los mensajes, envía tu nombre.

  2. (RA5075.1 / CE5.1a / 1p) Se pide:

    1. Crea un topic llamado iabd-topic-<nombre> con 4 particiones y un factor de replicación 2 y que permita el envío de claves de mensaje utilizando : como separador.
    2. A continuación, lanza dos consumidores que pertenezcan al grupo de consumidores consumer-group-iabd.
    3. Lanza un productor y envía varios mensajes compuestos de clave:valor y comprueba cómo aparecen en los consumidores.
    4. Detén ambos consumidores.
    5. Envía un nuevo mensaje.
    6. Obtén información sobre el estado del grupo de consumidores consumer-group-iabd y explica sus valores.
  3. (RA5075.1 / CE5.1a / 1.5p) A partir de un topic denominado iabd-python-topic y utilizando Python crea:

    1. un productor que envíe datos de personas cada 10 segundos al topic.

      Para ello, podéis basaros en el siguiente script para la creación de las personas, el cual utiliza la librería Faker y que, cada 10 segundos, crea ficheros con una cantidad aleatoria de datos de personas:

      personas.py
      from faker import Faker
      import json, time
      from random import randint
      from datetime import date
      
      fake = Faker('es_ES')
      today = str(date.today())
      i=0
      
      while True:
          filename = 'datosClientes' + today + '_' + str(i) + '.json'
          output=open(filename,'w')
      
          datos={}
          datos['records']=[]
          cant_personas = randint(5,20)
      
          for x in range(cant_personas):
              # datos de un persona
              data={"nombre":fake.name(),
              "edad":fake.random_int(min=18, max=80, step=1),
              "calle":fake.street_address(),
              "ciudad":fake.city(),
              "provincia":fake.state(),
              "cp":fake.postcode(),
              "longitud":float(fake.longitude()),
              "latitud":float(fake.latitude())}
      
              datos['records'].append(data)
      
          # Persistimos los datos en el fichero              
          json.dump(datos, output, indent = 6)
      
          i = i + 1
      
          time.sleep(10) # 10 segundos   
      
    2. un consumidor que reciba las personas y, mediante PyMongo, las inserte en MongoDB en una colección llamada kafka_personas.

      Para este apartado se recomienda revisar la sesión de MongoDB y Python

    Adjunta el código fuente de ambos scripts, así como capturas de pantalla de la colección en MongoDB con datos rellenados.