Wednesday, August 12, 2009

Composed operations, coroutines and code makeover

In the previous two posts, I showed some nifty macros for doing clean and simple stackless coroutines with asio. Hold on to your hats, because in this post we'll see what these coroutines can really do for your asio programs.

A design goal of asio is to provide a basis for further levels of abstraction. One of the ways to develop abstractions on top of asio is to create what I like to call composed operations. These are simply operations that are made up of calls to other lower-level operations. Asio already includes some composed operations to address common network programming problems: async_read and async_write to deal with short reads and writes; and async_read_until to perform delimited reads.
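To see the idea without asio, here is a minimal sketch of a composed operation in the spirit of async_read. It keeps reissuing a lower-level "read some" operation until the requested number of bytes has arrived, then calls the caller's handler exactly once. Everything in it is hypothetical: mock_stream, its async_read_some member, read_exactly_step, async_read_exactly, pending and run_all() stand in for an asio socket and io_service. The mock deliberately returns short reads of at most 4 bytes (C++11).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical single-threaded stand-in for io_service::run().
std::deque<std::function<void()>> pending;

void run_all() {
  while (!pending.empty()) {
    std::function<void()> f = std::move(pending.front());
    pending.pop_front();
    f();
  }
}

// Hypothetical stream that only ever delivers short reads (at most 4 bytes).
struct mock_stream {
  std::string data;
  std::size_t pos = 0;

  // Lower-level operation: completes later with up to 4 bytes, or with
  // error 1 when no data is left.
  void async_read_some(char* buf, std::size_t size,
                       std::function<void(int, std::size_t)> handler) {
    std::size_t n = std::min({size, std::size_t(4), data.size() - pos});
    data.copy(buf, n, pos);
    pos += n;
    int ec = (n == 0 && size != 0) ? 1 : 0;
    pending.push_back([handler, ec, n] { handler(ec, n); });
  }
};

// Intermediate handler logic: issue another read until `size` bytes arrive.
// The stream must outlive the operation.
void read_exactly_step(mock_stream& s, char* buf, std::size_t size,
                       std::size_t total,
                       std::function<void(int, std::size_t)> handler) {
  mock_stream* ps = &s;
  s.async_read_some(buf + total, size - total,
      [ps, buf, size, total, handler](int ec, std::size_t n) {
        std::size_t now = total + n;
        if (ec || now == size)
          handler(ec, now);  // finished: the caller's handler runs once
        else
          read_exactly_step(*ps, buf, size, now, handler);  // short read
      });
}

// Public interface of the composed operation, in the style of async_read().
void async_read_exactly(mock_stream& s, char* buf, std::size_t size,
                        std::function<void(int, std::size_t)> handler) {
  read_exactly_step(s, buf, size, 0, handler);
}
```

The caller sees one operation with one completion handler, however many short reads happen underneath.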

As an example, let's say we want to write a composed operation that echoes all data received on a socket until an error occurs. The way I have done this in the past (and the way composed operations like async_read_until are written) is to implement the operation as a set of function objects:

template <typename Handler>
struct echo_read_handler
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
void operator()(error_code ec, size_t length);
};

template <typename Handler>
struct echo_write_handler
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
void operator()(error_code ec, size_t length);
};

template <typename Handler>
void echo_read_handler<Handler>::operator()(
error_code ec, size_t length)
{
if (!ec)
{
echo_write_handler<Handler> write_handler =
{ socket, working_buffer, handler };

async_write(socket,
buffer(working_buffer, length),
write_handler);
}
else
handler(ec);
}

template <typename Handler>
void echo_write_handler<Handler>::operator()(
error_code ec, size_t /*length*/)
{
if (!ec)
{
echo_read_handler<Handler> read_handler =
{ socket, working_buffer, handler };

socket.async_read_some(
buffer(working_buffer),
read_handler);
}
else
handler(ec);
}

and a convenience function which acts as the public interface for the abstraction:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler)
{
echo_read_handler<Handler> read_handler =
{ socket, working_buffer, handler };

socket.async_read_some(
buffer(working_buffer),
read_handler);
}

Not very encouraging if you want to write your own abstractions, is it? Now imagine you've been asked to develop a composed operation to send an email using SMTP. That would involve about a dozen lower-level operations, so even I probably wouldn't bother if I had to use the function object approach.

Coroutines to the rescue

In the previous two posts we already saw how to combine stackless coroutines with asio's asynchronous operations, simply by prepending the yield "keyword". I'm sure you know where this is going... We can also use a coroutine to implement a composed operation.

Let's rewrite async_echo as a coroutine:

template <typename Handler>
struct echo_coro
{
tcp::socket& socket;
mutable_buffer working_buffer;
Handler handler;
coroutine coro;
void operator()(
error_code ec = error_code(),
size_t length = 0)
{
reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer), *this);

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length), *this);
}

handler(ec);
}
}
};

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler)
{
echo_coro<Handler> coro = { socket, working_buffer, handler };
coro();
}

The code is much shorter and clearer than the function object version. For an SMTP email operation the savings would be greater still, but I'll leave that as an exercise for the reader.

Now you might think this next point is obvious, but I'm going to say it anyway and put it in bold because it's important:

The fact that the composed operation is implemented as a coroutine is entirely transparent to the caller.

What does this mean? It means:

  • You can write your composed operations as coroutines, or not, as you choose.
  • You can combine those composed operations still further using coroutines (or not).

And so on and so on, up through as many layers of abstraction as you think you can reasonably fit into your program.

An alternative approach

One aspect of the implementation above still bothers me a little: repetition. Specifically, the repetition of the operation's template parameter list and the arguments (socket, working_buffer and handler) when defining the coroutine's function object.

Here's an alternative design that implements the composed operation in a single function:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler,
// coroutine state:
coroutine coro = coroutine(),
error_code ec = error_code(),
size_t length = 0)
{
reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer),
bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length),
bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2));
}

handler(ec);
}
}

(N.B. box() wraps the handler with another function object to prevent evaluation of the handler as a nested bind expression.)
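The nested-bind problem that box() sidesteps can be reproduced with std::bind, the standard library's counterpart of the bind used above. When one bind expression is passed as an argument to another, it is evaluated with the outer call's arguments instead of being passed through as a function object. The box_t and box names below are a hypothetical guess at such a wrapper, not the definition the post relies on; Boost.Bind's protect() serves a similar purpose. The sketch needs C++14.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>
#include <utility>

// Hypothetical box(): wraps any callable in a plain function object. Since
// box_t is not a bind expression, std::bind stores it and passes it through
// as an argument instead of evaluating it.
template <typename F>
struct box_t {
  F f;

  template <typename... Args>
  decltype(auto) operator()(Args&&... args) {
    return f(std::forward<Args>(args)...);
  }
};

template <typename F>
box_t<F> box(F f) {
  return box_t<F>{std::move(f)};
}

// A function that takes a handler and invokes it itself.
int call_with_five(std::function<int(int)> handler) {
  return handler(5);
}
```

With add_one = std::bind(std::plus<int>(), std::placeholders::_1, 1), calling std::bind(call_with_five, box(add_one))() yields 6. Without box(), invoking std::bind(call_with_five, add_one) would fail to compile, because std::bind would try to evaluate add_one with the outer call's empty argument list.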

Of course, we've just traded one type of repetition for another: the bind expressions to create the completion handlers. At this point, I think it's a matter of taste which approach you use.

Lambdas == code liposuction

It's left to C++0x lambdas to make the coroutine-in-one-function approach the clear winner in brevity, and perhaps not in the way you first expect. Combined with auto, you can use lambdas as local functions to eliminate repeated code:

template <typename Handler>
void async_echo(
tcp::socket& socket,
mutable_buffer working_buffer,
Handler handler,
// coroutine state:
coroutine coro = coroutine(),
error_code ec = error_code(),
size_t length = 0)
{
auto resume = [&]()
{
return bind(&async_echo<Handler>,
ref(socket), working_buffer,
box(handler), coro, _1, _2);
};

reenter (coro)
{
entry:
while (!ec)
{
yield socket.async_read_some(
buffer(working_buffer),
resume());

if (ec) break;

yield async_write(socket,
buffer(working_buffer, length),
resume());
}

handler(ec);
}
}

What we end up with is a composed operation in one function, a concise coroutine to specify the asynchronous control flow, and a single definition of how to reenter the coroutine.
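The same shape can be sketched without asio or the reenter/yield macros: one function whose "coroutine state" lives in defaulted parameters, plus a local lambda that is the only definition of how to re-enter. In this hypothetical example a plain switch on an explicit step number stands in for the coroutine, and async_get(), pending and run_all() stand in for an asynchronous operation and the io_service (C++11).

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded stand-in for io_service::run().
std::deque<std::function<void()>> pending;

void run_all() {
  while (!pending.empty()) {
    std::function<void()> f = std::move(pending.front());
    pending.pop_front();
    f();
  }
}

// Hypothetical asynchronous operation: delivers `value` to the handler later.
void async_get(int value, std::function<void(int)> handler) {
  pending.push_back([handler, value] { handler(value); });
}

// Composed operation in one function: fetches 1, 2 and 3 in turn and reports
// their sum. `step` and `total` play the role of the coroutine state.
void async_sum(std::function<void(int)> handler, int step = 0, int total = 0) {
  // The single definition of how to re-enter: the returned handler calls
  // async_sum again with the next step and the updated running total.
  auto resume = [&]() -> std::function<void(int)> {
    int next_step = step + 1;
    int sum_so_far = total;
    return [handler, next_step, sum_so_far](int value) {
      async_sum(handler, next_step, sum_so_far + value);
    };
  };

  switch (step) {
  case 0: async_get(1, resume()); break;
  case 1: async_get(2, resume()); break;
  case 2: async_get(3, resume()); break;
  default: handler(total); break;  // every step has completed
  }
}
```

In the asio version, resume captures by reference for a reason: yield stores the new resume point in coro before the operation's arguments, including resume(), are evaluated, so the bound copy of coro already records where to continue.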

Phew. I think we're done.

Monday, August 10, 2009

Secret sauce revealed

In my previous post, I showed a little program using stackless coroutines with asio. Obviously there's no yield keyword in C++, so without further ado here's the preprocessor magic that makes it possible:

class coroutine
{
public:
coroutine() : value_(0) {}
private:
friend class coroutine_ref;
int value_;
};

class coroutine_ref
{
public:
coroutine_ref(coroutine& c) : value_(c.value_) {}
coroutine_ref(coroutine* c) : value_(c->value_) {}
operator int() const { return value_; }
int operator=(int v) { return value_ = v; }
private:
int& value_;
};

#define reenter(c) \
switch (coroutine_ref _coro_value = c)

#define entry \
extern void you_forgot_to_add_the_entry_label(); \
bail_out_of_coroutine: break; \
case 0

#define yield \
if ((_coro_value = __LINE__) == 0) \
{ \
case __LINE__: ; \
(void)&you_forgot_to_add_the_entry_label; \
} \
else \
for (bool _coro_bool = false;; \
_coro_bool = !_coro_bool) \
if (_coro_bool) \
goto bail_out_of_coroutine; \
else

Unlike the preprocessor-based coroutines presented here, the above macros let you yield from a coroutine without having to return from a function. Instead, the yield simply breaks you out of the reenter block. That trick is what allows us to write a server in one function.
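Because the macros are ordinary C++, their behaviour can be checked without asio. The sketch below reuses coroutine, coroutine_ref, reenter, entry and yield as defined above, except that the you_forgot_to_add_the_entry_label check is dropped to keep it short. It drives an echo-style coroutine from a single-threaded queue. mock_source, async_read_chunk, pending, run_all and mock_echo_coro are hypothetical stand-ins for a socket, an asynchronous read, the io_service and an echo_coro-style operation. Appending to a string replaces the async_write step, and error code 1 plays the role of end-of-file.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

class coroutine {
public:
  coroutine() : value_(0) {}
private:
  friend class coroutine_ref;
  int value_;
};

class coroutine_ref {
public:
  coroutine_ref(coroutine& c) : value_(c.value_) {}
  coroutine_ref(coroutine* c) : value_(c->value_) {}
  operator int() const { return value_; }
  int operator=(int v) { return value_ = v; }
private:
  int& value_;
};

// Same macros as above, minus the entry-label check.
#define reenter(c) switch (coroutine_ref _coro_value = c)
#define entry bail_out_of_coroutine: break; case 0
#define yield \
  if ((_coro_value = __LINE__) == 0) { case __LINE__: ; } \
  else for (bool _coro_bool = false;; _coro_bool = !_coro_bool) \
    if (_coro_bool) goto bail_out_of_coroutine; else

// Hypothetical single-threaded stand-in for io_service::run().
std::deque<std::function<void()>> pending;

void run_all() {
  while (!pending.empty()) {
    std::function<void()> f = std::move(pending.front());
    pending.pop_front();
    f();
  }
}

// Hypothetical data source: completes later with the next chunk, or with
// error 1 once every chunk has been delivered.
struct mock_source {
  std::vector<std::string> chunks;
  std::size_t next = 0;
};

void async_read_chunk(mock_source& src,
                      std::function<void(int, std::string)> handler) {
  int ec = src.next < src.chunks.size() ? 0 : 1;
  std::string chunk = ec ? std::string() : src.chunks[src.next++];
  pending.push_back([handler, ec, chunk] { handler(ec, chunk); });
}

// Echo-style coroutine: a copy of *this is the completion handler, so the
// coroutine state (coro) travels with each pending operation.
struct mock_echo_coro {
  mock_source* src;
  std::string* out;
  std::function<void(int)> handler;
  coroutine coro;

  void operator()(int ec = 0, std::string chunk = std::string()) {
    reenter (coro) {
      entry:
      while (!ec) {
        yield async_read_chunk(*src, *this);
        if (ec) break;
        *out += chunk;  // stands in for the async_write step
      }
      handler(ec);
    }
  }
};
```

Each yield leaves the switch right after starting the operation, and the queued copy of the coroutine jumps straight back to the matching case label when it runs.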

Yes, yes, I know. An echo server in a single function is a bit of a gimmick. The following snippet may give a better idea of how the coroutines can be used:

class session : coroutine
{
public:
session(tcp::acceptor& acceptor)
: acceptor_(acceptor),
socket_(new tcp::socket(acceptor.get_io_service())),
data_(new array<char, 1024>)
{
}

void operator()(
error_code ec = error_code(),
size_t length = 0)
{
reenter (this)
{
entry:
for (;;)
{
yield acceptor_.async_accept(
*socket_, *this);

while (!ec)
{
yield socket_->async_read_some(
buffer(*data_), *this);

if (ec) break;

yield async_write(*socket_,
buffer(*data_, length), *this);
}

socket_->close();
}
}
}

private:
tcp::acceptor& acceptor_;
shared_ptr<tcp::socket> socket_;
shared_ptr<array<char, 1024> > data_;
};

Compared to the usual boost::bind-based approach of doing callbacks, the control flow is all in one place and easy to follow.

But wait, there's more... In the next post I'll reveal the real power you get when you combine stackless coroutines and asio.

Wednesday, July 29, 2009

Wife says: "I can't believe it works"

Just a teaser:

int main()
{
try
{
using asio::ip::tcp;
using namespace boost::lambda;

asio::io_service io_service;
tcp::acceptor acceptor(io_service,
tcp::endpoint(tcp::v4(), 54321));

const int max_clients = 100;
coroutine coro[max_clients];
std::auto_ptr<tcp::socket> socket[max_clients];
asio::error_code ec[max_clients];
std::size_t length[max_clients];
boost::array<char, 1024> data[max_clients];

// Kick off all the coroutines.
int n = -1;
for (int i = 0; i < max_clients; ++i)
{
socket[i].reset(new tcp::socket(io_service));
io_service.post(
unlambda((
var(n) = i
)));
}

for (; io_service.run_one() > 0; n = -1)
{
if (n != -1)
{
reenter (coro[n])
{
entry:
for (;;)
{
// Wait for a client to connect.
yield acceptor.async_accept(
*socket[n],
unlambda((
var(n) = n,
var(ec[n]) = _1
)));

// Echo at will.
while (!ec[n])
{
yield socket[n]->async_read_some(
asio::buffer(data[n]),
unlambda((
var(n) = n,
var(ec[n]) = _1,
var(length[n]) = _2
)));

if (!ec[n])
{
yield asio::async_write(
*socket[n],
asio::buffer(data[n], length[n]),
unlambda((
var(n) = n,
var(ec[n]) = _1
)));
}
}

// Clean up before accepting next client.
socket[n]->close();
}
}
}
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}

One function. One fully asynchronous server. Bog standard C++.

Tuesday, July 14, 2009

User-friendly compile errors for templates in C++0x

The C++0x features decltype, static_assert and the "new function declarator syntax" can be combined with our old friend SFINAE to generate nicer template compile errors.

As a simple example, consider a container class similar to std::set. Normally if you just declare a variable

set<my_type> s;

it will compile without error even if my_type has no operator<. You will only get an error when you try to call a set member function, such as insert(). Worse still, the errors tend to be quite verbose. (Too verbose for me to want to paste here, anyway.) It would be really nice to generate a short, readable error at the point of the original variable declaration. Let's see how we can do just that in C++0x.

First, we need to write a compile-time test for operator<. This is where SFINAE, decltype and the new function declarator syntax come together. We write the test function:

template <typename T>
auto less_than_test(const T* t)
-> decltype(*t < *t, char(0));

and the fallback overload:

std::array<char, 2> less_than_test(...);

The trick here is that, according to the C++0x grammar, we have:

decltype ( expression )

and

expression:
assignment-expression
expression , assignment-expression

This means that the first overload uses decltype to do two things: it makes the overload a viable candidate only if the expression *t < *t is valid; and it says the overload returns a char.
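The trick can be checked in isolation. In this minimal sketch (C++11), has_less and no_less are hypothetical test types. Because both overloads are only ever named inside sizeof, neither needs a definition.

```cpp
#include <array>

// Viable only when *t < *t is well-formed; the comma expression then makes
// the declared return type char.
template <typename T>
auto less_than_test(const T* t) -> decltype(*t < *t, char(0));

// Fallback for every other argument; its return type is larger than char.
std::array<char, 2> less_than_test(...);

// Hypothetical test types.
struct has_less { int v; };
bool operator<(const has_less& a, const has_less& b) { return a.v < b.v; }
struct no_less {};

static_assert(sizeof(less_than_test((int*)0)) == 1, "int supports <");
static_assert(sizeof(less_than_test((has_less*)0)) == 1, "has_less supports <");
static_assert(sizeof(less_than_test((no_less*)0)) != 1, "no_less does not");
```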

Second, we can use sizeof to determine which of the overloads is selected for a given type T, and static_assert to generate a readable error:

template <typename T>
class set
{
public:
static_assert(
sizeof(less_than_test((T*)0)) == 1,
"type T must provide operator<");
};

The g++ 4.4 compiler then gives the following output on the original variable declaration:

test.cpp: In instantiation of set<my_type>
test.cpp:21: instantiated from here
test.cpp:13: error: static assertion failed:
"type T must provide operator<"

It works with function templates too. To add a check on the ReadHandler parameter of Asio's async_read function, I could write the following:

template <typename T>
auto read_handler_test(T* t)
-> decltype(
(*t)(
*(const error_code*)0,
(const std::size_t)0),
char(0));

std::array<char, 2> read_handler_test(...);

template <..., typename ReadHandler>
void async_read(..., ReadHandler handler)
{
static_assert(
sizeof(read_handler_test(&handler)) == 1,
"ReadHandler type requirements not met");
...
}

Perhaps with a touch of macro magic, checks of this sort could become quite easy to write.
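One possible shape for that macro magic, offered purely as a hypothetical sketch: EXPRESSION_TEST and check_read_handler are invented names rather than Asio facilities. error_code, good_handler and bad_handler are stand-ins so the fragment compiles without Asio. The macro stamps out both overloads from a single expression written in terms of a pointer named t (C++11).

```cpp
#include <array>
#include <cstddef>

// Hypothetical helper: defines name(T* t), viable (and returning char) only
// when `expr` is well-formed, plus a name(...) fallback returning a larger type.
#define EXPRESSION_TEST(name, expr) \
  template <typename T> \
  auto name(T* t) -> decltype(expr, char(0)); \
  std::array<char, 2> name(...)

// Stand-in for asio::error_code so the sketch compiles on its own.
struct error_code {};

EXPRESSION_TEST(less_than_test, *t < *t);
EXPRESSION_TEST(read_handler_test,
    (*t)(*(const error_code*)0, (const std::size_t)0));

// Hypothetical handler types used to exercise the check.
struct good_handler {
  void operator()(const error_code&, std::size_t) {}
};
struct bad_handler {
  void operator()(int) {}
};

template <typename ReadHandler>
void check_read_handler(ReadHandler handler) {
  static_assert(sizeof(read_handler_test(&handler)) == 1,
                "ReadHandler type requirements not met");
  (void)handler;
}
```

A call such as check_read_handler(bad_handler()) should then stop compilation with the "ReadHandler type requirements not met" message.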

"Hang on, what about C++0x concepts?" I hear you ask. What are they? ;-)

Monday, October 06, 2008

Asynchronous Fork/Join using Asio

As most Asio users will no doubt be aware, multiple threads can call io_service::run() to set up a pool of threads from which the completion handlers will be executed. This can be used in conjunction with io_service::post() to execute arbitrary tasks in the thread pool.
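The thread-pool arrangement can be approximated with the standard library alone, which may help readers without asio at hand. task_queue is hypothetical and much simpler than io_service. post() queues a function, any number of threads may call run(), and run() returns once nothing is queued or still executing. It assumes tasks do not throw (C++11).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical, much-simplified stand-in for io_service::post() and run().
class task_queue {
public:
  void post(std::function<void()> f) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.push_back(std::move(f));
      ++outstanding_;
    }
    cond_.notify_one();
  }

  // May be called from several threads at once, forming a pool. Returns when
  // no task is queued or still executing (an executing task may post more).
  void run() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cond_.wait(lock, [this] { return !tasks_.empty() || outstanding_ == 0; });
      if (tasks_.empty())
        return;  // outstanding_ == 0: all work is done
      std::function<void()> f = std::move(tasks_.front());
      tasks_.pop_front();
      lock.unlock();
      f();  // run the task without holding the lock
      lock.lock();
      if (--outstanding_ == 0)
        cond_.notify_all();  // wake idle threads so they can return
    }
  }

private:
  std::mutex mutex_;
  std::condition_variable cond_;
  std::deque<std::function<void()>> tasks_;
  int outstanding_ = 0;  // tasks queued or currently executing
};
```

For example, four std::thread objects that each call q.run() on a shared task_queue q drain it in parallel. A task that posts follow-up work keeps run() from returning until that work has also completed.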

In some rare spare moments I have used this facility to dabble in parallel algorithms (mainly to do with sorting large data sets). However, I was not completely satisfied with its ease of use when it came to implementing the algorithms.

Recently I came across the new Fork/Join framework that's going to be included in Java 7. I was particularly struck by the simplicity of the coinvoke() function, and was inspired to implement something similar on top of Asio. Of course, what with Asio and my preferred mode of thinking, I wanted to implement an asynchronous version.

And the result...

I created a function template that may be used to initiate two or more tasks to run in parallel:

template <typename TaskCont0, ..., typename TaskContN,
typename Task0, ..., typename TaskN, typename Cont>
void coinvoke(asio::io_service& io_service,
Task0 task0, ..., TaskN taskN, Cont cont);

Each task must be a function object with a single argument, where the argument is the continuation function object for the task. The TaskContN template parameters explicitly specify the function signatures used for each of the task continuations. For example:

coinvoke<void(int), void(int)>(io_service, task0, task1, cont);

says that both tasks have a continuation with the signature void(int). The combining continuation has a signature that is the concatenation of all of the task continuations' arguments. In the above example, that's void(int, int).

The operation of coinvoke() works as follows:

[Diagram: operation of coinvoke()]

You can get the implementation here. Don't expect the source to be readable; it contains hairy uses of template metaprogramming and preprocessor magic.
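Stripped of its generality, the joining step at the heart of coinvoke() is small. The sketch below is not the linked implementation. coinvoke2 is a hypothetical two-task version fixed to void(int) continuations, and it invokes the tasks directly instead of posting them to an io_service. A shared atomic counter makes cont run exactly once, after whichever task finishes last, even if the continuations are called from different threads (C++11).

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

using continuation = std::function<void(int)>;
using task = std::function<void(continuation)>;

// Hypothetical two-task coinvoke(): starts both tasks, then calls
// cont(result0, result1) once both continuations have been invoked.
void coinvoke2(task task0, task task1, std::function<void(int, int)> cont) {
  struct join_state {
    std::atomic<int> remaining{2};
    int result0 = 0;
    int result1 = 0;
    std::function<void(int, int)> cont;
  };
  auto state = std::make_shared<join_state>();
  state->cont = std::move(cont);

  // Each continuation stores its result, then decrements the counter;
  // fetch_sub returns the previous value, so only the last finisher sees 1.
  task0([state](int r) {
    state->result0 = r;
    if (state->remaining.fetch_sub(1) == 1)
      state->cont(state->result0, state->result1);
  });
  task1([state](int r) {
    state->result1 = r;
    if (state->remaining.fetch_sub(1) == 1)
      state->cont(state->result0, state->result1);
  });
}
```

Because a task hands its result to a continuation rather than returning it, the continuation may be called later, for example from a completion handler, and cont still waits for both. The Fibonacci example below maps onto this directly: fib(n - 1, c) and fib(n - 2, c) become the two tasks and f(a + b) the combining continuation.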

Why continuations?

You might be wondering why each task passes its result to a continuation function rather than simply returning it. The answer is that a task need not be a single calculation; it could instead be the start of a chain of asynchronous operations. This means that coinvoke() could be used to simplify management of parallel operations, such as writing data to multiple sockets, with the handler not called until all operations have finished. I plan to explore those and other related ideas further in the near future, but for now let's just look at parallel algorithms.

Fibonacci revisited

The equivalent implementation of the Fibonacci example from the Java Fork/Join paper looks like:

void combine_fib(
int a, int b,
function<void(int)> h)
{
h(a + b);
}

void calc_fib(
asio::io_service& io_service,
int n,
function<void(int)> f)
{
if (n <= threshold)
{
f(seq_fib(n));
}
else
{
coinvoke<void(int), void(int)>(io_service,
bind(calc_fib, ref(io_service), n - 1, _1),
bind(calc_fib, ref(io_service), n - 2, _1),
bind(combine_fib, _1, _2, f));
}
}

Can I have C++0x lambdas now, please?

The need to define a separate combine_fib is not ideal, since a key part of the algorithm is off in another spot. Fortunately, C++0x's new monomorphic lambdas come to the rescue:

void calc_fib(
asio::io_service& io_service,
int n,
function<void(int)> f)
{
if (n <= threshold)
{
f(seq_fib(n));
}
else
{
coinvoke<void(int), void(int)>(io_service,
[&io_service, n](function<void(int)> c)
{
calc_fib(io_service, n - 1, c);
},
[&io_service, n](function<void(int)> c)
{
calc_fib(io_service, n - 2, c);
},
[f](int a, int b)
{
f(a + b);
});
}
}

A useful example

Since that has to come close to being the most convoluted way of calculating a Fibonacci value, here is an example where coinvoke() is used for a parallel merge sort:

template <typename Iterator>
void merge(
Iterator begin,
Iterator middle,
Iterator end,
function<void()> f)
{
std::inplace_merge(begin, middle, end);
f();
}

template <typename Iterator>
void sort(
asio::io_service& io_service,
Iterator begin,
Iterator end,
function<void()> f)
{
std::size_t n = end - begin;
if (n <= 16384)
{
std::sort(begin, end);
io_service.post(f);
}
else
{
coinvoke<void(), void()>(io_service,

// First task sorts the initial half of the range.
bind(&sort<Iterator>,
ref(io_service),
begin, begin + n / 2, _1),

// Second task sorts the latter half of the range.
bind(&sort<Iterator>,
ref(io_service),
begin + n / 2, end, _1),

// Continuation function merges the two sorted ranges.
bind(&merge<Iterator>,
begin, begin + n / 2, end, f)

);
}
}

On my 8-core machine, this gives a little more than a threefold speedup in sorting large vectors.

Friday, June 27, 2008

Mention in Stroustrup Interview

The A-Z of Programming Languages: C++, top of page 5:

Do you feel that resources like the boost libraries will provide this functionality/accessibility for C++?

Some of the boost libraries - especially the networking library - are a good beginning. The C++0x standard threads look a lot like boost threads. If at all possible, a C++ programmer should begin with an existing library (and/or tool), rather than building directly on fundamental language features and/or system threads.

Not by name, but hey! The rest of the interview is worth reading too. ;)

Friday, May 23, 2008

Boost.Asio vs Asio

Sometimes I am asked what the difference is between the (non-Boost) Asio and Boost.Asio packages I provide. Here is the definitive word on the subject, presented as a series of questions and answers.

What are the differences in the source code?

— Asio is in a namespace called asio::, whereas Boost.Asio puts everything under boost::asio::.

— The main Asio header file is called asio.hpp. The corresponding header in Boost.Asio is boost/asio.hpp. All other headers are similarly changed.

— Any macros used by or defined in Asio are prefixed with ASIO_. In Boost.Asio they are prefixed with BOOST_ASIO_.

— Asio includes a class for launching threads, asio::thread. Boost.Asio does not include this class, to avoid overlap with the Boost.Thread library.

— Boost.Asio uses the Boost.System library to provide support for error codes (boost::system::error_code and boost::system::system_error). Asio includes these under its own namespace (asio::error_code and asio::system_error). The Boost.System version of these classes currently supports better extensibility for user-defined error codes.

— Asio is header-file-only and for most uses does not require linking against any Boost library. Boost.Asio always requires that you link against the Boost.System library, and also against Boost.Thread if you want to launch threads using boost::thread.

Where do I get a release package?

Asio is available for download from sourceforge, in a package named asio-X.Y.Z.tar.gz (or .tar.bz2 or .zip).

Boost.Asio is included in the Boost 1.35 distribution. It is also available as a separate package on sourceforge, named boost_asio_X_Y_Z.tar.gz. The latter is intended to be copied over the top of an existing Boost source code distribution.

Where are the source code repositories?

Asio uses a sourceforge-hosted CVS repository. Details of how to access it may be found here. It may also be browsed via the web.

Boost.Asio is checked into Boost's subversion repository.

How do you maintain both versions?

All development is done in the Asio CVS repository. I periodically convert the source into Boost format using a script called boostify.pl, and merge the changes into the Boost subversion repository.

Will Asio be discontinued now that Boost.Asio is included with Boost?

No. There are projects using Asio and they will continue to be supported. I also prefer to use Asio over Boost.Asio in my own projects, for the convenience of header-file-only and shorter namespaces.

Should I use Asio or Boost.Asio?

It depends. Here are some things to consider:

— If you (like me) prefer the convenience of header-file-only libraries then I'd suggest using Asio over Boost.Asio.

— If you must use a version of Boost older than 1.35 then Boost.Asio is not included. You can use Boost.Asio by copying it over the top of your Boost distribution (see above), but not everyone is comfortable doing this. In that case, I would suggest using Asio over Boost.Asio.

— I will be creating new versions of both the Asio and Boost.Asio packages on a faster release cycle than that followed by Boost. If you want to use the latest features you can still use Boost.Asio as long as you are happy to copy it over the top of your Boost distribution. If you don't want to do this, use Asio rather than Boost.Asio.

Can Asio and Boost.Asio coexist in the same program?

Yes. Since they use different namespaces there should be no conflicts, although obviously the types themselves are not interchangeable. (In case you're wondering why you might want to do this, consider a situation where a program is using third party libraries that are also using Asio internally.)