Monday, October 06, 2008

Asynchronous Fork/Join using Asio

As most Asio users will no doubt be aware, multiple threads can call io_service::run() to set up a pool of threads from which the completion handlers will be executed. This can be used in conjunction with io_service::post() to execute arbitrary tasks in the thread pool.

In some rare spare moments I have used this facility to dabble in parallel algorithms (mainly to do with sorting large data sets). However, I was not completely satisfied with its ease-of-use when it came to implementing the algorithms.

Recently I came across the new Fork/Join framework that's going to be included in Java 7. I was particularly struck by the simplicity of the coinvoke() function, and was inspired to implement something similar on top of Asio. Of course, what with Asio and my preferred mode of thinking, I wanted to implement an asynchronous version.

And the result...

I created a function template that may be used to initiate two or more tasks to run in parallel:

template <typename TaskCont0, ..., typename TaskContN,
    typename Task0, ..., typename TaskN, typename Cont>
void coinvoke(asio::io_service& io_service,
    Task0 task0, ..., TaskN taskN, Cont cont);

Each task must be a function object with a single argument, where the argument is the continuation function object for the task. The TaskContN template parameters explicitly specify the function signatures used for each of the task continuations. For example:

coinvoke<void(int), void(int)>(io_service, task0, task1, cont);

says that both tasks have a continuation with the signature void(int). The combining continuation has a signature that is the concatenation of all of the task continuations' arguments. In the above example, that's void(int, int).
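To make the mechanics concrete, here is a minimal two-task sketch that uses only the standard library. It is not the implementation linked from this post: the names `coinvoke2` and `Executor` are hypothetical, `Executor` merely stands in for io_service::post(), and both task continuations are fixed to void(int) rather than being given as template arguments.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for io_service::post(): runs a nullary function
// object, possibly later and possibly on another thread.
using Executor = std::function<void(std::function<void()>)>;

// Both task continuations are void(int), so the combining continuation
// is void(int, int).
template <typename Task0, typename Task1, typename Cont>
void coinvoke2(Executor post, Task0 task0, Task1 task1, Cont cont)
{
  assert(post); // the executor must not be empty

  // Shared by both tasks: the results so far and a count of unfinished tasks.
  struct State
  {
    explicit State(Cont c) : cont(std::move(c)) {}
    std::atomic<int> remaining{2};
    int result0 = 0;
    int result1 = 0;
    Cont cont;
  };
  auto state = std::make_shared<State>(std::move(cont));

  // Called by each task continuation after it has stored its result. The
  // last task to finish invokes the combining continuation.
  auto finish = [state]()
  {
    if (state->remaining.fetch_sub(1) == 1)
      state->cont(state->result0, state->result1);
  };

  post([state, finish, task0]() mutable
  {
    task0([state, finish](int r) { state->result0 = r; finish(); });
  });
  post([state, finish, task1]() mutable
  {
    task1([state, finish](int r) { state->result1 = r; finish(); });
  });
}
```

Each task receives its own continuation, and a task is free to hold on to that continuation and call it later. The combining continuation only runs once both have been called.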

The operation of coinvoke() works as follows:

[Figure: diagram of the operation of coinvoke()]

You can get the implementation here. Don't expect the source to be readable; it contains hairy uses of template metaprogramming and preprocessor magic.

Why continuations?

You might be wondering why each task passes its result to a continuation function rather than simply returning it. The reason is that a task need not be a single calculation; it could instead be the start of a chain of asynchronous operations. This means that coinvoke() could be used to simplify the management of parallel operations, such as writing data to multiple sockets and not having the handler called until all of the operations have finished. I plan to explore those and other related ideas further in the near future, but for now let's just look at parallel algorithms.
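As a rough illustration of a task that is a chain rather than a single calculation, the following standard-library-only sketch queues two dependent steps and calls the task's continuation only after the second one completes. Everything in it is hypothetical: `PendingQueue` stands in for whatever runs the asynchronous operations (an io_service, say), and `async_increment`, `chained_task` and `run_all` are names invented for this example.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

using Handler = std::function<void(int)>;
using PendingQueue = std::deque<std::function<void()>>;

// Hypothetical asynchronous operation: it does no work when called, it only
// queues the work. The handler is invoked when the queued operation runs.
void async_increment(PendingQueue& pending, int value, Handler handler)
{
  pending.push_back([value, handler]() { handler(value + 1); });
}

// A task in the coinvoke() sense: its single argument of interest is the
// continuation `done`, which is called only at the end of the chain.
void chained_task(PendingQueue& pending, int start, Handler done)
{
  assert(done); // the continuation must not be empty
  async_increment(pending, start,
      [&pending, done](int first)
      {
        // The second step starts only after the first has completed.
        async_increment(pending, first, done);
      });
}

// Runs queued operations until none remain and returns how many were run.
int run_all(PendingQueue& pending)
{
  int count = 0;
  while (!pending.empty())
  {
    std::function<void()> op = std::move(pending.front());
    pending.pop_front();
    op();
    ++count;
  }
  return count;
}
```

Calling chained_task() returns immediately without producing a result; the continuation fires later, from inside run_all(). A function that simply returned its result could not express that.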

Fibonacci revisited

The equivalent implementation of the Fibonacci example from the Java Fork/Join paper looks like:

void combine_fib(
    int a, int b,
    function<void(int)> h)
{
  h(a + b);
}

void calc_fib(
    asio::io_service& io_service,
    int n,
    function<void(int)> f)
{
  if (n <= threshold)
  {
    f(seq_fib(n));
  }
  else
  {
    coinvoke<void(int), void(int)>(io_service,
        bind(calc_fib, ref(io_service), n - 1, _1),
        bind(calc_fib, ref(io_service), n - 2, _1),
        bind(combine_fib, _1, _2, f));
  }
}
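The listing above uses `threshold` and `seq_fib` without defining them. One plausible pair of definitions is sketched below. These are assumptions made for illustration, and the cutoff value in particular is arbitrary and would need tuning on real hardware.

```cpp
#include <cassert>

// Assumed cutoff: at or below this value the work is done sequentially,
// because forking would cost more than it saves. Illustrative value only.
const int threshold = 13;

// Plain sequential (recursive) Fibonacci, used for small n.
int seq_fib(int n)
{
  assert(n >= 0); // negative indices are not handled
  if (n <= 1)
    return n;
  return seq_fib(n - 1) + seq_fib(n - 2);
}
```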

Can I have C++0x lambdas now, please?

The need to define a separate combine_fib is not ideal, since a key part of the algorithm is off in another spot. Fortunately, C++0x's new monomorphic lambdas come to the rescue:

void calc_fib(
    asio::io_service& io_service,
    int n,
    function<void(int)> f)
{
  if (n <= threshold)
  {
    f(seq_fib(n));
  }
  else
  {
    coinvoke<void(int), void(int)>(io_service,
        [&io_service, n](function<void(int)> c)
        {
          calc_fib(io_service, n - 1, c);
        },
        [&io_service, n](function<void(int)> c)
        {
          calc_fib(io_service, n - 2, c);
        },
        [f](int a, int b)
        {
          f(a + b);
        });
  }
}

A useful example

Since that has to come close to being the most convoluted way of calculating a Fibonacci value, here is an example where coinvoke() is used for a parallel merge sort:

template <typename Iterator>
void merge(
    Iterator begin,
    Iterator middle,
    Iterator end,
    function<void()> f)
{
  std::inplace_merge(begin, middle, end);
  f();
}

template <typename Iterator>
void sort(
    asio::io_service& io_service,
    Iterator begin,
    Iterator end,
    function<void()> f)
{
  std::size_t n = end - begin;
  if (n <= 16384)
  {
    std::sort(begin, end);
    io_service.post(f);
  }
  else
  {
    coinvoke<void(), void()>(io_service,

        // First task sorts the initial half of the range.
        bind(&sort<Iterator>,
          ref(io_service),
          begin, begin + n / 2, _1),

        // Second task sorts the latter half of the range.
        bind(&sort<Iterator>,
          ref(io_service),
          begin + n / 2, end, _1),

        // Continuation function merges the two sorted ranges.
        bind(&merge<Iterator>,
          begin, begin + n / 2, end, f)

      );
  }
}

On my 8-core machine, this gives a little more than a threefold speedup in sorting large vectors.

5 comments:

Arash Partow said...

awesome! - one question though, could there be an overload of coinvoke that takes an iterator range of tasks?

Anonymous said...

inspiring post!

have you compared the performance gain vs a thread pool or map-reduce model ?

It would be nice to have a map-reduce example using asio. I guess it would be much cleaner than the coinvoke code !

jose

Dean Michael said...

Nice post Chris,

It would be nice seeing the recently-accepted Phoenix library help out those stuck with compilers that don't support C++0x lambdas.

Just thinking aloud: what is the overhead of using an io_service instance to run arbitrary function handlers? I've been doing this to implement Active Objects, but until now I'm not entirely convinced that using an io_service instance per active object is a "lean" enough way of going about it.

I hope you can write about that issue, as well as how to be able to define a custom error/exception handler for function objects scheduled for execution in an io_service queue.

Keep up the great work!

Marat said...

Awesome.
It remains to get rid of boost::function. boost::function can add additional overhead related to memory allocation.
What about overall custom memory allocation support?
What about replacing boost::function? For example, it could be replaced with boost::function with custom allocator forwarding to asio custom memory allocation.
Support of custom memory allocation could provide an opportunity to compete (sometimes) with Intel TBB (with TBB's task class).
I am more inclined to think that, thanks to its versatility and generalizations, asio can become the basis for the fork/join library, which is then able to enter the Boost.

Jim Hurd said...

Great post, great library.

Does ASIO already implement work stealing? If not, is there any reason asio would not benefit from a work stealing scheduler?