Stream of changes

What if…

Have you ever wondered what it would be like if you could replay every event in the Universe’s history since the Big Bang? Or subscribe to the Universe’s newsletter? Well, guess what? If our world is built on top of Azure Cosmos DB and exposes a MongoDB Change Stream endpoint, then it is possible!

You may ask yourself: what would I be able to do with such a newsletter? The answer is simple but powerful: you could write a custom application that reacts to events with any action you want. For example, when a new planet is formed, you might want to automatically send a space probe there.

You might also want to look into the past and investigate everything that has happened since t=0 (the Big Bang). To archive every event related to planet Earth, you could simply filter all newsletters with a planet='Earth' filter and save them.

Back to reality

Today we can achieve the same functionality using Azure Cosmos DB as our Universe and Change Feed as the event newsletter.

Azure Cosmos DB is Microsoft’s distributed document database service. Its architecture allows us to use different data models on top of the same backend. We decided to use the MongoDB wire protocol, which wraps around the underlying Cosmos database, so we use MongoDB-compatible libraries to interact with Cosmos DB.

Our example application is written in Python. We use the PyMongo library to interact with MongoDB. An example of inserting a “Hello World” document:

import pymongo

url = "mongodb://cosmos-account:PASSWORD@cosmos-account.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&maxIdleTimeMS=120000"
client = pymongo.MongoClient(url)

db = client['FEXCODB']
collection = db['FEXCOCOLLECTION']

document = { "hello": "World" }

result = collection.insert_one(document)

Changes in Cosmos and Mongo

Change Feed is a feature of Azure Cosmos DB that allows our applications to listen to changes happening in a collection of documents. Whenever a document is added or updated, our listeners are notified and can react to the event in a suitable manner. The great thing about Change Feed is that events are ordered and persistent. Each change stream starts from event #0, so it’s easy to start processing the stream from the beginning.

Change Stream is a similar feature in MongoDB, and since we decided to use Mongo as our database, we were very excited to hear that Change Stream became available in Azure Cosmos DB. From an architectural point of view it’s built on top of Change Feed, so essentially it’s an API wrapped around a Cosmos DB feature.

Change Stream in Cosmos DB is only available for MongoDB version 3.6.

What are change events?

Events in a stream are essentially a simple list of documents, sorted in the order in which they were modified. Each change to a document appears in the change stream exactly once and is assigned a checkpoint parameter called a resume token. A change stream consumer can replay old events by passing the resume token associated with an old event.

The picture below shows an example stream of events. Everything starts with the creation of the collection. Next, new documents are created and modified.

To connect to such a stream and print each event in Python:
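To make the idea of a resume token more concrete, here is a minimal in-memory sketch. It is not how Cosmos DB or PyMongo implement change streams: the `ToyChangeStream` class and the use of a list position as the token are assumptions made purely for illustration. It only shows the contract described above, where events are ordered and a consumer can replay everything that came after a given checkpoint.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional, Tuple


@dataclass
class ToyChangeStream:
    """In-memory stand-in for a change stream (hypothetical, not a PyMongo class)."""

    _events: List[dict] = field(default_factory=list)

    def append(self, document: dict) -> int:
        """Record a change and return its resume token (here: its list position)."""
        self._events.append(document)
        return len(self._events) - 1

    def watch(self, resume_after: Optional[int] = None) -> Iterator[Tuple[int, dict]]:
        """Yield (token, event) pairs that come after the given token, in order."""
        start = 0 if resume_after is None else resume_after + 1
        for token in range(start, len(self._events)):
            yield token, self._events[token]


stream = ToyChangeStream()
stream.append({"planet": "Earth", "state": "formed"})
checkpoint = stream.append({"planet": "Mars", "state": "formed"})
stream.append({"planet": "Earth", "state": "oceans"})

# Replay only what happened after the checkpoint.
replayed = [event for _, event in stream.watch(resume_after=checkpoint)]
print(replayed)
```

In the real service the token is an opaque value produced by the server, so a consumer should store it and pass it back unchanged rather than try to construct it.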

db = client['FEXCODB']

pipeline = [{'$match': {'operationType': {'$in': ["insert", "update", "replace"]}}},
            {"$project": {"fullDocument": "$fullDocument"}}]

with db['FEXCOCOLLECTION'].watch(pipeline=pipeline, full_document="updateLookup") as stream:
    for insert_change in stream:
        print(insert_change)

Putting it all together

Of course, instead of simply printing the event, we could add some business logic there, for example calling an external API if the event meets our condition. An example flow of such a solution:

If our goal is to start consuming events from some point in time, we have to pass a resume token to the watch method. If we wish to go back in time to the very beginning of the collection, the “big bang” token should look like this:
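As a rough sketch of the business-logic step, the handler below inspects each change event and calls a callback when a condition is met. The names `handle_change` and `send_probe`, as well as the document fields `type`, `state` and `name`, are hypothetical and chosen only to match the planet example from the introduction. The callback stands in for a real external API call.

```python
from typing import Callable, List


def handle_change(change: dict, notify: Callable[[dict], None]) -> bool:
    """Call `notify` for events describing a newly formed planet.

    Returns True if the event triggered the action, False otherwise.
    """
    document = change.get("fullDocument") or {}
    if document.get("type") == "planet" and document.get("state") == "formed":
        notify(document)
        return True
    return False


launched: List[str] = []


def send_probe(document: dict) -> None:
    """Stand-in for an external API call (hypothetical)."""
    launched.append(document["name"])


# Events shaped like the output of the $project stage used in this post.
events = [
    {"fullDocument": {"type": "planet", "state": "formed", "name": "Earth"}},
    {"fullDocument": {"type": "comet", "state": "formed", "name": "Halley"}},
    {"fullDocument": {"type": "planet", "state": "cooling", "name": "Mars"}},
]

for change in events:
    handle_change(change, send_probe)

print(launched)
```

Keeping the condition and the action in a plain function like this makes it possible to test the logic without a database connection. Inside the watch loop, the same function would be called once per event.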

{'_data': b'[{"token":"\\"0\\"","range":{"min":"","max":"FF"}}]'}
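A small helper can make the choice between "resume from a checkpoint" and "start from the beginning" explicit. This is only a sketch: `open_stream` is a hypothetical helper name, and falling back to the big bang token when no checkpoint is given is a design assumption rather than something PyMongo does by itself. The `collection` argument is expected to behave like a PyMongo collection, whose `watch` method accepts `pipeline`, `full_document` and `resume_after`.

```python
from typing import Any, Optional

# Token quoted in the text above; its format is specific to Cosmos DB.
BIG_BANG = {'_data': b'[{"token":"\\"0\\"","range":{"min":"","max":"FF"}}]'}

# Same pipeline as in the earlier listing.
PIPELINE = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace"]}}},
    {"$project": {"fullDocument": "$fullDocument"}},
]


def open_stream(collection: Any, token: Optional[dict] = None):
    """Open a change stream after `token`, or from the very beginning if none is given.

    `collection` is assumed to be a pymongo Collection (or anything with a
    compatible `watch` method).
    """
    return collection.watch(
        pipeline=PIPELINE,
        full_document="updateLookup",
        resume_after=token if token is not None else BIG_BANG,
    )
```

With a real connection this could be used as `with open_stream(db['FEXCOCOLLECTION']) as stream:` followed by the same loop as before. Passing a previously saved resume token as the second argument would continue from that checkpoint instead.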

Finally, we can put all the pieces together and write code that watches the collection and stores events in an Azure Blob container.
Thanks to the “retry” decorator, if the connection with MongoDB is interrupted and later restored, the watcher resumes stream consumption from the moment it stopped, so no event is lost.
If the collection does not yet exist, the resume token is set to the Big Bang moment:

import pymongo
import logging

# BlockBlobService belongs to the legacy azure-storage-blob SDK (versions before 12).
from azure.storage.blob import BlockBlobService
from bson.json_util import dumps
from pymongo.errors import PyMongoError
from retry import retry

COLLECTION_NAME = "my-collection"
DATABASE_NAME = "my-database"
CONTAINER = COLLECTION_NAME.lower()
BIG_BANG = {'_data': b'[{"token":"\\"0\\"","range":{"min":"","max":"FF"}}]'}
TOKEN = None

url = "mongodb://cosmos-account:PASSWORD@cosmos-account.mongo.cosmos.azure.com:10255/?ssl=true&replicaSet=globaldb&maxIdleTimeMS=120000"
client = pymongo.MongoClient(url)
db = client[DATABASE_NAME]

pipeline = [{'$match': {'operationType': {'$in': ["insert", "update", "replace"]}}},
            {"$project": {"fullDocument": "$fullDocument"}}]

logger = logging.getLogger(__name__)

block_blob_service = BlockBlobService(
    account_name='azure-account',
    account_key='secretkey/very-secret-indeed')

@retry(PyMongoError, tries=30, delay=30, logger=logger)
def watcher():
    # TOKEN is module-level so the last checkpoint survives across retries.
    global TOKEN
    try:
        logger.info("Trying to connect to collection %s", COLLECTION_NAME)
        with db[COLLECTION_NAME].watch(pipeline=pipeline, full_document="updateLookup", resume_after=TOKEN) as stream:
            logger.info("Connected to collection %s", COLLECTION_NAME)
            # Make sure the target container exists before storing any event.
            block_blob_service.create_container(CONTAINER)
            for change in stream:
                content = change['fullDocument']
                blob_name = str(content['_id'])
                block_blob_service.create_blob_from_text(CONTAINER, blob_name, dumps(content))
                # Move the checkpoint only after the event has been stored.
                TOKEN = stream.resume_token
    except PyMongoError as e:
        if "Collection doesn't exist" in str(e):
            logger.error("Collection %s does not exist", COLLECTION_NAME)
            # Start from the very beginning once the collection appears.
            TOKEN = BIG_BANG
        # Re-raise so the retry decorator can schedule another attempt.
        raise

watcher()

