Memcached Lockless Queue

I'm working on a project that needs a simple server side lightweight, scalable message queue. I'm interested in Erlang and Yaws, but the learning curve seems to me incredibly high -- not only on the code implementation front, but also the tool and management front. Moreover I have clients -- in the people sense of the word -- who may not be particularly interested in learning a new language, let alone a functional language to boot.

I know there are a huge number of Java based queuing technologies out there but currently Java isn't running on the box, so there's some "management" overhead there. Enter memcached. I'm familar with it from some work I've done with google app engine, and it's already running on the server in question.

Memcached is basically just a big dictionary that can, if you want it to, span multiple machines. New entries in the dictionary can be marked to expire after a given amount of time.

There are some aspects to memcached that I find nice. First, it's dead simple to setup, configure, and manage. Second, it has clients in a large number of languages. Third, if i ever do want to play with erlang -- there's a *server* implementation called cacherl that -- in theory -- works with all existing clients. Finally, on memcached's offical FAQ there's a link to an article about how to turn that dictionary into a queue. :)

It took me a while to understand exactly how what they describe works. It's important to understand that while memcached has atomic inc/dec/set it has no atomic exchange primitive -- that makes it difficult to implement all sorts of data structures, but at any rate here's my pseudo code based on the author's description.

addmsg(newmsg):
# incr returns the new value of the counter;
# therefore the first thread that gets here sets "writerlock" to 1
while ( incr( "writerlock" ) != 1):
# if not one, then we weren't the first thread
# keep looping till the first thread finishes (below)
pass

msgtowrite= incr( "queuehead" )

# "add" stores the value "newmsg" at the key "msgtowrite"
add( msgtowrite, newmsg )

# let some other writer thread go
set( "writerlock", 0 )

getmsg():
# same lock concept as the writer side
while ( incr( "readerlock" ) != 1):
pass

msgtoread= incr("queuetail")

# optional: use "gets" to get more than one value.
# get will fail if it didnt exist
newmsg= get( msgtoread )

# optional: delete, or just let the msg expire someday.
# let some other reader thread go
set( "readerlock", 0 )

This makes total sense, and yet -- especially after having worked on two multithreaded games this year -- i feel like there must be a lockless way. In fact, I spent a large part of yesterday trying to tell myself -- it just doesn't matter -- use the locking queue, and another part telling myself -- but if lightweight is important than locking is bad. I did *try* to do other things, but really I just kept coming back to this issue again and again.

Finally, this morning the lessons of what I read elsewhere really finally sank in, particularly: Nginx and Memcached, a 400% boost! and Using Nginx, SSI and Memcache ( both highly recommended reads ).

It basically boils down to, let memcache do what memcache does best -- read and write values.

Using the same "message key equals message number" idea from the queue above, what if, on the writer side, we just "incr" the message number and wrote to that number slot. "incr" is atomic so no two messages can conflict.

On the reader side, what if we got the client application invested in the process. Make the reader start requesting at message 0, and have the reader -- not the server -- increment the "to-read" message number. If the message hasn't been written you wont find it -- but, as long as the writer doesn't fail -- you will get the message someday.

addmsg( newmsg ):
msgtowrite= incr("queuehead");
add( msgtowrite, newmsg );

readmsg( nextmsgid ):
return get( nextmsgid );

This nice and simple and has all sorts of cool properties that I was hoping to find:

  • First: it's non blocking; non-locking.
  • Second: you get a clear intention of the desire to write a message before you actually have to generate and write the message.
  • Third: a writer can reserve a sequence of messages by incrementing by the number of msgs it needs.
  • Lastly, although i didn't mention it above, the locking queue needs overflow detection for the lock counters -- since there are no locks, there's no need for overflow detection.

You can even, at the expense of increasing the client's complexity -- have the reader use 'gets' to read several messages at once and the client can track which succeeded and ask only for those messages that it missed. ( Feels like a re-invention of TCP SYNs and ACKs all over again I know. )

6 comments:

ionous said...

cleaned up some spelling errors this morning; also noticed ollie points out in response to a lockless comment, that -- as with everything -- mileage may vary based on your specific needs.

Mike Perham said...

Check out Starling - lightweight, ruby-based, transactional, and uses the memcached API to get/put messages.

ionous said...

will do; thanks for the tip!

Leen Toelen said...

Twitter recently ported Starling to java/scala, this project is called kestrel and they have it in production already.

http://robey.lag.net/2008/11/27/scarling-to-kestrel.html

http://github.com/robey/kestrel/tree/master

Regards,
Leen Toelen

l0t3k said...

wow. read my mind. i'm building a scalable chat server exactly on this technique.
to control memory consumption, i bunch related messages (e.g. a login event can generate user-entered, user-details, and user-status messages) and compress using compact binary compression. There's a sweeper process that periodically does a db write-behind for chatrooms that need journals, but this is rare..

l0t3k said...

i should say that instead of a straight queue, we used a circular buffer to ensure things were kept in check. IOW, we use

addmsg( queueName, newmsg ):
msgtowrite= incr("queuehead" + queueName ) % BUFFER_SLOTS;
add( msgtowrite, newmsg );