Monday, October 06, 2008

Asynchronous Fork/Join using Asio

As most Asio users will no doubt be aware, multiple threads can call io_service::run() to set up a pool of threads from which the completion handlers will be executed. This can be used in conjunction with io_service::post() to execute arbitrary tasks in the thread pool.

In some rare spare moments I have used this facility to dabble in parallel algorithms (mainly to do with sorting large data sets). However, I was not completely satisfied with its ease-of-use when it came to implementing the algorithms.

Recently I came across the new Fork/Join framework that's going to be included in Java 7. I was particularly struck by the simplicity of the coinvoke() function, and was inspired to implement something similar on top of Asio. Of course, what with Asio and my preferred mode of thinking, I wanted to implement an asynchronous version.

And the result...

I created a function template that may be used to initiate two or more tasks to run in parallel:

template <typename TaskCont0, ..., typename TaskContN,
    typename Task0, ..., typename TaskN, typename Cont>
void coinvoke(asio::io_service& io_service,
    Task0 task0, ..., TaskN taskN, Cont cont);

Each task must be a function object with a single argument, where the argument is the continuation function object for the task. The TaskContN template parameters explicitly specify the function signatures used for each of the task continuations. For example:

coinvoke<void(int), void(int)>(io_service, task0, task1, cont);

says that both tasks have a continuation with the signature void(int). The combining continuation has a signature that is the concatenation of all of the task continuations' arguments. In the above example, that's void(int, int).
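To make that concrete, here is a minimal, two-task sketch of the joining logic (my own illustration, not the actual implementation): each task reports through its own void(int) continuation, and the last one to finish invokes the void(int, int) combining continuation.

```cpp
#include <atomic>
#include <functional>
#include <memory>
#include <thread>

// Join two tasks, each of which reports an int through its continuation,
// and pass both results to a single combining continuation. A bare-bones
// stand-in for coinvoke<void(int), void(int)>(...).
void coinvoke2(
    std::function<void(std::function<void(int)>)> task0,
    std::function<void(std::function<void(int)>)> task1,
    std::function<void(int, int)> cont)
{
  struct state
  {
    int results[2];
    std::atomic<int> remaining{2};
  };
  auto s = std::make_shared<state>();

  // Build the continuation handed to task 'index': store the result,
  // and whichever task finishes last calls the combiner.
  auto make_cont = [s, cont](int index)
  {
    return [s, cont, index](int value)
    {
      s->results[index] = value;
      if (--s->remaining == 0)
        cont(s->results[0], s->results[1]);
    };
  };

  std::thread t0([=]{ task0(make_cont(0)); });
  std::thread t1([=]{ task1(make_cont(1)); });
  t0.join();
  t1.join();
}
```

The real coinvoke() generalises this to any number of tasks and any continuation signatures, and runs the tasks via the io_service pool rather than raw threads.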

The operation of coinvoke() works as follows: each task is posted to the io_service and runs in the thread pool; as it completes, each task invokes its own continuation with its results; and once every task continuation has been called, the collected arguments are passed to the combining continuation.

You can get the implementation here. Don't expect the source to be readable; it contains hairy uses of template metaprogramming and preprocessor magic.

Why continuations?

You might be wondering why each task passes its result to a continuation function rather than simply returning it. The answer is that a task need not be a single calculation; it could instead be the start of a chain of asynchronous operations. This means that coinvoke() could be used to simplify the management of parallel operations, such as writing data to multiple sockets without the handler being called until all of the operations have finished. I plan to explore those and other related ideas further in the near future, but for now let's just look at parallel algorithms.

Fibonacci revisited

The equivalent implementation of the Fibonacci example from the Java Fork/Join paper looks like:

void combine_fib(
    int a, int b,
    function<void(int)> h)
{
  h(a + b);
}

void calc_fib(
    asio::io_service& io_service,
    int n,
    function<void(int)> f)
{
  if (n <= threshold)
  {
    f(seq_fib(n));
  }
  else
  {
    coinvoke<void(int), void(int)>(io_service,
        bind(calc_fib, ref(io_service), n - 1, _1),
        bind(calc_fib, ref(io_service), n - 2, _1),
        bind(combine_fib, _1, _2, f));
  }
}
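The example leans on two names it does not define: a cutoff below which forking is not worth the scheduling overhead, and a plain sequential Fibonacci for the leaves of the recursion. Something along these lines is assumed (the value of threshold here is purely illustrative; tune it for your hardware):

```cpp
// Below this problem size, fork/join overhead outweighs the parallelism.
// Illustrative value only.
const int threshold = 10;

// Plain sequential Fibonacci, computed iteratively, used once the
// recursion bottoms out: seq_fib(0) == 0, seq_fib(1) == 1, ...
int seq_fib(int n)
{
  int a = 0, b = 1;
  for (int i = 0; i < n; ++i)
  {
    int next = a + b;
    a = b;
    b = next;
  }
  return a;
}
```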

Can I have C++0x lambdas now, please?

The need to define a separate combine_fib is not ideal, since a key part of the algorithm is off in another spot. Fortunately, C++0x's new monomorphic lambdas come to the rescue:

void calc_fib(
    asio::io_service& io_service,
    int n,
    function<void(int)> f)
{
  if (n <= threshold)
  {
    f(seq_fib(n));
  }
  else
  {
    coinvoke<void(int), void(int)>(io_service,
        [&io_service, n](function<void(int)> c)
        {
          calc_fib(io_service, n - 1, c);
        },
        [&io_service, n](function<void(int)> c)
        {
          calc_fib(io_service, n - 2, c);
        },
        [f](int a, int b)
        {
          f(a + b);
        });
  }
}

A useful example

Since that has to come close to being the most convoluted way of calculating a Fibonacci value, here is an example where coinvoke() is used for a parallel merge sort:

template <typename Iterator>
void merge(
    Iterator begin,
    Iterator middle,
    Iterator end,
    function<void()> f)
{
  std::inplace_merge(begin, middle, end);
  f();
}

template <typename Iterator>
void sort(
    asio::io_service& io_service,
    Iterator begin,
    Iterator end,
    function<void()> f)
{
  std::size_t n = end - begin;
  if (n <= 16384)
  {
    std::sort(begin, end);
    io_service.post(f);
  }
  else
  {
    coinvoke<void(), void()>(io_service,

        // First task sorts the initial half of the range.
        bind(&sort<Iterator>,
          ref(io_service),
          begin, begin + n / 2, _1),

        // Second task sorts the latter half of the range.
        bind(&sort<Iterator>,
          ref(io_service),
          begin + n / 2, end, _1),

        // Continuation function merges the two sorted ranges.
        bind(&merge<Iterator>,
          begin, begin + n / 2, end, f)

    );
  }
}
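The recursion's correctness does not depend on the threading at all, so it can be checked with a purely sequential version of the same shape (a sketch for illustration; the cutoff of 32 is arbitrary): sort each half, then combine with std::inplace_merge exactly as the continuation above does.

```cpp
#include <algorithm>
#include <cstddef>

// Sequential skeleton of the parallel sort above: same divide,
// same std::inplace_merge combine, no thread pool.
template <typename Iterator>
void seq_merge_sort(Iterator begin, Iterator end)
{
  std::size_t n = end - begin;
  if (n <= 32)
  {
    std::sort(begin, end); // small ranges: sort directly
  }
  else
  {
    Iterator middle = begin + n / 2;
    seq_merge_sort(begin, middle);          // "first task"
    seq_merge_sort(middle, end);            // "second task"
    std::inplace_merge(begin, middle, end); // "continuation"
  }
}
```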

On my 8-core machine, this gives a little more than a threefold speedup in sorting large vectors.