Writing a queue system in Redis
Our current project requires a task queue to handle data processing without overloading the server with too many processes at once (or losing data, or a million other reasons).
Recently, however, we've switched how we queue up processes to one where, for entirely good reasons, it's possible that the system will try and queue up several hundred duplicate tasks in a short space of time (and only one of these will actually do something useful).
Up until now we've been using Beanstalkd via the fivebeans node module to handle, but as a pure FIFO queue (albeit with peek-support), we can't easily remove duplicates or otherwise disallow them from being created.
An initial solution idea was to use Redis to mark things as being processed, or otherwise keep track of what from the queue should actually be executed. But of course, why bother making a hybrid when you can just drop a technical dependency entirely!
You may remember from a previous post (http://coderwall.com/p/8ozzgq) that we are already using a combination of Beanstalk for the tasking and Redis for holding the task payload to get around the 64kb message limit on Beanstalk. We can also get rid of that by using Redis as the task queue!
The task queue needs to provide the following:
- Support multiple queues of tasks
- FIFO execution, to ensure correct order-of-operations
- Priority queuing
- A guarantee that processes are only executed once.
- Refusal to add duplicate tasks
- Speed
- Allow tasks to be delayed and have an expiry
To achieve this, we now do the following when adding a task:
put: (data, queue, priority, delay=0, expire=-1) ->
data = JSON.stringify data
hash = sha512 data
start = date + delay
end = (expire < 0 and expire) or start + expire
task =
hash: hash
start: start
end: end
method = (priority and 'lpush') or 'rpush'
key = 'queue:#{queue}:'
redis.setnx key+'#{hash}', data, (err, done) =>
if done
redis[method] key+'tasks', task
The most important part is the "setnx" command. This only writes to the key if it doesn't already exist, and because they key we are using contains a hash of the payload, we have de-duplication!
The callback to setnx recieves a boolean indicating wether it wrote to the key, and if it did then we put the task-pointer onto the queue. The [method] part is just a switch between putting it on either the start or the end of the queue, to allow high-priority tasks to execute straight away.
The "get task" method is nothing special - just some stuff to handle the start/end time and resolving the reference hash to the actual payload. Super simple stuff.
At this stage we have a working solution for our needs, but if you know of a MQ system that provides de-dup, wether it's in Redis or not, send me a tweet!
And if you are thinking "why not resque?" then it's fairly simple - Resque is a much heavier solution to this problem than we needed, as all the tasking, forking, etc. stuff was already implemented for Beanstalk.
It sure does feel nice removing a line from package.json...
Written by Richard Lyon
Related protips
8 Responses
Very cool, didn't know about setnx. So were you able to completely move off Beanstalkd?
@mdeiters yep, Beanstalkd is now completely out of our dependencies list!
I wrote a lightweight event scheduling library, and the events/payload added had to be unique and run on a scheduled time. I've managed to avoid the use of setnx by using a zset queue; the zset handles both scenarios nicely by it being a set and having scores.
@erol what is the benefit of avoiding setnx? I assume it does a key lookup before write which is two ops instead of one.
the zset queue seems to be a more natural solution, perhaps you could do a writeup?
Looks like I worded that poorly. Since the queue is built on top of a zset, calling zadd will always result in a unique set and a setnx call is no longer necessary. Zadd also has a behavior which I found useful: calling zadd with an existing member updates the member's score and reorders the zset, allowing an event to be "rescheduled" to a different time.
I'll post a writeup later. You can also browse the code here: https://github.com/Erol/later.
ah, that does sound quite useful; we're working in Node.js for ours so directly using later isn't possible, but it can definitely form the basis for the next rewrite of the queue system!
Well the solution is very nice, however I'm not sure if you heard about RabbitMQ. It's worth trying as it works for as like charm.
I began to be interested in programming while still in school, and I searched for information on the Internet and tried to master a completely new direction on my own. Then I decided to go to university for more extensive study and practice of the knowledge gained in practice. But despite this, I was still looking for more information, because I really like it. Also there on Paperial I recently found information about "setnx", everything is very clear and well described. But I was always very surprised by the fact that in any university, in addition to the specialized subjects necessary for your professionalism, you need to study general education subjects. And I never understood why this is necessary, especially why we, programmers, need to write a lot of scientific articles, coursework and the like. I basically did not spend my time on it and always used the resources that provide such services as writing paper online, and with a calm soul, I was engaged in programming. I was very lucky, you can say that I got to the right place at the right time and met a fairly well-known businessperson, and from the moment of our meeting I started working in his company. So at the time of graduation, I had a job in my specialty and of all my classmates and friends I had the most work experience.