Keep your application state out of your queue

I’m going to pick on Resque here, since it’s nominally great and widely used. You’re probably using it in your application right now. Unfortunately, I need to tell you something unsettling.

¡DRAMA MUSIC!

There’s an outside chance your application is dropping jobs. The Resque worker process pulls a job off the queue, turns it into an object, and passes it to your class for processing. This handles the simple case beautifully. But failure cases are important, if tedious, to consider.

What happens if there’s an exception in your processing code? There’s a plugin for that. What happens if you need to restart your Resque process while a job is processing? There’s a signal for that. What if, between taking a job off the queue and fully processing it, the whole server disappears due to a network partition, hardware failure, or the FBI “borrowing” your rack?

¡DRAMA MUSIC!

Honestly, you shouldn’t treat Resque, or even Redis, like you would a relational or dynamo-style database. Redis, like memcached, is designed as a cache. You put things in it and you can get them back out, really fast. Redis, like memcached, rarely falls over. But if it does, the remediation steps are manual. Redis currently doesn’t have a good High Availability setup (it’s being contemplated).

Further, Resque assumes that clients will properly process every message they dequeue. This isn’t a bad assumption. Most systems work most of the time. But, if a Resque worker process fails, it’s not great. It will lose all of the message(s) held in memory, and the Redis instance that runs your Resque queues is none the wiser.
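To make that failure concrete, here is a minimal sketch in plain Ruby. It doesn't touch Redis or Resque at all: the `queue` Array stands in for a Redis list, and `run_worker`, `WorkerDied`, and `simulate_dropped_job` are names made up for this illustration. The only point is that once a job has been popped, the queue has no record of it.

```ruby
# Plain-Ruby simulation of a worker dying mid-job. No Redis, no Resque:
# `queue` is an Array standing in for a Redis list (an assumption for the demo).
class WorkerDied < StandardError; end

# Pops jobs until the queue is empty; "dies" when it hits `fatal_job`.
def run_worker(queue, processed, fatal_job: nil)
  until queue.empty?
    job = queue.shift   # the job now lives only in this worker's memory
    raise WorkerDied, "died while holding #{job}" if job == fatal_job
    processed << job    # only reached if the worker survives the job
  end
end

def simulate_dropped_job
  enqueued  = [:resize_image, :send_invoice, :send_email]
  queue     = enqueued.dup
  processed = []

  begin
    run_worker(queue, processed, fatal_job: :send_invoice)
  rescue WorkerDied
    # A real crash gives nobody the chance to rescue; the process is just gone.
  end

  # A fresh worker starts up and drains whatever is still queued.
  run_worker(queue, processed)

  { processed: processed, lost: enqueued - processed - queue }
end
```

Calling `simulate_dropped_job` reports `:send_invoice` as lost: it is neither processed nor still in the queue, and nothing on the queue side knows it ever went missing.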

¡DRAMA MUSIC!

In my past usage of Resque, this hasn’t been that big of a deal. Most jobs aren’t business-critical. If the occasional image doesn’t get resized or a notification email doesn’t go out, life goes on. A little logging and data tweaking cures many ills.

But, some jobs are business-critical. They need stronger semantics than Resque provides. The processing of those jobs, the state of that processing, is part of our application’s logic. We need to model those jobs in our application and store that state somewhere we can trust.

I first became really aware of this problem, and a nice solution to it, listening to the Ruby Rogues podcast. Therein, one of the panelists advised everyone to model crucial processing as state machines. The jobs become the transitions from one model state to the next. You store the state alongside an appropriate model in your database. If a job should get dropped, it’s possible to scan the database for models that are in an inconsistent state and issue the job again.

Example follows

Let’s work an example. For our imaginary application, comment notifications are really important. We want to make sure they get sent, come hell or high water. Here’s what our comment model looks like originally:

    class Comment

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Now we’ll add a job to send that notification:

    class NotifyUser
      @queue = :notify_user

      def self.perform(user_id, comment_id)
        # Load the user and comment, send a notification!
      end

    end

But, as I’ve pointed out with great drama, this code can drop jobs. Let’s throw that state machine in:

    class Comment
      # We'll use acts-as-state-machine. It's a classic.
      include AASM

      # We store the state of sending this notification in the aptly named
      # `notification_state` column. AASM gives us predicate methods to see if this
      # model is in the `pending?` or `sent?` states and a `notification_sent!`
      # method to go from one state to the next.
      aasm :column => :notification_state do
        state :pending, :initial => true
        state :sent

        event :notification_sent do
          transitions :to => :sent, :from => [:pending]
        end
      end

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Our notification has two states: pending, and sent. Our web app creates it in the pending state. After the job finishes, it will put it in the sent state.

    class NotifyUser
      @queue = :notify_user

      def self.perform(user_id, comment_id)
        user    = User.find(user_id)
        comment = Comment.find(comment_id)

        user.notify_for(comment)
        # Notification success! Update the comment's state.
        comment.notification_sent!
      end

    end

This is a good start for more reliably processing jobs. However, most jobs happen to handle the interaction between two systems. This notification is a great example. It integrates our application with a mail server or another service that handles our notifications. Those services probably don’t tolerate duplicate requests. If our process croaks between the time it tells the mail server to send and the time it updates the notification state in our database, we could accidentally process this notification twice. Back to square one?

¡DRAMA MUSIC!

Not quite. We can reduce our problem space once more by adding another state to our model.

    class Comment
      include AASM

      aasm :column => :notification_state do
        state :pending, :initial => true
        state :sending # We have attempted to send a notification
        state :sent    # The notification succeeded
        state :error   # Something is amiss :(

        # This happens right before we attempt to send the notification
        event :notification_attempted do
          transitions :to => :sending, :from => [:pending]
        end

        # We take this transition if an exception occurs
        event :notification_error do
          transitions :to => :error, :from => [:sending]
        end

        # When everything goes to plan, we take this transition
        event :notification_sent do
          transitions :to => :sent, :from => [:sending]
        end

      end

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Now, when our job processes a notification, it first uses notification_attempted. Should this job fail, we’ll know which jobs we should look for in our logs. We could also get a little sneaky and monitor the number of jobs in this state if we think we’re bottlenecking around sending the actual notification. Once the job completes, we transition to the sent state. If anything goes wrong, we catch the exception and put the job in the error state. We definitely want to monitor this state and use the logs to figure out what went wrong, manually fix it, and perhaps write some code to fix bugs or add robustness.
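Here's roughly what that job flow could look like. This is a sketch, not the real thing: `SketchComment` is an in-memory stand-in for the AASM-backed model (same states, same event names) so the example runs without Rails, AASM, or Resque, and `NotifyUserSketch` takes objects rather than ids. The `mailer` argument and its `deliver` method are hypothetical. Re-raising after recording the error is my own choice here, on the assumption that you want the failure to stay visible to whatever is watching the job.

```ruby
# In-memory stand-in for the AASM-backed Comment: same states and event names,
# no database. Everything named *Sketch is hypothetical.
class InvalidTransition < StandardError; end

class SketchComment
  attr_reader :notification_state

  def initialize
    @notification_state = :pending
  end

  def notification_attempted!; transition(:pending, :sending); end
  def notification_error!;     transition(:sending, :error);   end
  def notification_sent!;      transition(:sending, :sent);    end

  private

  # Refuses any transition that doesn't start from the expected state.
  def transition(from, to)
    unless @notification_state == from
      raise InvalidTransition, "cannot go from #{@notification_state} to #{to}"
    end
    @notification_state = to
  end
end

class NotifyUserSketch
  # `mailer` is any object responding to `deliver(comment)` (an assumption).
  def self.perform(comment, mailer)
    comment.notification_attempted!   # record the attempt before touching the mail server
    begin
      mailer.deliver(comment)
    rescue StandardError
      comment.notification_error!     # flag it for a human, then surface the failure
      raise
    end
    comment.notification_sent!        # everything went to plan
  end
end
```

One side effect of doing the `notification_attempted!` transition first: if the same job somehow runs twice, the second run fails on the transition before it ever reaches the mailer.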

The sending state is entered when at least one worker has picked up a notification and tried to send the message. Should that worker fail in sending the message or updating the database, we will know. When trouble strikes, we’ll know we have two cases to deal with: notifications that haven’t been touched at all, and notifications that were attempted and may have succeeded. The former, we’ll handle by requeueing them. The latter, we’ll probably have to write a script to grep our mail logs and see if we successfully sent the message. (You are logging everything, centrally aggregating it, and know how to search it, right?)
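A recovery pass over those two cases might be sketched like this. It's plain Ruby over in-memory rows so it runs on its own; in the real application the rows would be Comment records pulled from the database. `StuckComment`, `triage_notifications`, the `updated_at` field, and the ten-minute grace period are all assumptions made for the example.

```ruby
# Hypothetical row shape standing in for Comment records.
StuckComment = Struct.new(:id, :user_id, :notification_state, :updated_at)

# Splits overdue comments into the two cases: never touched (safe to requeue)
# and attempted (check the mail logs before anyone resends).
# `grace` is in seconds; 600 is an arbitrary default for the sketch.
def triage_notifications(comments, now:, grace: 600)
  overdue = comments.select { |c| now - c.updated_at > grace }
  {
    requeue: overdue.select { |c| c.notification_state == :pending },
    review:  overdue.select { |c| c.notification_state == :sending }
  }
end
```

Everything under `:requeue` can go straight back through `Resque.enqueue(NotifyUser, c.user_id, c.id)`. Everything under `:review` needs a human, or a log-grepping script, to decide whether the message already went out.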

The truth is, the integration points between systems are a gnarly problem. You don’t so much solve the problem; you get really good at detecting and responding to edge cases. Thus is life in production. But losing jobs, we can make really good progress on that. Don’t worry about your low-value jobs; start with the really important ones, and weave the state of those jobs into the rest of your application. Some day, you’ll thank yourself for doing that.

Published by Adam Keys

Telling a joke. Typing.

7 replies on “Keep your application state out of your queue”

  1. Alas now our comment has a state machine that is there to work around problems of the job processing system. Could something like this be moved down into Resque (i.e. the job has the persistent state)?

  2. @Anthony: I wouldn’t say that our comment now has state due to the job processing. I would say that our comment needed state from the beginning but it was overlooked. Learning how our job processing worked helped us realize that we needed something more robust around the state of a given comment.

  3. Redis is indeed not a very good queueing system. It’s not a bad one either, but it’s basically a toy. It works fine for your quick-and-dirty work distribution, where there’s no big deal if the job isn’t done. Redis and Resque are also miles better than some of the really backwards solutions that have cropped up lately, like Queue Classic and Boxed Ice’s MongoDB-backed queueing. Messaging and work distribution isn’t a problem that can be solved as a library, it doesn’t work very well.

    I think you don’t have to go to such lengths as to implement state machine logic to solve this problem. It’s easily solved by using a real message broker instead of a database. A message broker handles problems like workers dying half-way through, and even parts of the broker becoming unavailable, in a much simpler and more fault-tolerant way. They deliver a message to a worker, and the worker acknowledges it when it’s done processing. If the worker disconnects, for example by crashing, the message is requeued, or if the worker detects an error itself it can tell the broker to requeue or discard the message. In a way, they implement the state machine you describe, but in a more robust way. They are very simple to use, and have much better performance than Resque. Try RabbitMQ, it’s great to work with and has HA features that are ages ahead of Redis.

    Even if Redis has a few features that can be used for messaging and work distribution (blocking pop is a poor, but workable, version of subscriptions, and pub/sub is not bad for simple fanout messaging), in general databases are crap at this kind of thing. Message brokers are designed for messaging, it’s a very different use case from databases, and I think that most projects that use Resque could benefit from using, for example, RabbitMQ instead.

    Again, Redis isn’t bad for work distribution, and if you’re fine with having to write the state machine logic yourself, then it works. But if you want to avoid that code, and get better reliability and performance for free I encourage you to look beyond Redis.

  4. Great article!

    Re: resque dropping jobs: we recently developed an in-house queueing system that is heavily inspired from resque, but doesn’t drop jobs*:

    https://github.com/seomoz/qless

    Like resque, it’s built on top of redis, but the main difference is that it is powered by a set of lua scripts running on redis 2.6. The lua scripts do lots of cool atomic stuff you couldn’t do on the client side. There’s a configurable heartbeat interval. Workers get a lock on a job, and must check in (complete it, fail it, or say, “I’m still working on it”) within the heartbeat interval; if they don’t, the job will be given to another worker.

    * Standard caveats like “if the datacenter gets nuked, then yes, it will drop jobs” still apply.

  5. What John said. If a job is critical, like say payments or domain transfers *cough*, it’s not really a job. It’s part of your application’s state and modeling it as such makes sense, to me.

    What the headline of this article says, and I failed to emphasize, is that it seems that assuming a reliable, durable queue is possibly architectural folly. If it’s that important to you, model and store that state along with your other business critical data.

    If that still makes you uneasy, working from an event feed is the next thing I’m going to research in this area. A few useful tweets:

  6. John makes a good point, but let’s acknowledge Theo’s wisdom here as well. Resque is good, but there’s no reason a queue can’t have stronger guarantees that solve half the problem described here. I say half because if a job can’t be done atomically or idempotently at the application-level, then a robust messaging system won’t save you, but some jobs *can* be done atomically or idempotently in which case the queue could be the weak link.
