Getting started with Ruby Concurrency using two simple classes
Building a concurrent system isn’t as hard as they say it is. It boils down to this: you can’t program by coincidence. Here are a few qualities of a strong developer:
- Comfort in thinking about the state(s) of their program
- Studies and understands the abstractions one or two layers above and below their program
- Listens to the design forces on their code
Today, I want to give you a starting point for tinkering with and understanding concurrent programs, particularly in modern Ruby (JRuby and Rubinius 2.0).
Work queues, out-of-process and in-process
Lots of apps use a queue to get stuff done. Throw jobs on a queue, spin up a bunch of processes, run a job worker in those processes. Simple, right? Well, not entirely. You’ve got to store those jobs somewhere, make sure pulling jobs out of that store won’t lose critical work, run worker processes somewhere, restart them if they fail, make sure they don’t leak memory, etc. Writing that first bit of code is easy, but deploying it ends up being a little costly.
If process concurrency is the only available trick, running a Resque-style job queue works. But now that thread concurrency is viable in Ruby, we can handle these same kinds of jobs in-process instead of externally. At the cost of some additional code and some additional possible states in our process, we save all sorts of operational complexity.
Baby’s first work queue
Resque is a great abstraction. Let’s see if we can build something like it. Here’s how we’ll add jobs to our in-process queue:
Work.enqueue(EchoJob, "I am doing work!")
# Jobs implement call instead of perform, because that is my wont lately.
class EchoJob
  def call(message)
    puts message
  end
end
Humble beginnings
require 'thread'
require 'timeout'
module Work
  @queue = Queue.new
  @n_threads = 2
  @workers = []
  @running = true
  Job = Struct.new(:worker, :params)
First, we require thread, which gives us our new best friend, Queue, to go with Ruby’s built-in Thread class. We also need timeout so we have a way to interrupt methods that block.
Then we define our global work queue, aptly named Work. It’s got a modest amount of state: a queue to store pending work on, a parameter for the number of threads (I went with two since my MacBook Air has two real cores), an array to keep track of the worker threads, and a flag that indicates whether the work queue should keep running.
Finally, we define a little job object, because schlepping data around inside a library with a hash is suboptimal. Data that represents a concept deserves a name and some structure!
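Here’s a quick sketch of what the Struct buys us over a hash. It redefines EchoJob so it runs on its own and uses nothing beyond Ruby’s built-in Struct; the misspelled field name near the end is deliberate.

```ruby
# Standalone copy of EchoJob so this snippet runs on its own.
class EchoJob
  def call(message)
    puts message
  end
end

Job = Struct.new(:worker, :params)

job = Job.new(EchoJob, ["I am doing work!"])
puts job.worker.inspect # prints EchoJob
puts job.params.inspect # prints ["I am doing work!"]

# Structs compare by value, which is handy in tests.
same = Job.new(EchoJob, ["a"]) == Job.new(EchoJob, ["a"])
puts same # prints true

# A typo in a field name fails loudly, where a hash lookup
# like job_hash[:wroker] would quietly return nil.
begin
  job.wroker
rescue NoMethodError => e
  puts "caught #{e.class}" # prints "caught NoMethodError"
end

# Running the job the same way a worker will later on.
job.worker.new.call(*job.params) # prints "I am doing work!"
```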
A public API appears
  module_function
  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end
  def start
    @workers = @n_threads.times.map { Thread.new { process_jobs } }
  end
First, we call module_function with no arguments; this attaches every method defined after it to the module object itself, much like class methods. This saves us the tedium of typing def self.some_method all the time. Happy fingers!
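If module_function is new to you, here’s a tiny sketch of how it behaves. Counter is a made-up module for illustration; the pattern (a module instance variable plus methods defined after module_function) mirrors how Work is put together.

```ruby
module Counter
  @count = 0 # module instance variable, like @queue in Work

  module_function

  # Because of module_function, this is callable as Counter.increment,
  # and self inside it is the Counter module object.
  def increment
    @count += 1
  end

  def count
    @count
  end
end

Counter.increment
Counter.increment
puts Counter.count # prints 2

# module_function also keeps private instance-method copies,
# so code that includes Counter can call them but not expose them.
puts Counter.private_instance_methods(false).sort.inspect # prints [:count, :increment]
```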
Users of Work will add new jobs with enqueue, just like Resque. It’s a lot simpler in our case, though, because we never have to cross process boundaries. No marshaling, no problem.
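To make “no marshaling” concrete, this sketch pushes an object through an in-process Queue and gets the very same object back. Payload is a hypothetical struct, and Ruby’s Marshal stands in for whatever serialization an out-of-process queue would actually use.

```ruby
require 'thread' # harmless on modern Ruby, where Queue is built in

Payload = Struct.new(:text)

queue = Queue.new
original = Payload.new("hello")
queue << original

popped = queue.pop
puts popped.equal?(original) # prints true: same object, nothing was serialized

# Crossing a process boundary means serializing and deserializing.
# A Marshal round-trip gives you an equal copy, not the same object.
copy = Marshal.load(Marshal.dump(original))
puts copy == original      # prints true
puts copy.equal?(original) # prints false
```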
Once the queue is loaded up (or even if it’s not), users then call start. This fires up a bunch of threads and starts processing jobs. We need to keep track of those threads for later, so we toss them into a module instance variable.
The crux of the biscuit
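  def process_jobs
    while @running
      job = nil
      begin
        Timeout.timeout(1) do
          job = @queue.pop
        end
      rescue Timeout::Error
        # Nothing arrived within a second; loop around and re-check @running.
        next
      end
      job.worker.new.call(*job.params)
    end
  end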
process_jobs is a loop that runs as long as @running is true. Inside it, @queue.pop will block, forever, if the queue is empty. That makes it easy for us to avoid hogging the CPU fruitlessly looking for new work. It does, however, mean we need to wrap the pop operation in a timeout (and rescue the Timeout::Error it raises when no job shows up), so that we can eventually get back to our loop and do some housekeeping.
Housekeeping task the first: run that job. This looks almost exactly like the code you’ll find inside Resque workers: create a new instance of the class that handles this job, invoke our call interface, and pass the job params along. Easy!
Housekeeping task the second: see if the worker should keep running. If the @running flag is still set, we’re good to continue consuming work off the queue. If not, something has signaled that it’s time to wrap up.
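The timeout dance in process_jobs can be tried out in isolation. pop_with_timeout is a hypothetical helper, not part of Work; it shows that an empty queue plus Timeout.timeout gives you a Timeout::Error, which the caller has to rescue.

```ruby
require 'thread'
require 'timeout'

# Hypothetical helper: wrap a blocking pop in a timeout so an idle worker
# wakes up periodically. Timeout.timeout raises Timeout::Error when time
# runs out; rescuing it here keeps the exception from ending the thread.
def pop_with_timeout(queue, seconds)
  Timeout.timeout(seconds) { queue.pop }
rescue Timeout::Error
  nil # no job arrived in time
end

queue = Queue.new
p pop_with_timeout(queue, 0.1) # prints nil: the queue was empty

queue << :job
p pop_with_timeout(queue, 0.1) # prints :job
```

On Ruby 3.2 and later, Queue#pop also accepts a timeout: keyword and returns nil when the time runs out, which sidesteps Timeout entirely; it’s worth knowing about if you’re on a recent Ruby.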
Shutting down
  def drain
    loop do
      break if @queue.empty?
      sleep 1
    end
  end
  def stop
    @running = false
    @workers.each(&:join)
  end
end
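drain is a little oddly named. It doesn’t actually do the draining, but it does block until the queue is empty. We use it as a precondition for calling stop, which tells all the workers to finish the job they’ve got and then exit their processing loop. stop then calls Thread#join on each worker, which waits until that thread has exited. Note that an empty queue only means every job has been picked up, not that every job has finished; it’s stop, by waiting on the workers, that guarantees in-flight jobs complete.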
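One detail about Thread#join worth knowing before leaning on it in stop: if a thread died from an exception, join re-raises that exception in the thread that called join. This small sketch shows it. It’s why an exception escaping process_jobs (say, a Timeout::Error nobody rescued) would surface in Work.stop rather than disappear quietly.

```ruby
# Keep Ruby from printing the exception when the thread dies;
# we want to observe it through join instead.
Thread.report_on_exception = false

worker = Thread.new { raise ArgumentError, "boom" }

begin
  worker.join # waits for the thread, then re-raises its exception here
rescue ArgumentError => e
  puts "join re-raised: #{e.message}" # prints "join re-raised: boom"
end
```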
All together now
This is how we use our cute little work queue:
10.times { |n| Work.enqueue(EchoJob, "I counted to #{n}") }
# Process jobs in other threads
Work.start
# Block until all jobs are processed
Work.drain
# Stop the workers
Work.stop
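Here’s the whole thing stitched together as one runnable file, a sketch that assumes the version of process_jobs which rescues Timeout::Error. Two departures from the listings above, both for a quick, checkable demo: RecordJob is a hypothetical job that records its message on a Queue instead of printing it, and drain polls every tenth of a second instead of every second.

```ruby
require 'thread'
require 'timeout'

module Work
  @queue = Queue.new
  @n_threads = 2
  @workers = []
  @running = true
  Job = Struct.new(:worker, :params)

  module_function

  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end

  def start
    @workers = @n_threads.times.map { Thread.new { process_jobs } }
  end

  def process_jobs
    while @running
      job = nil
      begin
        Timeout.timeout(1) { job = @queue.pop }
      rescue Timeout::Error
        next # nothing to do yet; re-check @running
      end
      job.worker.new.call(*job.params)
    end
  end

  # Polls faster than the original (0.1s instead of 1s) to keep the demo short.
  def drain
    sleep 0.1 until @queue.empty?
  end

  def stop
    @running = false
    @workers.each(&:join)
  end
end

# Hypothetical job that records its message instead of printing it.
# Queue is thread-safe, so both workers can push onto it at once.
class RecordJob
  RESULTS = Queue.new

  def call(message)
    RESULTS << message
  end
end

10.times { |n| Work.enqueue(RecordJob, "I counted to #{n}") }
Work.start
Work.drain
Work.stop
```

Because workers pull jobs concurrently, the messages can land in RESULTS in any order; only the set of messages is guaranteed.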
That wasn’t too hard
A lot is made of the difficulty of concurrent programming. “Oh, the locks! Oh, the error cases!” they cry. Maybe it is trickier. But it’s not rocket-science hard. Hell, it’s not even monads-and-contravariance hard.
What I hope I’ve demonstrated today is that concurrent programming, even in Ruby with all its implementation shortcomings, is approachable. To wit:
- Ruby has a great API for working with threads themselves. You call Thread.new and pass it some code to run in a thread. Done!
- Ruby’s Queue class is thread-safe and a great tool for coordinating concurrent programs. You can get pretty far with a queue without thinking about locks. Push things onto it from one thread, pull things off from another. Block on a queue until you get the signal you’re waiting for. It’s a lovely little abstraction.
- It’s easy to tinker with concurrency. You don’t have to write a giant program or have exotic problems to take advantage of Ruby for concurrency.
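The “push from one thread, pull from another, block until you get a signal” idea fits in a few lines. In this sketch the :stop sentinel and the second queue used as a done-signal are conventions picked for illustration, not anything Queue requires.

```ruby
require 'thread'

inbox = Queue.new
done  = Queue.new # used purely as a signal back to the main thread

consumer = Thread.new do
  total = 0
  # Block on pop until the producer sends the :stop sentinel.
  while (item = inbox.pop) != :stop
    total += item
  end
  done << total # signal the main thread, handing it the result
end

# Producer: the main thread pushes work onto the queue.
(1..5).each { |n| inbox << n }
inbox << :stop

total = done.pop # blocks until the consumer signals
consumer.join
puts total # prints 15
```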
Some bits of this code (the timeout in process_jobs, the polling in drain) are inelegant. If you want to read ahead, locks and latches are the droids you’re looking for.
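As a taste of what latches look like, here’s a sketch of a countdown latch built from Ruby’s Mutex and ConditionVariable. CountDownLatch is a hypothetical class written for this example (the concurrent-ruby gem ships a production-quality Concurrent::CountDownLatch). Unlike drain, which only knows when the queue is empty, the latch waits until every job has actually finished, with no sleep loop.

```ruby
require 'thread'

# Hypothetical minimal countdown latch: wait blocks until count_down
# has been called `count` times.
class CountDownLatch
  def initialize(count)
    @count = count
    @lock  = Mutex.new
    @cond  = ConditionVariable.new
  end

  def count_down
    @lock.synchronize do
      @count -= 1 if @count > 0
      @cond.broadcast if @count.zero? # wake everyone blocked in wait
    end
  end

  # Blocks the caller until the count reaches zero.
  def wait
    @lock.synchronize do
      @cond.wait(@lock) while @count > 0
    end
  end

  def count
    @lock.synchronize { @count }
  end
end

jobs     = 10
latch    = CountDownLatch.new(jobs)
queue    = Queue.new
finished = Queue.new

workers = 2.times.map do
  Thread.new do
    while (n = queue.pop) != :stop
      finished << n * n # "do" the job
      latch.count_down  # report completion, not just dequeuing
    end
  end
end

jobs.times { |n| queue << n }
latch.wait                 # returns once every job has finished
2.times { queue << :stop } # one sentinel per worker
workers.each(&:join)
puts finished.size # prints 10
```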
I hope you see the ease with which we can get started doing concurrent Ruby programming by learning just two new classes. Don’t fear the threads, friend!
