Getting started with Ruby Concurrency using two simple classes

Building a concurrent system isn’t as hard as they say it is. What it boils down to is, you can’t program by coincidence. Here’s a list of qualities in a strong developer:

  • Comfort in thinking about the state(s) of their program
  • Studies and understands the abstractions one or two layers above and below their program
  • Listens to the design forces on their code

Happily, that’s all you need to get started writing code running in multiple threads. You don’t need a graduate degree, mathematical tricks, a specially-ordained language, or membership in the cult of writing concurrent programs.

Today, I want to give you a starting point for tinkering with and understanding concurrent programs, particularly in modern Ruby (JRuby and Rubinius 2.0).

Work queues, out-of-process and in-process

Lots of apps use a queue to get stuff done. Throw jobs on a queue, spin up a bunch of processes, run a job worker in those processes. Simple, right? Well, not entirely. You’ve got to store those jobs somewhere, make sure pulling jobs out of it won’t lose critical work, run worker processes somewhere, restart them if they fail, make sure they don’t leak memory, etc. Writing that first bit of code is easy, but deploying it ends up being a little costly.

If process concurrency is the only available trick, running a Resque-style job queue works. But now that thread concurrency is viable with Ruby, we can look at handling these same kinds of jobs in-process instead of externally. At the cost of some additional code and additional possible states in our process, we save all sorts of operational complexity.

Baby’s first work queue

Resque is a great abstraction. Let’s see if we can build something like it. Here’s how we’ll add jobs to our in-process queue:

Work.enqueue(EchoJob, "I am doing work!")

And this is how we’ll define a worker. Note that I’ve gone with call instead of perform, because that is my wont lately.

class EchoJob

  def call(message)
    puts message
  end

end
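The only contract a job has to meet is duck-typed: the class has to be instantiable with no arguments, and its instances have to respond to call. Here’s a sketch with a hypothetical AddJob (not part of the post’s code) that takes two arguments, showing how an array of stored params gets splatted back into a call, which is what the queue will do for us later.

```ruby
# Hypothetical job used only for illustration: any class whose
# instances respond to #call can serve as a worker.
class AddJob
  def call(a, b)
    a + b
  end
end

params = [2, 3]                   # how params would be stored on a job
result = AddJob.new.call(*params) # splat them back into positional args
puts result                       # prints 5
```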

Simple enough. Now let’s make this thing actually work!

Humble beginnings

require 'thread'
require 'timeout'

module Work
  @queue = Queue.new
  @n_threads = 2
  @workers = []
  @running = true

  Job = Struct.new(:worker, :params)

First off, we pull in thread, which gives us Thread and our new best friend, Queue. We also need timeout so we have a way to interrupt methods that block.

Then we define our global work queue, aptly named Work. It’s got a modest amount of state: a queue to store pending work on, a parameter for the number of threads (I went with two since my MacBook Air has two real cores), an array to keep track of the worker threads, and a flag that indicates whether the work queue should keep running.

Finally, we define a little job object, because schlepping data around inside a library with a hash is suboptimal. Data that represents a concept deserves a name and some structure!
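Here’s a quick look at what the Struct buys us over a hash. GreetJob is a hypothetical stand-in worker; the last line mirrors how a worker thread will eventually invoke a job.

```ruby
# A stand-in worker class, hypothetical and for illustration only.
class GreetJob
  def call(name)
    "hello, #{name}"
  end
end

# Same shape as Work::Job: a worker class plus its params.
Job = Struct.new(:worker, :params)

job = Job.new(GreetJob, ["world"])

# Named readers instead of hash keys.
p job.worker # prints GreetJob
p job.params # prints ["world"]

# A misspelled member fails loudly, unlike a misspelled hash key,
# which would quietly return nil.
begin
  job.parmas
rescue NoMethodError
  puts "no such member"
end

# Instantiate the worker and splat the params into #call.
puts job.worker.new.call(*job.params) # prints hello, world
```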

A public API appears

  module_function
  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end

  def start
    @workers = @n_threads.times.map { Thread.new { process_jobs } }
  end

This is the heart of the API. Note the use of module_function with no arguments: every method defined after it becomes callable on the module object itself, like a class method (Work.enqueue, Work.start). This saves us the tedium of writing def self.some_method for every one of them. Happy fingers!
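To see what module_function is doing, here’s a sketch of a tiny hypothetical Counter module built the same way as Work: module-level instance variables for state, and module_function so the methods can be called on the module directly.

```ruby
# Hypothetical module, same pattern as Work: state lives in instance
# variables on the module object itself.
module Counter
  @count = 0

  module_function

  # Called as Counter.increment, so self is Counter and @count is the
  # module-level variable set above.
  def increment
    @count += 1
  end

  def value
    @count
  end
end

3.times { Counter.increment }
puts Counter.value # prints 3
```

One wrinkle: module_function also leaves private copies of each method behind as instance methods, so a class that included Counter would get versions that read its own @count rather than the module’s. For a global singleton like Work, calling the methods on the module is the whole point, so that’s fine.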

Users of Work will add new jobs with enqueue, just like Resque. It’s a lot simpler in our case, though, because we never have to cross process boundaries. No marshaling, no problem.

Once the queue is loaded up (or even if it’s not), users then call start. This fires up a bunch of threads and starts processing jobs. We need to keep track of those threads for later, so we toss them into a module instance variable.
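Because Queue does its own locking, enqueue is safe to call from any number of threads at once. A sketch that hammers a queue from four threads (the thread and item counts are arbitrary) shows nothing gets lost:

```ruby
require 'thread' # Queue is built in on modern Ruby; this require is harmless

queue = Queue.new

# Four producer threads each push 100 items concurrently.
producers = 4.times.map do |t|
  Thread.new do
    100.times { |i| queue << [t, i] }
  end
end
producers.each(&:join)

puts queue.size # prints 400: no pushes were lost
```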

The crux of the biscuit

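  def process_jobs
    while @running
      job = nil
      begin
        # Queue#pop blocks forever on an empty queue, so cap the wait at a second.
        Timeout.timeout(1) do
          job = @queue.pop
        end
      rescue Timeout::Error
        # Nothing arrived in time; loop around and re-check @running.
        next
      end
      job.worker.new.call(*job.params)
    end
  end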

Here’s the heart of this humble little work queue. It’s easiest to look at this one from the inside out. The crux of the biscuit is popping off the queue. For one thing, this is thread-safe, so two workers can pop off the queue at the same time and get different jobs back.

More importantly, @queue.pop will block, forever, if the queue is empty. That makes it easy for us to avoid hogging the CPU fruitlessly looking for new work. It does, however, mean we need to wrap the pop operation in a timeout, so that we can eventually get back to our loop and do some housekeeping.
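One thing to watch: when the block runs past its limit, Timeout.timeout raises Timeout::Error, and if nothing rescues it, the worker thread dies. Here’s a sketch of a hypothetical timed_pop helper that turns that exception into a nil:

```ruby
require 'timeout'

# Hypothetical helper: wait up to `seconds` for an item, returning nil
# instead of raising when nothing arrives.
def timed_pop(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil
end

queue = Queue.new
p timed_pop(queue, 0.1) # prints nil after roughly 0.1s
queue << :job
p timed_pop(queue, 0.1) # prints :job right away
```

If you’re on Ruby 3.2 or later, Queue#pop takes a timeout: keyword that returns nil when time runs out, which lets you skip Timeout altogether.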

Housekeeping task the first, run that job. This looks almost exactly like the code you’ll find inside Resque workers. Create a new instance of the class that handles this job, invoke our call interface, pass the job params on. Easy!

Housekeeping task the second, see if the worker should keep running. If the @running flag is still set, we’re good to continue consuming work off the queue. If not, something has signaled that it’s time to wrap up.

Shutting down

  def drain
    loop do
      break if @queue.empty?
      sleep 1
    end
  end

  def stop
    @running = false
    @workers.each(&:join)
  end

end

Shutting down our work queue is a matter of draining any pending jobs and then closing out the running threads. drain is a little oddly named: it doesn’t do the draining itself, but it blocks until the workers have emptied the queue. We use it as a precondition for calling stop, which tells all the workers to finish the job they’ve got and then exit their processing loop. Finally, stop calls Thread#join on each worker, which blocks until that thread has exited.
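The flag-and-timeout approach works, but there’s another common pattern worth knowing, usually called a poison pill: push one special sentinel value per worker, and have each worker exit when it pops one. This is a different technique from the one in the post, sketched here with a hypothetical STOP object. Because Queue is first-in, first-out, every job ahead of the sentinels gets taken before any worker shuts down, so no sleep loop or timeout is needed.

```ruby
queue   = Queue.new
results = Queue.new  # thread-safe place for workers to report back
STOP    = Object.new # hypothetical sentinel; compared by identity below

workers = 2.times.map do
  Thread.new do
    loop do
      item = queue.pop           # blocks; no timeout needed
      break if item.equal?(STOP) # sentinel means "no more work"
      results << item * 2        # stand-in for real work
    end
  end
end

5.times { |n| queue << n }

# One sentinel per worker, queued behind all the real jobs.
workers.size.times { queue << STOP }
workers.each(&:join)

puts results.size # prints 5
```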

All together now

This is how we use our cute little work queue:

10.times { |n| Work.enqueue(EchoJob, "I counted to #{n}") }

# Process jobs in background worker threads
Work.start

# Block until all jobs are processed
Work.drain

# Stop the workers
Work.stop

Create work, start our workers, block until they finish, and then stop working. Not too bad for fifty lines of code.

That wasn’t too hard

A lot is made of the difficulty of concurrent programming. “Oh, the locks, oh the error cases!” they cry. Maybe it is trickier. But it’s not rocket science hard. Hell, it’s not even monads and contravariance hard.

What I hope I’ve demonstrated today is that concurrent programming, even in Ruby with all its implementation shortcomings, is approachable. To wit:

  • Ruby has a great API for working with threads themselves. You call Thread.new and pass it some code to run in a thread. Done!
  • Ruby’s Queue class is thread-safe and a great tool for coordinating concurrent programs. With a queue, you can get pretty far without thinking about locks. Push things onto it from one thread, pull things off from another. Block on a queue until you get the signal you’re waiting for. It’s a lovely little abstraction.
  • It’s easy to tinker with concurrency. You don’t have to write a giant program or have exotic problems to take advantage of Ruby for concurrency.
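The “block on a queue until you get the signal” idea from the list above is about as small as concurrency gets. A sketch, with sleep standing in for real work:

```ruby
done = Queue.new

worker = Thread.new do
  sleep 0.1         # stand-in for real work
  done << :finished # signal whoever is waiting
end

signal = done.pop # the main thread blocks here until the worker pushes
p signal          # prints :finished
worker.join
```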

All that said, as I was writing this post up, some shortcomings in this example script jumped out at me. Output from the workers can appear out of order (classic concurrent program challenge), we can drain the queue while new work is still arriving (easily solved, but not with queues) and sleep loops (like in drain) are inelegant. If you want to read ahead, locks and latches are the droids you’re looking for.
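For the curious, here’s a sketch of a latch built from Mutex and ConditionVariable, both part of Ruby. CountDownLatch is a hypothetical class, not something in the standard library. Workers count down as they finish each job, and the main thread waits on the latch instead of polling queue.empty? in a sleep loop. The output mutex keeps lines from interleaving, though it does not put them in any particular order.

```ruby
# Hypothetical CountDownLatch; Ruby's standard library doesn't ship one.
class CountDownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Called by a worker each time it finishes one unit of work.
  def count_down
    @mutex.synchronize do
      @count -= 1 if @count > 0
      @cond.broadcast if @count.zero?
    end
  end

  # Blocks the caller until the count reaches zero.
  def wait
    @mutex.synchronize do
      # Re-check in a loop to guard against spurious wakeups.
      @cond.wait(@mutex) while @count > 0
    end
  end
end

jobs        = Queue.new
latch       = CountDownLatch.new(10)
output_lock = Mutex.new # only one thread writes to stdout at a time
processed   = []        # guarded by output_lock

10.times { |n| jobs << n }

workers = 2.times.map do
  Thread.new do
    loop do
      begin
        n = jobs.pop(true) # non-blocking pop; raises ThreadError when empty
      rescue ThreadError
        break              # queue is empty, so this worker is done
      end
      output_lock.synchronize do
        puts "job #{n}"
        processed << n
      end
      latch.count_down
    end
  end
end

latch.wait # no sleep loop: wakes up once the tenth job has finished
puts "all jobs done"
workers.each(&:join)
```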

I hope you see the ease with which we can get started doing concurrent Ruby programming by learning just two new classes. Don’t fear the threads, friend!

8 comments

  1. Wade Winningham · June 19, 2012

    Nice! You mention issues with output from the workers that can appear out of order. While there’s not much you can do about that, I’d be curious about the problems you’ve run into relative to that. Auto-number ID issues, etc.

  2. Adam Keys · June 19, 2012

    For any kind of global state (standard output, counters, logs) there’s going to be a mutex lock around accessing it. If you’re lucky, that global state already has locks or reentrancy built-in (cf. a relational database). If you’re less lucky, you’ll have to write that mutex yourself. I’ll probably write an article on locks and their unintended side-effects next.

  3. Mike Perham · June 19, 2012

    Unfortunately Ruby doesn’t have a Queue with a timed pop operation, which is almost always required for queue processing. I have one here:

    https://github.com/mperham/connection_pool/blob/master/lib/timed_queue.rb

    Another interesting resource to page through for in-process queueing is my girl_friday gem: https://github.com/mperham/girl_friday

  4. Adam Keys · June 19, 2012

    First off, I should thank Mike for blazing a path for Ruby concurrency. He’s got a lot of bruises from trying this stuff years ago when it wasn’t quite such a rosy situation. Mike is a good guy.

    That’s a nice addition to Queue. I’d also love to see a latch class make it into the standard library, somehow.

  5. Mike Perham · June 19, 2012

    Thanks, Adam.

    Knowing the Thread API and how it works is important but I look at it like assembly language: it’s important to your understanding of how your application is executed but I’d suggest 99% of applications should not use threads directly.

    Having done this stuff for a while now, I always recommend people use actors to create an OO design for their concurrent infrastructure. I’ve seen a very real and tangible reduction in the number of race conditions and deadlocks I’ve had to debug due to using Celluloid vs threads directly. I’ve spent more time debugging threading issues in my connection_pool gem than I have in Sidekiq and Sidekiq is an order of magnitude larger in size and complexity.

  6. Adam Keys · June 19, 2012

    I’ve found learning threads is quite useful, for me, in better understanding how to use things like actors, executor pools, etc. To carry the assembly language metaphor through, I think we’re still in the days of C. The abstractions we’re using are a portable layer over the underlying mechanisms. I don’t think we’ve really reached the days of Java or Ruby where your average, thread-using developer doesn’t need to think about threads directly.

    I’m hoping ideas like Celluloid, girl_friday, or Go-style agents will prove to be that Java/Ruby-like breakthrough.

  7. Jim Howard · June 19, 2012

    @workers.join does not do what you think. Join concatenates the array contents. You want to call @workers.each(&:join)

  8. Adam Keys · June 19, 2012

    You’re absolutely right. Corrected!
