Using the Mongo Wire Protocol for API services

So the startup I work for uses Mongo. I have my own opinions of Mongo, some good, some bad. I have a decent amount of experience with the wire protocol: I've written an asynchronous C driver[1] for GObject, rewritten a python-twisted driver[2], and built my own BSON implementation. We also wrote ourselves a "schema" verifier for Mongo that works offline against the Mongo disk format using my C driver. Needless to say, we have a lot invested in Mongo, and that investment influences my design decisions.

One thing that I do like about Mongo is the simplicity of its scaling design. The implementation is somewhat annoying (all the different types of daemons and configurations), but the design itself seems "good enough". You communicate with each Mongo instance as if it were a real data store, and it handles communicating with the various shards or performing physical data access itself. It is the "turtles all the way down" approach. Lovely. When a node fails, you have information about the replica set and can connect to another node to continue operation.

The simplicity of this design means slightly more complexity in the client libraries, with the benefit of a simpler server implementation. Debugging can also be much simpler, because you can reuse your debugging tools at each link in the system.

So, in short, I like this design and would like to use it for the internal services that build up our platform. Wouldn't it be nice if our services were exported in the same fashion? It would allow for more seamless upgrades of internal services with little downtime. It would also allow code to be separated more readily and tested individually. When you have issues with memory fragmentation or leaks in Python, they are contained to the particular service and much easier to debug (as compared to a monolithic app server). That last benefit has nothing to do with exporting commands via the Mongo protocol; it is just a side-effect of moving to this sort of design.

So, in my fork of txmongo (a Mongo driver for Twisted), I have added support for implementing Mongo servers in addition to clients. You can export RPCs as Mongo commands. This means you can use the Mongo command-line client to test things. You can decide for yourself how failover should work. You can use official Mongo drivers to write your RPC clients, building on their work for handling failover, reconnection, and whatnot. Standing on the shoulders of giants.

So let's take a look at how this might work in Twisted. We will start by writing a service that we will eventually export as a Mongo server.

from twisted.application import service

class EchoService(service.Service):
    def do_ping(self, **kwargs):
        return {'pong': kwargs}
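
Since the service methods are ordinary Python callables, they can be exercised directly, with no wire protocol involved. A minimal sketch, using a plain-object stand-in for the Twisted service so it runs on its own:

```python
class EchoService(object):
    """Stand-in for the Twisted service above; do_ping is identical."""

    def do_ping(self, **kwargs):
        # Echo the command document back under a 'pong' key.
        return {'pong': kwargs}

svc = EchoService()
print(svc.do_ping(ping=1, hello='world'))
```

This is the same unit-testing win mentioned earlier: the business logic never needs a running server to be verified.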

Now let's create a TCP listener that will accept connections and proxy commands to the service above.

import bson
from twisted.internet import defer
from twisted.python import failure
from txmongo.protocol import MongoProtocol, Reply, REPLY_QUERY_FAILURE
from pymongo.errors import OperationFailure
import types

class ServiceProtocol(MongoProtocol):
    def __init__(self, service):
        self.service = service

    def handle_QUERY(self, request):
        # Only command queries (collection ending in .$cmd) are handled.
        if not request.collection.endswith('.$cmd'):
            return
        query = request.query.decode(as_class=dict)
        if '$query' in query:
            query = query['$query']
        # The first key of the document is the name of the command.
        name = 'do_' + query.iterkeys().next().lower()
        func = getattr(self.service, name, None)
        if not func:
            # pymongo sends "ismaster" during its handshake; answer it
            # so official drivers can connect.
            if name == 'do_ismaster':
                func = lambda *a, **kw: {'ismaster': True, 'ok': 1}
        if callable(func):
            df = defer.maybeDeferred(func, **query)
            df.addCallback(self._queryCb, request.request_id)
            df.addErrback(self._queryEb, request.request_id)
            return
        ex = OperationFailure('No such command: ' + name)
        self._queryEb(failure.Failure(ex), request.request_id)

    def _queryCb(self, result, request_id):
        if not isinstance(result, types.ListType):
            result = [result]
        result = [b if isinstance(b, bson.BSON) else bson.BSON.encode(b)
                  for b in result]
        reply = Reply(response_to=request_id, documents=result)
        return self.send(reply)

    def _queryEb(self, reason, request_id):
        doc = {'$err': str(reason.value), 'ok': 0}
        if hasattr(reason.value, 'code'):
            doc['code'] = reason.value.code
        ret = [bson.BSON.encode(doc)]
        reply = Reply(response_to=request_id, documents=ret,
                      response_flags=REPLY_QUERY_FAILURE)
        return self.send(reply)
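
For context on what handle_QUERY is parsing, here is roughly what an OP_QUERY message looks like on the wire. A sketch that frames the same ping command by hand using only the standard library; opcode 2004 is OP_QUERY, and the field layout follows the wire protocol spec (the encoder here handles only a single int32 element, just enough for this example):

```python
import struct

def encode_bson_int32_doc(name, value):
    # Minimal BSON encoder for {name: value} where value is an int32.
    # element = type byte 0x10, cstring key, little-endian int32 value
    element = b'\x10' + name.encode('ascii') + b'\x00' + struct.pack('<i', value)
    # document = int32 total length + elements + trailing NUL
    length = 4 + len(element) + 1
    return struct.pack('<i', length) + element + b'\x00'

def encode_op_query(request_id, collection, doc):
    OP_QUERY = 2004
    body = struct.pack('<i', 0)                    # flags
    body += collection.encode('ascii') + b'\x00'   # fullCollectionName
    body += struct.pack('<ii', 0, 1)               # numberToSkip, numberToReturn
    body += doc                                    # query document
    # standard header: messageLength, requestID, responseTo, opCode
    header = struct.pack('<iiii', 16 + len(body), request_id, 0, OP_QUERY)
    return header + body

doc = encode_bson_int32_doc('ping', 1)   # 15-byte document {'ping': 1}
msg = encode_op_query(1, 'test.$cmd', doc)
print(len(msg))
```

Sending those bytes over a socket to the listener below would exercise the same code path as the drivers; in practice you would of course let a driver do the framing.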

Now for the simple part: let's wire up a TCP listener using Twisted to accept Mongo connections and bridge them to the service.

from twisted.internet import protocol, reactor

service = EchoService()
factory = protocol.Factory()
factory.protocol = lambda: ServiceProtocol(service)
reactor.listenTCP(27017, factory)
reactor.run()

So now we can use the Mongo command-line interface to communicate with this service. Let's do that and execute our new ping command.

$ mongo
> db.runCommand({ping: 1, hello: 'world'})
{ "pong" : { "ping" : 1, "hello" : "world" } }

We can do the same thing using the pymongo driver from Python.

>>> import pymongo
>>> pymongo.Connection().test['$cmd'].find_one({"ping": 1, "hello": "world"})
{u'pong': {u'ping': 1, u'hello': u'world'}}
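
One subtlety with this dispatch scheme: the command name is taken from the first key of the query document. BSON documents are ordered, but plain Python dicts are not, so a multi-key command built from a dict may arrive with its keys shuffled. An ordered mapping pins down which key hits the wire first; a stdlib sketch of the idea (the SON class shipped with pymongo serves the same purpose):

```python
from collections import OrderedDict

# A plain dict may serialize its keys in any order; an ordered mapping
# guarantees that 'ping' is the first element on the wire, so the
# server dispatches to do_ping rather than do_hello.
cmd = OrderedDict([('ping', 1), ('hello', 'world')])
first_key = next(iter(cmd))
print(first_key)
```

The single-key shell examples above dodge this entirely, which is why they work with plain documents.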

I haven't decided whether or not I want to move forward with designing services in this manner, but I do think it is interesting and warrants more exploration and experimentation. I really like the ability to compartmentalize my various internal services, and the power that gives me for testing, monitoring, and managing the application lifecycle.

I haven't made this easy to do with my C driver yet, but I do plan on adding the functionality soon.

As always, patches welcomed!

[1] [2]

-- Christian Hergert 2012-08-06
