Keep your application state out of your queue

I’m going to pick on Resque here, since it’s nominally great and widely used. You’re probably using it in your application right now. Unfortunately, I need to tell you something unsettling.

¡DRAMA MUSIC!

There’s an outside chance your application is dropping jobs. The Resque worker process pulls a job off the queue, turns it into an object, and passes it to your class for processing. This handles the simple case beautifully. But failure cases are important, if tedious, to consider.

What happens if there’s an exception in your processing code? There’s a plugin for that. What happens if you need to restart your Resque process while a job is processing? There’s a signal for that. What if, between taking a job off the queue and fully processing it, the whole server disappears due to a network partition, hardware failure, or the FBI “borrowing” your rack?

¡DRAMA MUSIC!

Honestly, you shouldn’t treat Resque, or even Redis, like you would a relational or dynamo-style database. Redis, like memcached, is designed as a cache. You put things in it, you can get it out, really fast. Redis, like memcached, rarely falls over. But if it does, the remediation steps are manual. Redis currently doesn’t have a good High Availability setup (it’s being contemplated).

Further, Resque assumes that clients will properly process every message they dequeue. This isn’t a bad assumption. Most systems work most of the time. But, if a Resque worker process fails, it’s not great. It will lose all of the message(s) held in memory, and the Redis instance that runs your Resque queues is none the wiser.

¡DRAMA MUSIC!

In my past usage of Resque, this isn’t that big of a deal. Most jobs aren’t business-critical. If the occasional image doesn’t get resized or a notification email doesn’t go out, life goes on. A little logging and data tweaking cures many ills.

But, some jobs are business-critical. They need stronger semantics than Resque provides. The processing of those jobs, the state of that processing, is part of our application’s logic. We need to model those jobs in our application and store that state somewhere we can trust.

I first became really aware of this problem, and a nice solution to it, listening to the Ruby Rogues podcast. Therein, one of the panelists advised everyone to model crucial processing as state machines. The jobs become the transitions from one model state to the next. You store the state alongside an appropriate model in your database. If a job should get dropped, it’s possible to scan the database for models that are in an inconsistent state and issue the job again.

Example follows

Let’s work an example. For our imaginary application, comment notifications are really important. We want to make sure they get sent, come hell or high water. Here’s what our comment model looks like originally:

    class Comment

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Now we’ll add a job to send that notification:

    class NotifyUser
      @queue = :notify_user

      def self.perform(user_id, comment_id)
        # Load the user and comment, send a notification!
      end

    end

But, as I’ve pointed out with great drama, this code can drop jobs. Let’s throw that state machine in:

    class Comment
      # We'll use acts-as-state-machine. It's a classic.
      include AASM

      # We store the state of sending this notification in the aptly named
      # `notification_state` column. AASM gives us predicate methods to see if this
      # model is in the `pending?` or `sent?` states and a `notification_sent!`
      # method to go from one state to the next.
      aasm :column => :notification_state do
        state :pending, :initial => true
        state :sent

        event :notification_sent do
          transitions :to => :sent, :from => [:pending]
        end
      end

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Our notification has two states: pending, and sent. Our web app creates it in the pending state. After the job finishes, it will put it in the sent state.

    class NotifyUser
      @queue = :notify_user

      def self.perform(user_id, comment_id)
        user    = User.find(user_id)
        comment = Comment.find(comment_id)

        user.notify_for(comment)
        # Notification success! Update the comment's state.
        comment.notification_sent!
      end

    end

This a good start for more reliably processing jobs. However, most jobs happen to handle the interaction between two systems. This notification is a great example. It integrates our application with a mail server or another service that handles our notifications. Talking to those things is probably something that isn’t tolerant to duplicate requests. If our process croaks between the time it tells the mail server to send and the time it updates the notification state in our database, we could accidentally process this notification twice. Back to square one?

¡DRAMA MUSIC!

Not quite. We can reduce our problem space once more by adding another state to our model.

    class Comment
      include AASM

      aasm :column => :notification_state do
        state :pending, :initial => true
        state :sending # We have attempted to send a notification
        state :sent    # The notification succeeded
        state :error   # Something is amiss :(

        # This happens right before we attempt to send the notification
        event :notification_attempted do
          transitions :to => :sending, :from [:pending]
        end

        # We take this transition if an exception occurs
        event :notification_error do
          transitions :to => :error, :from => [:sending]
        end

        # When everything goes to plan, we take this transition
        event :notification_sent do
          transitions :to => :sent, :from => [:sending]
        end

      end

      after_create :send_notification

      def send_notification
        Resque.enqueue(NotifyUser, self.user_id, self.id)
      end

    end

Now, when our job processes a notification, it first uses notification_attempted. Should this job fail, we’ll know which jobs we should look for in our logs. We could also get a little sneaky and monitor the number of jobs in this state if we think we’re bottlenecking around sending the actual notification. Once the job completes, we transition to the sent state. If anything goes wrong, we catch the exception and put the job in the error state. We definitely want to monitor this state and use the logs to figure out what went wrong, manually fix it, and perhaps write some code to fix bugs or add robustness.

The sending state is entered when at least one worker has picked up a notification and tried to send the message. Should that worker fail in sending the message or updating the database, we will know. When trouble strikes, we’ll know we have two cases to deal with: notifications that haven’t been touched at all, and notifications that were attempted and may have succeeded. The former, we’ll handle by requeueing them. The latter, we’ll probably have to write a script to grep our mail logs and see if we successfully sent the message. (You are logging everything, centrally aggregating it, and know how to search it, right?)

The truth is, the integration points between systems is a gnarly problem. You don’t so much solve the problem; you get really good at detecting and responding to edge cases. Thus is life in production. But losing jobs, we can make really good progress on that. Don’t worry about your low-value jobs; start with the really important ones, and weave the state of those jobs into the rest of your application. Some day, you’ll thank yourself for doing that.

Adam Keys @therealadam