Building a concurrent system isn’t as hard as they say it is. What it boils down to is that you can’t program by coincidence. Here are a few qualities of a strong developer:
- Comfort in thinking about the state(s) of their program
- Studies and understands the abstractions one or two layers above and below their program
- Listens to the design forces on their code
Today, I want to give you a starting point for tinkering with and understanding concurrent programs, particularly in modern Ruby (JRuby and Rubinius 2.0).
Work queues, out-of-process and in-process
Lots of apps use a queue to get stuff done. Throw jobs on a queue, spin up a bunch of processes, run a job worker in those processes. Simple, right? Well, not entirely. You’ve got to store those jobs somewhere, make sure pulling jobs off the queue won’t lose critical work, run worker processes somewhere, restart them if they fail, make sure they don’t leak memory, and so on. Writing that first bit of code is easy, but deploying it ends up being a little costly.

If process concurrency is the only available trick, running a Resque-style job queue works. But now that thread concurrency is viable in Ruby, we can look at handling these same kinds of jobs in-process instead of externally. At the cost of some additional code and some additional possible states in our process, we save all sorts of operational complexity.
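For contrast, here is roughly what the out-of-process version looks like with Resque. This is a minimal sketch, not anything we build below; the :default queue name is arbitrary.

# Resque-flavored version of the job we'll build in-process below.
class EchoJob
  @queue = :default

  def self.perform(message)
    puts message
  end
end

Resque.enqueue(EchoJob, "I am doing work!")

# Meanwhile, in a separate worker process:
#   QUEUE=default rake resque:work

The job’s arguments get serialized into Redis and a separate worker process pulls them back out, which is exactly the operational surface area we’re about to avoid.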
Baby’s first work queue
Resque is a great abstraction. Let’s see if we can build something like it. Here’s how we’ll add jobs to our in-process queue:

Work.enqueue(EchoJob, "I am doing work!")
And this is how we’ll define a worker. Note that I’ve gone with call instead of perform, because that is my wont lately.
class EchoJob
  def call(message)
    puts message
  end
end
Simple enough. Now let’s make this thing actually work!
Humble beginnings
require 'thread'
require 'timeout'

module Work
  @queue     = Queue.new
  @n_threads = 2
  @workers   = []
  @running   = true

  Job = Struct.new(:worker, :params)
First off, we pull in thread, which gives us Thread and our new best friend, Queue. We also need timeout so we have a way to interrupt methods that block.

Then we define our global work queue, aptly named Work. It’s got a modest amount of state: a queue to store pending work on, a parameter for the number of threads (I went with two since my MacBook Air has two real cores), an array to keep track of the worker threads, and a flag that indicates whether the work queue should keep running.
Finally, we define a little job object, because schlepping data around inside a library with a hash is suboptimal. Data that represents a concept deserves a name and some structure!
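As a quick illustration of why the struct earns its keep, a Job knows which class handles it and which arguments to pass along. The values here are just examples:

# Illustrative values only.
job = Work::Job.new(EchoJob, ["I am doing work!"])
job.worker # => EchoJob
job.params # => ["I am doing work!"]

That reads a lot better than digging :worker and :params out of an anonymous hash.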
A public API appears
  module_function

  def enqueue(worker, *params)
    @queue << Job.new(worker, params)
  end

  def start
    @workers = @n_threads.times.map { Thread.new { process_jobs } }
  end
This is the heart of the API. Note the use of module_function with no arguments; this makes all the following methods attach to the module object like class methods. This saves us the tedium of typing self.some_method all the time. Happy fingers!
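If module_function is new to you, here’s a tiny sketch of the effect. The Greeter module is just a toy example, not part of our work queue:

# Greeter is a throwaway example module.
module Greeter
  module_function

  def hello(name)
    "hello, #{name}"
  end
end

Greeter.hello("threads") # => "hello, threads"

Without module_function (or an explicit self.), hello would only be available as an instance method on classes that include Greeter.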
Users of Work will add new jobs with enqueue, just like Resque. It’s a lot simpler in our case, though, because we never have to cross process boundaries. No marshaling, no problem.
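One nice consequence: job params can be any live Ruby object, not just values that survive serialization. A hypothetical example (AppendJob isn’t defined anywhere above, it’s just for illustration):

log = File.open("work.log", "a")          # an open IO couldn't be marshaled to Redis
Work.enqueue(AppendJob, log, "finished a batch") # AppendJob is hypothetical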
Once the queue is loaded up (or even if it’s not), users then call start. This fires up a bunch of threads and starts processing jobs. We need to keep track of those threads for later, so we toss them into a module instance variable.
The crux of the biscuit
  def process_jobs
    while @running
      job = nil
      begin
        Timeout.timeout(1) do
          job = @queue.pop
        end
      rescue Timeout::Error
        next # nothing arrived within a second; loop back and check @running
      end

      job.worker.new.call(*job.params)
    end
  end
Here’s the heart of this humble little work queue. It’s easiest to look at this one from the inside out. The crux of the biscuit is popping off the queue. For one thing, this is thread-safe, so two workers can pop off the queue at the same time and get different jobs back.
More importantly, @queue.pop will block, forever, if the queue is empty. That makes it easy for us to avoid hogging the CPU while fruitlessly looking for new work. It does, however, mean we need to wrap the pop operation in a timeout (and rescue the Timeout::Error that fires when nothing shows up), so that we can eventually get back to our loop and do some housekeeping.
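You can see that blocking behavior for yourself in a couple of lines. This is just a throwaway irb experiment, nothing from the work queue:

require 'thread'

q = Queue.new
Thread.new { sleep 2; q << :ready } # a producer that takes its time
q.pop                               # blocks for ~2 seconds, then returns :ready

The consuming thread doesn’t spin or poll; it simply parks until something shows up.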
Housekeeping task the first, run that job. This looks almost just like the code you’ll find inside Resque workers. Create a new instance of the class that handles this job, invoke our call interface, pass the job params on. Easy!
Housekeeping task the second, see if the worker should keep running. If the @running flag is still set, we’re good to continue consuming work off the queue. If not, something has signaled that it’s time to wrap up.
Shutting down
  def drain
    loop do
      break if @queue.empty?
      sleep 1
    end
  end

  def stop
    @running = false
    @workers.each(&:join)
  end
end
Shutting down our work queue is a matter of draining any pending jobs and then closing out the running threads. drain is a little oddly named. It doesn’t actually do the draining, but it does block until the queue is drained. We use it as a precondition for calling stop, which tells all the workers to finish the job they’ve got and then exit their processing loop. We then call Thread#join to shut down the worker threads.
All together now
This is how we use our cute little work queue:

10.times { |n| Work.enqueue(EchoJob, "I counted to #{n}") }

# Process jobs in other threads
Work.start

# Block until all jobs are processed
Work.drain

# Stop the workers
Work.stop
Create work, start our workers, block until they finish, and then stop working. Not too bad for fifty lines of code.
That wasn’t too hard
A lot is made of the difficulty of concurrent programming. “Oh, the locks, oh, the error cases!” they cry. Maybe it is trickier. But it’s not rocket-science hard. Hell, it’s not even monads-and-contravariance hard.

What I hope I’ve demonstrated today is that concurrent programming, even in Ruby with all its implementation shortcomings, is approachable. To wit:
- Ruby has a great API for working with threads themselves. You call Thread.new and pass it some code to run in a thread. Done!
- Ruby’s Queue class is threadsafe and a great tool for coordinating concurrent programs. You can get pretty far without thinking about locks with a queue. Push things onto it from one thread, pull things off from another. Block on a queue until you get the signal you’re waiting for. It’s a lovely little abstraction.
- It’s easy to tinker with concurrency. You don’t have to write a giant program or have exotic problems to take advantage of Ruby for concurrency.
That said, I’ve glossed over some details. The polling loops (in process_jobs and drain) are inelegant. If you want to read ahead, locks and latches are the droids you’re looking for.
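For a taste of where that reading leads, here’s a minimal sketch of a latch built from a Mutex and a ConditionVariable. The names and structure are mine, not part of the work queue above; the idea is that drain could wait to be woken up instead of sleeping in a loop.

require 'thread'

# A hand-rolled latch, just to sketch the idea.
class Latch
  def initialize
    @mutex  = Mutex.new
    @signal = ConditionVariable.new
    @open   = false
  end

  def release
    @mutex.synchronize do
      @open = true
      @signal.broadcast # wake up everyone blocked in await
    end
  end

  def await
    @mutex.synchronize do
      @signal.wait(@mutex) until @open # guards against spurious wakeups
    end
  end
end

A worker could call release when it notices the queue has emptied out, and drain could simply await instead of polling with sleep.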
I hope you see the ease with which we can get started doing concurrent Ruby programming by learning just two new classes. Don’t fear the threads, friend!