Getting started with Ruby Concurrency using two simple classes

Building a concurrent system isn’t as hard as they say it is. What it boils down to is, you can’t program by coincidence. Here’s a list of qualities in a strong developer:

  • Thinks comfortably about the state(s) of their program
  • Studies and understands the abstractions one or two layers above and below their program
  • Listens to the design forces on their code

Happily, that’s all you need to get started writing code running in multiple threads. You don’t need a graduate degree, mathematical tricks, a specially-ordained language, or membership in the cult of writing concurrent programs.

Today, I want to give you a starting point for tinkering with and understanding concurrent programs, particularly in modern Ruby (JRuby and Rubinius 2.0).

Work queues, out-of-process and in-process

Lots of apps use a queue to get stuff done. Throw jobs on a queue, spin up a bunch of processes, run a job worker in those processes. Simple, right? Well, not entirely. You’ve got to store those jobs somewhere, make sure pulling jobs out of it won’t lose critical work, run worker processes somewhere, restart them if they fail, make sure they don’t leak memory, etc. Writing that first bit of code is easy, but deploying it ends up being a little costly.

If process concurrency is the only available trick, running a Resque-style job queue works. But now that thread concurrency is viable with Ruby, we can look at handling these same kinds of jobs in-process instead of externally. At the cost of some additional code and additional possible states in our process, we save all sorts of operational complexity.

Baby’s first work queue

Resque is a great abstraction. Let’s see if we can build something like it. Here’s how we’ll add jobs to our in-process queue:
Work.enqueue(EchoJob, "I am doing work!")
And this is how we’ll define a worker. Note that I’ve gone with call instead of perform, because that is my wont lately.
class EchoJob
  def call(message)
    puts message
  end
end

Simple enough. Now let’s make this thing actually work!

Humble beginnings

require 'thread'
require 'timeout'

module Work
  @queue = Queue.new
  @n_threads = 2
  @workers = []
  @running = true

  Job = Struct.new(:worker, :params)

First off, we pull in thread, which gives us Thread and our new best friend, Queue. We also need timeout so we have a way to interrupt methods that block.

Then we define our global work queue, aptly named Work. It’s got a modest amount of state: a queue to store pending work on, a parameter for the number of threads (I went with two since my MacBook Air has two real cores), an array to keep track of the worker threads, and a flag that indicates whether the work queue should keep running.

Finally, we define a little job object, because schlepping data around inside a library with a hash is suboptimal. Data that represents a concept deserves a name and some structure!

A public API appears

  module_function
  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end

  def start
    @workers = @n_threads.times.map { Thread.new { process_jobs } }
  end

This is the heart of the API. Note the use of module_function with no arguments; this makes all the following methods attach to the module object like class methods. This saves us the tedium of typing self.some_method all the time. Happy fingers!
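If the bare module_function trick is new to you, here is a tiny sketch of it in isolation. The Greeter module and its methods are made up purely for illustration; the point is only that methods defined after module_function can be called on the module itself and can read the module's own instance variables.

```ruby
# Hypothetical module, only here to illustrate a bare `module_function`.
module Greeter
  @greeting = "Hello"

  module_function

  # Callable as Greeter.greet without writing `def self.greet`.
  def greet(name)
    "#{@greeting}, #{name}!"
  end

  # Module functions can call each other directly.
  def greet_all(names)
    names.map { |name| greet(name) }
  end
end

puts Greeter.greet("threads")
puts Greeter.greet_all(%w[JRuby Rubinius]).inspect
```

Work uses the same shape: state lives in module instance variables, and every method after module_function is reachable as Work.something.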

Users of Work will add new jobs with enqueue, just like Resque. It’s a lot simpler in our case, though, because we never have to cross process boundaries. No marshaling, no problem.

Once the queue is loaded up (or even if it’s not), users then call start. This fires up a bunch of threads and starts processing jobs. We need to keep track of those threads for later, so we toss them into a module instance variable.

The crux of the biscuit

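  def process_jobs
    while @running
      job = nil
      begin
        Timeout.timeout(1) do
          job = @queue.pop
        end
      rescue Timeout::Error
        # Nothing arrived within a second; go back and re-check @running.
        next
      end
      job.worker.new.call(*job.params)
    end
  end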
Here’s the heart of this humble little work queue. It’s easiest to look at this one from the inside out. The crux of the biscuit is popping off the queue. For one thing, this is thread-safe, so two workers can pop off the queue at the same time and get different jobs back.

More importantly, @queue.pop will block, forever, if the queue is empty. That makes it easy for us to avoid hogging the CPU fruitlessly looking for new work. It does, however, mean we need to wrap the pop operation in a timeout, so that we can eventually get back to our loop and do some housekeeping.
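To poke at the blocking behavior on its own, here is a small sketch. The pop_with_timeout helper is a name I made up for this example; it is not part of Queue or of the Work module. It assumes that returning nil is an acceptable way to say "nothing showed up in time".

```ruby
require 'thread'
require 'timeout'

# Hypothetical helper: pop with a deadline, returning nil if nothing arrives.
def pop_with_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil
end

queue = Queue.new

# A consumer blocks inside pop until some other thread pushes a job.
consumer = Thread.new { queue.pop }
sleep 0.1              # give the consumer a moment to start waiting
queue << :job
puts "consumer got #{consumer.value.inspect}"

# The queue is empty again, so this pop gives up after the deadline.
puts "empty queue gave #{pop_with_timeout(queue, 0.2).inspect}"
```

Note that Timeout.timeout signals the deadline by raising Timeout::Error, so whoever wraps a pop this way has to rescue it somewhere.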

Housekeeping task the first, run that job. This looks almost just like the code you’ll find inside Resque workers. Create a new instance of the class that handles this job, invoke our call interface, pass the job params on. Easy!

Housekeeping task the second, see if the worker should keep running. If the @running flag is still set, we’re good to continue consuming work off the queue. If not, something has signaled that it’s time to wrap up.

Shutting down

  def drain
    loop do
      break if @queue.empty?
      sleep 1
    end
  end

  def stop
    @running = false
    @workers.each(&:join)
  end

end

Shutting down our work queue is a matter of draining any pending jobs and then closing out the running threads. drain is a little oddly named. It doesn’t actually do the draining, but it does block until the queue is empty. We use it as a precondition for calling stop, which tells all the workers to finish the job they’ve got and then exit their processing loop. We then call Thread#join on each worker, which blocks until that thread has finished.
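For comparison, a different technique from the flag-and-timeout approach used here is to shut workers down by pushing a sentinel value onto the queue, one per worker. The sketch below is my own variation, not the Work module from this post; SentinelWork, STOP, and CollectJob are names invented for the example.

```ruby
require 'thread'

# Hypothetical variant of Work that stops via sentinel values, not a flag.
module SentinelWork
  STOP = Object.new            # unique marker meaning "no more jobs"
  Job = Struct.new(:worker, :params)

  @queue = Queue.new
  @workers = []

  module_function

  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end

  def start(n_threads = 2)
    @workers = n_threads.times.map do
      Thread.new do
        # pop blocks until a job or a sentinel arrives; no timeout needed.
        until (job = @queue.pop).equal?(STOP)
          job.worker.new.call(*job.params)
        end
      end
    end
  end

  # The queue is FIFO, so jobs enqueued before the sentinels are popped first.
  def stop
    @workers.size.times { @queue << STOP }
    @workers.each(&:join)
  end
end

# Hypothetical job that records its results in a thread-safe queue.
class CollectJob
  RESULTS = Queue.new

  def call(n)
    RESULTS << n * 2
  end
end

5.times { |n| SentinelWork.enqueue(CollectJob, n) }
SentinelWork.start
SentinelWork.stop
puts "processed #{CollectJob::RESULTS.size} jobs"
```

The trade-off: there is no timeout and no separate drain step, but anything enqueued after stop is called will sit on the queue unprocessed.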

All together now

This is how we use our cute little work queue:
10.times { |n| Work.enqueue(EchoJob, "I counted to #{n}") }

# Process jobs in another thread(s)
Work.start

# Block until all jobs are processed
Work.drain

# Stop the workers
Work.stop

Create work, start our workers, block until they finish, and then stop working. Not too bad for fifty lines of code.

That wasn’t too hard

A lot is made of the difficulty of concurrent programming. “Oh, the locks, oh the error cases!” they cry. Maybe it is trickier. But it’s not rocket science hard. Hell, it’s not even monads and contravariance hard.

What I hope I’ve demonstrated today is that concurrent programming, even in Ruby with all its implementation shortcomings, is approachable. To wit:

  • Ruby has a great API for working with threads themselves. You call Thread.new and pass it some code to run in a thread. Done!
  • Ruby’s Queue class is threadsafe and a great tool for coordinating concurrent programs. You can get pretty far without thinking about locks with a queue. Push things onto it from one thread, pull things off from another. Block on a queue until you get the signal you’re waiting for. It’s a lovely little abstraction.
  • It’s easy to tinker with concurrency. You don’t have to write a giant program or have exotic problems to take advantage of Ruby for concurrency.
All that said, as I was writing this post up, some shortcomings in this example script jumped out at me. Output from the workers can appear out of order (classic concurrent program challenge), we can drain the queue while new work is still arriving (easily solved, but not with queues) and sleep loops (like in drain) are inelegant. If you want to read ahead, locks and latches are the droids you’re looking for.
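As a taste of where locks and latches lead, here is a rough sketch of a count-down latch built from Mutex and ConditionVariable. The CountDownLatch class is something I wrote for this example, not a class that ships with Ruby; it shows one way to wait for a known number of jobs without a sleep loop.

```ruby
require 'thread'

# Hypothetical count-down latch built from Mutex and ConditionVariable.
class CountDownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @zero = ConditionVariable.new
  end

  # Called by a worker each time it finishes a job.
  def count_down
    @mutex.synchronize do
      @count -= 1
      @zero.broadcast if @count <= 0
    end
  end

  # Blocks the caller until the count reaches zero, with no polling.
  def wait
    @mutex.synchronize do
      @zero.wait(@mutex) while @count > 0
    end
  end
end

latch = CountDownLatch.new(3)
results = Queue.new

3.times do |n|
  Thread.new do
    results << n * n     # do the "work" first
    latch.count_down     # then report completion
  end
end

latch.wait
puts "all jobs finished: #{results.size}"
```

The waiting thread sleeps inside ConditionVariable#wait and is woken only when the last job counts down, which is what drain fakes with sleep 1.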

I hope you see the ease with which we can get started doing concurrent Ruby programming by learning just two new classes. Don’t fear the threads, friend!

Adam Keys @therealadam