k6fv2g
Last Updated: May 30, 2016
·
9.367K
· richthegeek
13027845faeb65d26acd203f755ceb21

Writing a queue system in Redis

Our current project requires a task queue to handle data processing without overloading the server with too many processes at once (or losing data, or a million other reasons).

Recently, however, we've switched how we queue up processes to one where, for entirely good reasons, it's possible that the system will try and queue up several hundred duplicate tasks in a short space of time (and only one of these will actually do something useful).

Up until now we've been using Beanstalkd via the fivebeans node module to handle, but as a pure FIFO queue (albeit with peek-support), we can't easily remove duplicates or otherwise disallow them from being created.

An initial solution idea was to use Redis to mark things as being processed, or otherwise keep track of what from the queue should actually be executed. But of course, why bother making a hybrid when you can just drop a technical dependency entirely!

You may remember from a previous post (http://coderwall.com/p/8ozzgq) that we are already using a combination of Beanstalk for the tasking and Redis for holding the task payload to get around the 64kb message limit on Beanstalk. We can also get rid of that by using Redis as the task queue!

The task queue needs to provide the following:

  1. Support multiple queues of tasks
  2. FIFO execution, to ensure correct order-of-operations
  3. Priority queuing
  4. A guarantee that processes are only executed once.
  5. Refusal to add duplicate tasks
  6. Speed
  7. Allow tasks to be delayed and have an expiry

To achieve this, we now do the following when adding a task:

put: (data, queue, priority, delay=0, expire=-1) ->
    data = JSON.stringify data
    hash = sha512 data
    start = date + delay
    end = (expire < 0 and expire) or start + expire
    task =
        hash: hash
        start: start
        end: end

    method = (priority and 'lpush') or 'rpush'

    key = 'queue:#{queue}:'
    redis.setnx key+'#{hash}', data, (err, done) =>
        if done
            redis[method] key+'tasks', task

The most important part is the "setnx" command. This only writes to the key if it doesn't already exist, and because they key we are using contains a hash of the payload, we have de-duplication!

The callback to setnx recieves a boolean indicating wether it wrote to the key, and if it did then we put the task-pointer onto the queue. The [method] part is just a switch between putting it on either the start or the end of the queue, to allow high-priority tasks to execute straight away.

The "get task" method is nothing special - just some stuff to handle the start/end time and resolving the reference hash to the actual payload. Super simple stuff.

At this stage we have a working solution for our needs, but if you know of a MQ system that provides de-dup, wether it's in Redis or not, send me a tweet!

And if you are thinking "why not resque?" then it's fairly simple - Resque is a much heavier solution to this problem than we needed, as all the tasking, forking, etc. stuff was already implemented for Beanstalk.

It sure does feel nice removing a line from package.json...

7 Responses
Add your response

145
12003957 10206752435102297 2524621803006121539 n

Very cool, didn't know about setnx. So were you able to completely move off Beanstalkd?

over 1 year ago ·
146
13027845faeb65d26acd203f755ceb21

@mdeiters yep, Beanstalkd is now completely out of our dependencies list!

over 1 year ago ·
148
23e4fc3add5443f51e55663cfc245d24

I wrote a lightweight event scheduling library, and the events/payload added had to be unique and run on a scheduled time. I've managed to avoid the use of setnx by using a zset queue; the zset handles both scenarios nicely by it being a set and having scores.

over 1 year ago ·
150
13027845faeb65d26acd203f755ceb21

@erol what is the benefit of avoiding setnx? I assume it does a key lookup before write which is two ops instead of one.

the zset queue seems to be a more natural solution, perhaps you could do a writeup?

over 1 year ago ·
157
23e4fc3add5443f51e55663cfc245d24

Looks like I worded that poorly. Since the queue is built on top of a zset, calling zadd will always result in a unique set and a setnx call is no longer necessary. Zadd also has a behavior which I found useful: calling zadd with an existing member updates the member's score and reorders the zset, allowing an event to be "rescheduled" to a different time.

I'll post a writeup later. You can also browse the code here: https://github.com/Erol/later.

over 1 year ago ·
164
13027845faeb65d26acd203f755ceb21

ah, that does sound quite useful; we're working in Node.js for ours so directly using later isn't possible, but it can definitely form the basis for the next rewrite of the queue system!

over 1 year ago ·
7706

Well the solution is very nice, however I'm not sure if you heard about RabbitMQ. It's worth trying as it works for as like charm.

over 1 year ago ·