Saltar a contenido

MongoDB y Python

Para acceder a MongoDB desde Python nos vamos a centrar en la librería PyMongo.

Para instalar la librería mediante pip usaremos el comando (recuerda hacerlo dentro de un entorno virtual):

pip install pymongo

Se recomienda consultar la documentación o el API para cualquier duda o aclaración.

Versión

En el momento de escribir los apuntes, estamos utilizando la versión 4.3.2 de PyMongo.

Hola PyMongo

Un ejemplo básico podría ser similar a:

from pymongo import MongoClient

cliente = MongoClient('mongodb://localhost:27017')
iabd_db = cliente.iabd

# Recuperamos las colecciones
print(iabd_db.list_collection_names())

people_coll = iabd_db.people

# Recuperamos una persona
persona = people_coll.find_one()
print(persona)

MongoClient

A partir de la URI de conexión a MongoDB, hemos de instanciar la clase MongoClient, ya sea pasándole una URI con la cadena de conexión, o mediante parámetros:

uri = "mongodb+srv://usuario:contrasenya@host"
cliente = MongoClient(uri)

Parámetros adicionales

A la hora de crear el cliente, también podemos indicarle diferentes opciones de configuración:

cliente200Retry = MongoClient(uri, connectTimeoutMS=200, retryWrites=True)

Realmente, al instanciar el cliente, se inicializa un pool de conexiones, de manera que se crean 100 conexiones, y en vez de crear y destruir una conexión por cada petición, se reutilizan, de manera que cada petición asigna y libera una conexión conforme necesidad. Por defecto, el tamaño del pool es de 100 conexiones.

Pool de conexiones

Podemos obtener información de la conexión mediante la propiedad state. Por ejemplo, en nuestro caso, nos hemos conectado a MongoAtlas y de la salida del estado podemos ver los diferentes hosts que forman parte del conjunto de réplicas:

cliente = MongoClient('mongodb+srv://iabd:iabdiabd@cluster0.dfaz5er.mongodb.net/')
print(cliente.state)
# Database(MongoClient(host=['ac-opunia9-shard-00-01.dfaz5er.mongodb.net:27017', 'ac-opunia9-shard-00-02.dfaz5er.mongodb.net:27017', 'ac-opunia9-shard-00-00.dfaz5er.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-4wikkb-shard-0', tls=True), 'state')

Conexión a un réplica

Si nos queremos conectar a un conjunto de réplicas e indicar los nodos a los que nos podemos conectar, podemos separar los hosts con comas e indicar finalmente con un parámetro el nombre del conjunto de réplicas:

client = MongoClient('mongodb://usuario1:contra1@host1:puerto1,host2:puerto2/?replicaSet=nombreReplica')

A partir del cliente, podemos obtener información diversa. Por ejemplo, podemos recuperar un listado de las bases de datos mediante list_database_names():

print(cliente.list_database_names())
# ['sample_airbnb', 'sample_analytics', 'sample_geospatial', 'sample_guides', 'sample_mflix', 'sample_restaurants', 'sample_supplies', 'sample_training', 'sample_weatherdata', 'admin', 'local']

Para conectarnos a una base de datos en concreto, únicamente accederemos a ella como una propiedad del cliente:

bd = cliente.sample_mflix
bd = cliente["sample_mflix"] # tb podemos acceder como si fuera un diccionario

Bases de datos y colecciones Lazy

Es conveniente tener en cuenta que tantos las colecciones como las bases de datos se crean y carga de forma perezosa, esto es, hasta que no realizamos una operación sobre ellas, no se accede realmente a ellas. Así pues, para crear realmente una colección, hemos de insertar un documento en ella.

Una vez tenemos la base de datos, el siguiente paso es obtener una colección:

coleccion = bd.movies
coleccion = bd["movies"] 

Si queremos obtener el nombre de todas las colecciones usaremos el método list_collection_names():

print(bd.list_collection_names())
['users', 'movies', 'sessions', 'theaters', 'comments']

Tolerancia a fallos!

Al crear una conexión, conviene especificar el tamaño del pool y el timeout de las conexiones:

db = MongoClient(URI, maxPoolSize=50, wtimeout=2500)[nombreBD]

Se recomienda siempre especificar un wtimeout, ya que cuando se realiza una escritura con mayoría de escrituras, si fallase algún nodo no se quedaría colgado esperando.

Si no encontrase el servidor, lanzará un ServerSelectionTimeoutError el cual deberíamos capturar.

Primeras consultas

Finalmente, sobre una colección ya podemos realizar consultas y otras operaciones:

movies = bd.movies
movies.count_documents({})
# 23530
movies.find_one()
# {'_id': ObjectId('573a1390f29313caabcd4135'), 'plot': 'Three men hammer on an anvil and pass a bottle of beer around.', 'genres': ['Short'], 'runtime': 1, 'cast': ['Charles Kayser', 'John Ott'], 'num_mflix_comments': 1, 'title': 'Blacksmith Scene', 'fullplot': 'A stationary ...', 'countries': ['USA'], 'released': datetime.datetime(1893, 5, 9, 0, 0), 'directors': ['William K.L. Dickson'], 'rated': 'UNRATED', 'awards': {'wins': 1, 'nominations': 0, 'text': '1 win.'}, 'lastupdated': '2015-08-26 00:03:50.133000000', 'year': 1893, 'imdb': {'rating': 6.2, 'votes': 1189, 'id': 5}, 'type': 'movie', 'tomatoes': {'viewer': {'rating': 3.0, 'numReviews': 184, 'meter': 32}, 'lastUpdated': datetime.datetime(2015, 6, 28, 18, 34, 9)}}

Por ejemplo, podemos filtrar las películas cuya actriz sea Salma Hayek. Al realizar la consulta, find devuelve un cursor, el cual podemos recorrer:

cursor = movies.find( { "cast": "Salma Hayek" } )
for movie in cursor:
    print(movie)
{
    '_id': ObjectId('573a1399f29313caabceea6d'),
    'awards': {'nominations': 1, 'text': '1 nomination.', 'wins': 0},
    'cast': ['David Arquette', 'John Hawkes', 'Salma Hayek', 'Jason Wiles'],
    'countries': ['USA'],
    ...
    'writers': ['Robert Rodriguez', 'Tommy Nix'],
    'year': 1994
}
{'_id': ObjectId('573a139af29313caabcef0b6'),
    'awards': {'nominations': 14, 'text': '27 wins & 14 nominations.', 'wins': 27},
    'cast': ['Ernesto Gómez Cruz', 'María Rojo', 'Salma Hayek', 'Bruno Bichir'],
    'countries': ['Mexico'],
    ...
}

Si queremos contar los documentos que cumplen un criterio, en vez de find, usaremos count_documents pasándole el criterio de filtrado:

cantidad = movies.count_documents( { "cast": "Salma Hayek" } )

Para seleccionar los campos que queremos recuperar, necesitamos pasar un segundo parámetro con la proyección:

cursor = movies.find( { "cast": "Salma Hayek" }, { "title": 1, "cast": 1} )
for movie in cursor:
    print(movie)
{'_id': ObjectId('573a1399f29313caabceea6d'), 'cast': ['David Arquette', 'John Hawkes', 'Salma Hayek', 'Jason Wiles'], 'title': 'Roadracers'}
{'_id': ObjectId('573a139af29313caabcef0b6'), 'cast': ['Ernesto Gómez Cruz', 'María Rojo', 'Salma Hayek', 'Bruno Bichir'], 'title': 'Midaq Alley'}
...

Como ya vimos al hacer consultas en sesiones anterior, si no queremos el campo _id, tenemos que indicarlo:

cursor = movies.find( { "cast": "Salma Hayek" }, { "title": 1, "cast": 1, "_id": 0} )
for zip in cursor:
    print(zip)
{'cast': ['David Arquette', 'John Hawkes', 'Salma Hayek', 'Jason Wiles'], 'title': 'Roadracers'}
{'cast': ['Ernesto Gèmez Cruz', 'Marèa Rojo', 'Salma Hayek', 'Bruno Bichir'], 'title': 'Midaq Alley'}
{'title': 'Desperado', 'cast': ['Antonio Banderas', 'Salma Hayek', 'Joaquim de Almeida', 'Cheech Marin']}
...

BSON

Cuando necesitemos pasar los documentos BSON a JSON, utilizaremos la función dumps que transforma el documento a JSON:

cursor = movies.find( { "cast": "Salma Hayek" } )

from bson.json_util import dumps

print(dumps(cursor, indent=2))
"""
[
    {
        "_id": {
            "$oid": "573a1399f29313caabceea6d"
        },
        "plot": "Cynical look at a 50's rebellious Rocker who has to confront his future, thugs with knives, and the crooked town sheriff.",
        "genres": [
            "Action",
            "Drama"
        ],
        "runtime": 95,
        "rated": "R",
        "cast": [
            "David Arquette",
            "John Hawkes",
            "Salma Hayek",
            "Jason Wiles"
            ],
        ...
"""

Agregaciones

Para realizar consultas mediante el framework de agregación, usaremos el método aggregate, el cual recibe un array con el pipeline:

Por ejemplo, vamos a recuperar el título y el casting de las películas dirigidas por Sam Raimi:

match_stage = { "$match": { "directors": "Sam Raimi" } }
project_stage = { "$project": { "_id": 0, "title": 1, "cast": 1 } }

pipeline = [
    match_stage,
    project_stage
]

sam_raimi_aggregation = movies.aggregate(pipeline)
for movie in sam_raimi_aggregation:
    print(movie)
{'cast': ['Bruce Campbell',
        'Ellen Sandweiss',
        'Richard DeManincor',
        'Betsy Baker'],
'title': 'The Evil Dead'}
{'cast': ['Bruce Campbell',
        'Sarah Berry',
        'Dan Hicks',
        'Kassie Wesley DePaiva'],
'title': 'Evil Dead II'}
{'cast': ['Liam Neeson', 'Frances McDormand', 'Colin Friels', 'Larry Drake'],
'title': 'Darkman'}

Otro ejemplo, donde recuperamos los directores y la valoración media de sus películas, ordenadas de mejor a peor:

unwind_stage = { "$unwind": "$directors" }
group_stage = {
    "$group": {
        "_id": {
            "director": "$directors"
        },
        "average_rating": { "$avg": "$imdb.rating" }
    }
}
sort_stage = {
    "$sort": { "average_rating": -1 }
}

# Creamos un pipeline con las tres fases
pipeline = [
    unwind_stage,
    group_stage,
    sort_stage
]

director_ratings = movies.aggregate(pipeline)

for director in director_ratings:
    print(director)
{'_id': {'director': 'Sara Hirsh Bordo'}, 'average_rating': 9.4}
{'_id': {'director': 'Kevin Derek'}, 'average_rating': 9.3}
{'_id': {'director': 'Michael Benson'}, 'average_rating': 9.0}
{'_id': {'director': 'Slobodan Sijan'}, 'average_rating': 8.95}
{'_id': {'director': "Bozidar 'Bota' Nikolic"}, 'average_rating': 8.9}
...

O realizamos un join entre las películas y sus comentarios para recuperar una película por su identificador:

from bson.objectid import ObjectId

peliculaId = "573a13aef29313caabd2c349"

pipeline = [
    {
        "$match": {
            "_id": ObjectId(peliculaId)
        }
    }, {
        "$lookup": {
            "from": "comments",
            "localField": "_id",
            "foreignField": "movie_id",
            "as": 'comentarios'
        }
    }
]

peliculaConComentarios = movies.aggregate(pipeline).next()
print(peliculaConComentarios)
{'_id': ObjectId('573a13aef29313caabd2c349'),
'awards': {'nominations': 48,
            'text': 'Nominated for 1 Oscar. Another 21 wins & 48 nominations.',
            'wins': 22},
'cast': ['Christian Bale', 'Michael Caine', 'Liam Neeson', 'Katie Holmes'],
'comentarios': [{'_id': ObjectId('5a9427658b0beebeb696f672'),
                'date': datetime.datetime(1972, 3, 2, 15, 58, 41),
                'email': 'donald_sumpter@gameofthron.es',
                'movie_id': ObjectId('573a13aef29313caabd2c349'),
                'name': 'Maester Luwin',
                'text': 'Atque ...'}],
'countries': ['USA', 'UK'],
'directors': ['Christopher Nolan'],
'fullplot': 'When ...",
'genres': ['Action', 'Adventure'],
'imdb': {'id': 372784, 'rating': 8.3, 'votes': 860733},
'languages': ['English', 'Urdu', 'Mandarin'],
'lastupdated': datetime.datetime(2015, 8, 31, 0, 1, 54, 590000),
'metacritic': 70,
'num_mflix_comments': 1,
'plot': 'After training ....',
'poster': 'https://m.media-amazon.com/images/M/MV5BZmUwNGU2ZmItMmRiNC00MjhlLTg5YWUtODMyNzkxODYzMmZlXkEyXkFqcGdeQXVyNTIzOTk5ODM@._V1_SY1000_SX677_AL_.jpg',
'rated': 'PG-13',
'released': datetime.datetime(2005, 6, 15, 0, 0),
'runtime': 140,
'title': 'Batman Begins',
'type': 'movie',
'writers': ['Bob Kane (characters)',
            'David S. Goyer (story)',
            'Christopher Nolan (screenplay)',
            'David S. Goyer (screenplay)'],
'year': 2005}

Trabajando con cursores

A continuación vamos a realizar algunas operaciones sobre los cursores con PyMongo y a comparar a cómo podemos realizar la misma operación mediante el motor de agregaciones.

Limitando

Sobre el cursor podemos restringir la cantidad de resultados devueltos mediante el método .limit() equivalente a la agregación $limit:

limited_cursor = movies.find(
    { "directors": "Sam Raimi" },
    { "_id": 0, "title": 1, "cast": 1 }
).limit(2)

for movie in limited_cursor:
    print(movie)
pipeline = [
    { "$match": { "directors": "Sam Raimi" } },
    { "$project": { "_id": 0, "title": 1, "cast": 1 } },
    { "$limit": 2 }
]

limited_aggregation = movies.aggregate( pipeline )

for movie in limited_aggregation:
    print(movie)
[
    {
        "cast": [
            "Bruce Campbell",
            "Ellen Sandweiss",
            "Richard DeManincor",
            "Betsy Baker"
        ],
        "title": "The Evil Dead"
    },
    {
        "title": "Evil Dead II",
        "cast": [
            "Bruce Campbell",
            "Sarah Berry",
            "Dan Hicks",
            "Kassie Wesley DePaiva"
        ]
    }
]

Ordenando

Para ordenar usaremos el método .sort() que además de los campos de ordenación, indicaremos si el criterio será ascendente (ASCENDING) o descendente (DESCENDING), de forma similar a como lo hacemos con la operación de agregación $sort:

from pymongo import DESCENDING, ASCENDING

sorted_cursor = movies.find(
    { "directors": "Sam Raimi" },
    { "_id": 0, "year": 1, "title": 1, "cast": 1 }
).sort("year", ASCENDING)

for movie in sorted_cursor:
    print(movie)
pipeline = [
    { "$match": { "directors": "Sam Raimi" } },
    { "$project": { "_id": 0, "year": 1, "title": 1, "cast": 1 } },
    { "$sort": { "year": ASCENDING } }
]

sorted_aggregation = movies.aggregate( pipeline )

for movie in sorted_aggregation:
    print(movie)
[
{
    "cast": [
        "Bruce Campbell",
        "Ellen Sandweiss",
        "Richard DeManincor",
        "Betsy Baker"
    ],
    "title": "The Evil Dead",
    "year": 1981
},
{
    "year": 1987,
    "title": "Evil Dead II",
    "cast": [
        "Bruce Campbell",
        "Sarah Berry",
        "Dan Hicks",
        "Kassie Wesley DePaiva"
    ]
},
{
    "year": 1990,
    ...
}
]

En el caso de tener una clave compuesta de ordenación, le pasaremos como parámetro una lista de tuplas clave/criterio:

sorted_cursor = movies.find(
    { "cast": "Tom Hanks" },
    { "_id": 0, "year": 1, "title": 1, "cast": 1 }
).sort([("year", ASCENDING), ("title", ASCENDING)])

for movie in sorted_cursor:
    print(movie)
pipeline = [
    { "$match": { "cast": "Tom Hanks" } },
    { "$project": { "_id": 0, "year": 1, "title": 1, "cast": 1 } },
    { "$sort": { "year": ASCENDING, "title": ASCENDING } }
]

sorted_aggregation = movies.aggregate( pipeline )

for movie in sorted_aggregation:
    print(movie)
[
{
    "cast": [
        "Tom Hanks",
        "Daryl Hannah",
        "Eugene Levy",
        "John Candy"
    ],
    "title": "Splash",
    "year": 1984
},
{
    "cast": [
        "Tom Hanks",
        "Jackie Gleason",
        "Eva Marie Saint",
        "Hector Elizondo"
    ],
    "title": "Nothing in Common",
    "year": 1986
},
{
    "cast": [
        "Tom Hanks",
        "Elizabeth Perkins",
        "Robert Loggia",
        "John Heard"
    ],
    "title": "Big",
    "year": 1988
},
{
    "cast": [
        "Sally Field",
        "Tom Hanks",
        "John Goodman",
        "Mark Rydell"
    ],
    "title": "Punchline",
    "year": 1988
},
...
]

Saltando

Cuando paginamos los resultados, para saltar los documentos, haremos uso del método .skip(), el cual es similar a la operación $skip.

Por ejemplo, la siguiente consulta devuelve 13 documentos, de manera que al saltarnos 12, sólo nos devolverá uno:

skipped_sorted_cursor = movies.find(
    { "directors": "Sam Raimi" },
    { "_id": 0, "title": 1, "year": 1, "cast": 1 } 
).sort("year", ASCENDING).skip(12)

for movie in skipped_sorted_cursor:
    print(movie)
pipeline = [
    { "$match": { "directors": "Sam Raimi" } },
    { "$project": { "_id": 0, "year": 1, "title": 1, "cast": 1 } },
    { "$sort": { "year": ASCENDING } },
    { "$skip": 12 }
]

sorted_skipped_aggregation = movies.aggregate( pipeline )

for movie in sorted_skipped_aggregation:
    print(movie)
[
    {
        "cast": [
            "James Franco",
            "Mila Kunis",
            "Rachel Weisz",
            "Michelle Williams"
        ],
        "title": "Oz the Great and Powerful",
        "year": 2013
    }
]

CRUD

Para estas operaciones, vamos a trabajar con la colección iabd.people que utilizamos en la primera sesión de MongoDB.

Inserción

from pymongo import MongoClient

cliente = MongoClient("mongodb+srv://iabd:iabdiabd@cluster0.4hm7u8y.mongodb.net/?retryWrites=true&w=majority", server_api=ServerApi('1'))

bd = cliente.iabd
people = bd.people

yo = { "nombre": "Aitor Medrano", "edad": 45, "profesion": "Profesor" }
resultado = people.insert_one(yo)

print(resultado.acknowledged)   # True
print(resultado.inserted_id)    # 6366b5c85afaf1b75dc90a20

Tras realizar una operación de inserción con insert_one, obtenemos un objeto InsertOneResult del cual cabe destacar dos campos:

  • acknowledged: un campo booleano que nos indica si la operación ha sido exitosa, o False cuando indicamos un WriteConcern(w=0), es decir, es escritura fire-and-forget.
  • inserted_id: valor del ObjectId.

Si queremos insertar más de una documento a la vez, usaremos insert_many, el cual devuelve un objeto InsertManyResult.

Join

Ya hemos estudiado que al insertar un documento que está relacionado con otro, necesitamos que los campos contengan el mismo valor (normalmente, el ObjectId)

Por ejemplo, si queremos añadir un comentario a una película, hemos de unir el identificador de la película en cada comentario haciendo uso de objeto ObjectId:

from bson.objectid import ObjectId

comments = db.comments
comment_doc = {"name": usuario.nombre, "email": usuario.email,
                "movie_id": ObjectId(movie_id), "text": comentario, "date": fecha}
comments.insert_one(comment_doc))

Gestión de errores

A la hora de insertar un documento se puede dar el caso de lanzar un DuplicateKeyError, ya sea por el atributo identificador o por un índice de tipo único. En dicho caso, podemos capturar la excepción y ajustar el comportamiento o informar al usuario, dependiendo de la gravedad del error.

Por ejemplo, si insertamos una persona con un _id que ya existe:

doc = {
    "_id": 1,
    "nombre": "Aitor Medrano"
}

try:
    result = people.insert_one(doc)
    print(res.inserted_id)
except errors.DuplicateKeyError:
    usuario_id = doc["_id"]
    print(f"El usuario #{usuario_id} ya existe en el sistema.")

Borrado

El borrado de documento es similar a la creación, pudiendo utilizar delete_one para borrar el primer documento encontrado, o delete_many para borrar todos los documentos encontrados:

resultado = people.delete_many({ "edad" : 45})
print(resultado.deleted_count)

En ambas operaciones, obtenemos un objeto DeleteResult del cual destacamos la propiedad deleted_count para averiguar cuantos documentos han sido eliminados.

Borrado de colecciones

Para vaciar una colección, podemos borrar todos los documentos:

people.delete_many({})

Aunque es mejor borrar la colección entera:

people.drop()

Modificación

De forma similar a como lo hemos realizado mediante mongosh, para modificar documentos emplearemos los métodos update_one y update_many, los cuales devuelve un objeto UpdateResult:

resultado = people.update_one({"nombre":"Aitor Medrano"}, {"$set":{"nombre":"Marina Medrano", "salario": 123456}})
print(resultado.matched_count)  # (1)!
print(resultado.modified_count) # (2)!
  1. matched_count nos devuelve la cantidad de documentos encontrados
  2. modified_count nos devuelve la cantidad de documentos modificados

Tal como vimos, en las operaciones update, le podemos pasar un tercer parámetro (opcional) con upsert=False (valor por defecto) op True, cuando queramos que se inserte un nuevo documento si no encuentra ninguno. En este caso, modified_count y matched_count serán 0.

Otro argumento opcional que conviene conocer es bypass_document_validation=False (valor por defecto), el cual, si lo ponemos a True ignorará las validaciones (si hay alguna) para los documentos que modifiquemos.

Operador

Recuerda utilizar un operador de modificación en el segundo parámetro. Si no, el documento del segundo parámetro sustituirá por completo al documento encontrado.

Operaciones consistentes

Preferencias de lectura

Para realizar lecturas consistentes podemos configurar la preferencias de lectura, tal como vimos en la sesión sobre replicación.

Para ello, PyMongo ofrece un conjunto de clases para indicar las preferencias de lectura, las cuales podemos configurar a nivel de cliente:

Niveles de lectura

Los diferentes niveles vienen definidos en el objeto ReadPreference el cual ofrece los valores estudiados en la sesión de replicación: PRIMARY, PRIMARY_PREFERRED, SECONDARY, SECONDARY_PREFERRED y NEAREST.

client = MongoClient(
    'localhost:27017',
    replicaSet='iabd',
    readPreference='secondaryPreferred')

Si en vez de indicar la preferencia a nivel de cliente, lo queremos realizar a nivel de base de datos o de colección, hemos de emplear los métodos get_database y get_collection pasándole un segundo parámetro con la opción deseada:

from pymongo import ReadPreference

print(client.read_preference)   # SecondaryPreferred(tag_sets=None)

db = client.get_database('iabd', read_preference=ReadPreference.SECONDARY)
print(db.read_preference)       # Secondary(tag_sets=None)

coll = db.get_collection('people', read_preference=ReadPreference.PRIMARY)
print(coll.read_preference)     # Primary()

Finalmente, si en cualquier momento queremos cambiar las preferencias a nivel de colección, podemos emplear el método with_options:

coll2 = coll.with_options(read_preference=ReadPreference.NEAREST)
print(coll.read_preference)     # Primary()
print(coll.read_preference)     # Nearest(tag_sets=None)

Escrituras consistentes

Tal como vimos en la sesión de replicación, para indicar la consistencia en la escritura, haremos uso de la propiedad write_concern. Para ello, de la misma forma que hemos indicado la preferencia de lectura, hacemos uso del método with_options:

coll.with_options(write_concern=WriteConcern(w="majority")).insert_one({
    "nombre": nombre,
    "email": email,
    "password": hashed_pw
})

Si tenemos una aplicación crítica, no nos podemos permitir perder los datos. Por ello, necesitamos que las escritura se propaguen a la mayoría de los nodos con la opción w: majority para asegurarnos que las escrituras se propagan a la mayoría de nodos del conjunto de nodos.

Si sucede un problema en los nodos secundarios, puede ser que el primario no reciba los ACK necesarios. Si llegan más escrituras que lecturas, puede llegar el momento en que se produzca un atasco.

Para evitar esto, para cada escritura que realicemos con la mayoría de los nodos, siempre hay que indicar un timeout. La longitud del timeout vendrá determinada por la red y el hardware que dispongamos, pero siempre hemos de indicarlo.

coll.with_options(write_concern=WriteConcern(w="majority", wtimeout="2500").insert_one({
    "nombre": nombre,
    "email": email,
    "password": hashed_pw
})

Bulk writes

En ocasiones necesitamos ejecutar una bateria de operaciones a granel (bulk) las cuales se ejecutan como un proceso batch. Para ello se emplea el método bulk_write:

result = db.test.bulk_write(array_operaciones)

Transacciones

Ya hemos comentado en numerosas ocasiones toda operación en un único documento es atómica, de ahí embeber documentos y arrays para modelar las relaciones de datos en un sólo documento cubre la mayoría de los casos de usos transaccionales.

Cuando necesitamos atomicidad de lecturas y escrituras entre varios documentos (en una o más colecciones), MongoDB soporta transacciones multidocumento. Mediante las transacciones distribuidas, las transacciones puede operar entre varias operaciones, colecciones, bases de datos, documentos y particiones (shards).

Tipos de API

MongoDB ofrece dos API para utilizar transacciones. La primera, conocida como Core API, disponible desde la versión 4.0 de MongoDB, tiene una sintaxis similar a las bases de datos relacionales (por ejemplo, con operaciones como start_transaction y commit_transaction), y la segunda se conoce como Callback API, desde la versión 4.2, el cual es el enfoque actualmente recomendado.

El Core API no ofrece lógica de reintentos para la mayoría de errores y necesita que el desarrollador programe la lógica de las operaciones, la función transaccional que realiza commit y la lógica de errores y reintentos necesaria.

En cambio, el Callback API ofrece una única función que engloba un alto grado de funcionalidad comparadas con el Core API, incluyendo el inicio de una transacción asociada a una sesión lógica, ejecutar la función definida como callback y realizando el commit de la transacción (o abortándola en caso de error). Esta función también incluye la lógica de reintentos para manejar los errores al hacer commit.

En ambas APIs, el desarrollador se responsabiliza de iniciar la sesión lógica que se utilizará en la transacción. Ambas APIs requieren que las operaciones transaccionales se asocien a ésta sesión lógica (pasándola como parámetro a cada operación). Cada sesión lógica en MongoDB registra el tiempo y la secuencia de las operaciones en el contexto completo del despliegue de MongoDB.

Hola Mundo Callback API

Vamos a simular que tenemos una aplicación de gestión de un almacén, en la cual tenemos que realizar las siguientes operaciones dentro de una transacción:

pedidos.insert_one({"producto": "ps5", "cantidad": 100}, session=miSesion)
inventario.update_one({"producto": "ps5", "cantidad": {"$gte": 100}},
                      {"$inc": {"cantidad": -100}}, session=miSesion)

Veamos mediante un ejemplo el flujo del código transaccional haciendo uso del Callback API.

almacenTransaccional.py
from pymongo import MongoClient, ReadPreference
from pymongo.write_concern import WriteConcern
from pymongo.read_concern import ReadConcern

uriString = 'mongodb+srv://iabd:iabdiabd@cluster0.4hm7u8y.mongodb.net/?retryWrites=true&w=majority'
cliente = MongoClient(uriString)
miWCMajority = WriteConcern('majority', wtimeout=1000)

# Paso 0: Creamos dos colecciones e insertamos un documento en cada una
bd = client.get_database( "iabd", write_concern=miWCMajority)
bd.pedidos.insert_one({"producto": "ps5", "cantidad":0})
bd.inventario.insert_one({"producto": "ps5", "cantidad": 1000})

# Paso 1: Definir el callback que indica la secuencia de las operaciones a realizar dentro de la transacción
def callback(miSesion):
    pedidos = miSesion.cliente.iabd.pedidos
    inventario = miSesion.cliente.iabd.inventario

    # Importante: Debemos pasarle la sesión a cada operación
    pedidos.insert_one({"producto": "ps5", "cantidad": 100}, session=miSesion)
    inventario.update_one({"producto": "ps5", "cantidad": {"$gte": 100}},
                          {"$inc": {"cantidad": -100}}, session=miSesion)

# Paso 2: Iniciar una sesión de cliente.
with cliente.start_session() as session:
    # Paso 3: Empleamos with_transaction para iniciar la transacción, ejecutar el callback y realizar el commit (o abortar en caso de error).
    session.with_transaction(
        callback,
        read_concern=ReadConcern("local"),
        write_concern=miWCMajority,
        read_preference=ReadPreference.PRIMARY,
    )

PyMODM

PyMODM es una librería ODM que abstrae el acceso a MongoDB mediante un mapeo objeto-documento. De forma similar a los frameworks ORM en los entornos relacionales, como son Hibernate (Java), Doctrine/Eloquent (PHP) o el ORM de Django, abstrae y simplifica el acceso a la base de datos.

Un ejemplo de definición de un objeto y su persistencia sería:

class User(MongoModel):
    # Utiliza el 'email'como campo '_id'
    email = fields.EmailField(primary_key=True)
    nombre = fields.CharField()
    apellido = fields.CharField()

usuario = User('a.medrano@edu.gva.es', 'Aitor', 'Medrano').save()

Hemos decidido no profundizar en su conocimiento ya que el equipo de MongoDB ha pausado su mantenimiento desde hace ya dos años.

Caso de uso

PIA Login

En cursos anteriores hicimos un caso de uso donde definiamos la autenticación para el proyecto Lara. Este curso, dicho caso de uso forma parte de la sesión de trabajo del proyecto que realizaremos en una sesión aparte.

Vamos a crear una pequeña aplicación web que permita mostrar datos obtenidos desde MongoDB. Para ello, vamos a utilizar Flask como framework ligero escrito en Python.

El primer paso será, tras crear el entorno virtual, instalar las librerías de PyMongo y Flask:

pip install pymongo flask

Tras ello, crearemos nuestro primer archivo app.py, el cual colocaremos en la raíz del proyecto. Si nos fijamos, tras importar los archivos necesarios, definimos una ruta y anotamos el método que se va a ejecutar:

app.py
from flask import Flask

app = Flask(__name__)

@app.route("/")
def hola():
    return "<p>¡Hola IABD!</p>"

Para lanzar la aplicación web, en el terminal ejecutaremos:

flask run --debug

Y si entramos a http://127.0.0.1:5000 veremos el resultado de su ejecución.

MVC

Antes de integrar PyMongo con Flask, vamos a ver como podemos cargar una vista y de esta, cederle el control a Flask.

Primero, vamos a crear una página de inicio donde invoquemos a una ruta de Flask, y esta configura una variable y devuelve el dato a una nueva vista. Pero vamos paso a paso.

Primero crearemos nuestra página de inicio similar a la siguiente:

index.html
<h1>Hola IABD</h1>

<ul>
    <li><a href="/hola">Saludar</a></li>
    <li><a href="/adios">Despedir</a></li>
</ul>

Esta página la vamos a colocar dentro de una carpeta templates, de manera que nuestra estructura de proyecto será:

├───app.py
├───templates
    └───index.html

Y modificar nuestro archivo de aplicación para configurar la ruta para la raíz del proyecto, y la respuesta a cada uno de los enlaces:

app.py
from flask import Flask
from flask import render_template

app = Flask(__name__)

@app.route("/")
def index():
    return render_template("index.html")

@app.route("/hola")
def hola():
    return "<p>¡Hola IABD desde Flask!</p>"

@app.route("/adios")
def adios():
    return "<p>¡Adios IABD desde Flask!</p>"

En el método index() estamos haciendo una redirección a la plantilla index.html que habíamos definido previamente mediante el método render_template. Además, hemos creado dos nuevas rutas para cada uno de los enlaces.

A continuación, vamos a cambiar la salida de los métodos hola() y adios(), para que redirijan la petición a una nueva plantilla, que será la encargada de mostrar el mensaje. El código de la plantilla destino, la cual se basa en Jinja, será similar a:

resultado.html
<h1>Resultado</h1>

Nuestra aplicación con Flask nos dice:

<h2>{{ mensaje }}</h2>

Si nos fijamos en la línea 5, mediante {{ y }} le indica que muestre el valor de una variable que recibirá que se llamará mensaje.

Tras esto, modificamos de nuevo el enrutador, de manera que ahora cada método redirecciona a la vista resultado.html, adjuntado una frase como variable en la respuesta (la cual hemos llamado mensaje para que cuadre con la plantilla):

app.py
from flask import Flask
from flask import render_template

app = Flask(__name__)

@app.route("/")
def index():
    return render_template("index.html")

@app.route("/hola")
def hola():
    frase = "¡Hola IABD desde Flask!"
    return render_template("resultado.html", mensaje=frase)

@app.route("/adios")
def adios():
    frase = "¡Adios IABD desde Flask!"
    return render_template("resultado.html", mensaje=frase)

Flask + PyMongo

Todo el acceso a los datos los vamos a encarpsular en el archivo db.py, de manera que definimos un objeto global con los datos de la conexión:

db.py
from pymongo import MongoClient
from flask import g

def get_db():

    db = getattr(g, "_database", None)

    MONGODB_URI = "mongodb://xxx:yyyy@1.2.3.4/"
    MONGODB_DB_NAME = "prelara"

    if db is None:
        db = g._database = MongoClient(
            MONGODB_URI,
            maxPoolSize=50,
            timeoutMS=2500
       )[MONGODB_DB_NAME]

    return db

La función get_db utiliza el objeto g, el cual en Flask, es un objeto especial que es único para cada petición. Se utiliza para almacenar datos que serán accesibles desde múltiples funciones durante el request. Así pues, almacenamos la conexión, mejor dicho, el pool de conexiones a MongoDB en vez de crear un nuevo pool cada vez que queramos obtener acceso a la base de datos.

A continuación, en app.py añadimos una nueva ruta donde hacemos nuestra consulta con PyMongo:

import db
from pymongo import ASCENDING

@app.route("/listar")
def listado():
    midb = db.get_db()
    coleccion = midb.usuarios
    usuarios = list(coleccion.find({}).sort("nombre", ASCENDING))

    return render_template("listado.html", lista=usuarios)

Y finalmente, creamos una nueva plantilla para mostrar los datos:

listado.html
<h1>Resultado</h1>

Listado de usuarios:

<ul>
{% for usuario in lista %}
    <li>
        Nombre: {{ usuario["nombre"] }} // Sexo {{ usuario.sexo }}
    </li>
{% endfor %}
</ul>

Y sólo nos queda editar la plantilla de inicio para poner un enlace que nos permite acceder al listado:

index.html
<h1>Hola IABD</h1>

<ul>
    <li><a href="/hola">Saludar</a></li>
    <li><a href="/adios">Despedir</a></li>
    <li><a href="/listar">Listas usuarios</a></li>
</ul>

En resumen, la estructura del proyecto con todos los archivos y plantillas será similar a:

├───app.py
├───db.py
├───templates
    └───index.html
    └───listado.html
    └───resultado.html

Referencias

Actividades

  1. (RASBD.3 / CESBD.3d / 2p) Haciendo uso de PyMongo, y utilizando la base de datos compartida en Aules se pide:

    1. Recupera la cantidad de audios que hay grabados de cada categoría (tag).
    2. Recupera la cantidad de audios que hay grabados de cada categoría (tag) para un usuario en concreto.
    3. Muestra los usuarios técnicos (_id, mail y nombre) que no tienen ningún cliente asignado.

      Para ello, crea una consulta que recupere todos los usuarios técnicos (por ejemplo, sólo su email) y aparte, crea otra consulta que recupere los técnicos que aparecen como parent en los clientes.

      A continuación, mediante Python y una resta de conjuntos, puedes obtener el resultado:

      list( set(todosTecnicos) - set(tecnicosConClientes) )
      
  2. (RASBD.3 / CESBD.3d / 2p) Mediante una aplicación hecha en Flask, se pide:

    1. Crea una página que tras seleccionar un cliente, muestre quién es su técnico actual y permita modificarlo por otro.
    2. Crea una página que permita modificar la contraseña de un usuario. Tras importar from werkzeug.security import generate_password_hash, debes utilizar generate_password_hash(contrasenya, method='sha256') para encriptar la contraseña antes de introducirla en MongoDB.
    3. Aprovechando el listado de la actividad 1.c, para cada técnico sin clientes, define un enlace que permita eliminarlo.