Wednesday, August 12, 2009

Composed operations, coroutines and code makeover

In the previous two posts, I showed some nifty macros for doing clean and simple stackless coroutines with asio. Hold on to your hats, because in this post we'll see what these coroutines can really do for your asio programs.

A design goal of asio is to provide a basis for further levels of abstraction. One of the ways to develop abstractions on top of asio is to create what I like to call composed operations. These are simply operations that are made up of calls to other lower-level operations. Asio already includes some composed operations to address common network programming problems: async_read and async_write to deal with short reads and writes; and async_read_until to perform delimited reads.
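To see the kind of problem these composed operations hide, consider short writes: a single lower-level write may transfer fewer bytes than requested, so the composed operation must loop over the remainder. A minimal synchronous sketch of that loop (write_all and write_some_fn are hypothetical stand-ins, not asio names):

```cpp
#include <cstddef>
#include <functional>

// Hypothetical sketch of the loop a composed operation like async_write
// performs: keep issuing lower-level writes until all bytes are sent.
// write_some_fn stands in for socket.write_some, which may transfer
// fewer bytes than asked for (a "short write").
std::size_t write_all(const char* data, std::size_t size,
    const std::function<std::size_t(const char*, std::size_t)>& write_some_fn)
{
  std::size_t total = 0;
  while (total < size)
    total += write_some_fn(data + total, size - total); // retry the remainder
  return total;
}
```

async_write performs the same loop, except that each iteration is a separate asynchronous operation whose completion handler issues the next one.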

As an example, let's say we want to write a composed operation that echoes all data received on a socket until an error occurs. The way I have done this in the past (and the way composed operations like async_read_until are written) is to implement the operation as a set of function objects:

template <typename Handler>
struct echo_read_handler
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  void operator()(error_code ec, size_t length);
};

template <typename Handler>
struct echo_write_handler
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  void operator()(error_code ec, size_t length);
};

template <typename Handler>
void echo_read_handler<Handler>::operator()(
    error_code ec, size_t length)
{
  if (!ec)
  {
    echo_write_handler<Handler> write_handler =
      { socket, working_buffer, handler };

    async_write(socket,
        buffer(working_buffer, length),
        write_handler);
  }
  else
    handler(ec);
}

template <typename Handler>
void echo_write_handler<Handler>::operator()(
    error_code ec, size_t /*length*/)
{
  if (!ec)
  {
    echo_read_handler<Handler> read_handler =
      { socket, working_buffer, handler };

    socket.async_read_some(
        buffer(working_buffer),
        read_handler);
  }
  else
    handler(ec);
}

and a convenience function which acts as the public interface for the abstraction:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler)
{
  echo_read_handler<Handler> read_handler =
    { socket, working_buffer, handler };

  socket.async_read_some(
      buffer(working_buffer),
      read_handler);
}

Not very encouraging if you want to write your own abstractions, is it? Now imagine you've been asked to develop a composed operation to send an email using SMTP. That would involve about a dozen lower level operations, so even I probably wouldn't bother if I had to use a function object approach.

Coroutines to the rescue

In the previous two posts we already saw how to combine stackless coroutines with asio's asynchronous operations, simply by prepending the yield "keyword". I'm sure you know where this is going... We can also use a coroutine to implement a composed operation.
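As a quick refresher on how those macros work, they rest on the switch-on-saved-state trick (Duff's device): reenter jumps to the last recorded resume point, and yield records one before returning. A stripped-down, hypothetical sketch of the idea (the real macros from the previous posts are more careful, using a helper class and handling completion and reuse properly):

```cpp
// Stripped-down sketch of the stackless coroutine trick. The saved
// integer state is a line number; 'reenter' jumps to it, 'yield'
// records it and returns. Simplified: it assumes the coroutine member
// is literally named 'coro' and omits the real macros' safeguards.
struct coroutine { int state = 0; };

#define reenter(c) switch ((c).state) case 0:
#define yield coro.state = __LINE__; return; case __LINE__:

struct counter
{
  coroutine coro;
  int resumes = 0;
  void operator()()
  {
    reenter (coro)
    {
      ++resumes;        // first resume runs until the first yield
      yield;
      ++resumes;        // second resume continues here
      yield;
      ++resumes;        // third resume continues here
      coro.state = -1;  // mark the coroutine as complete
    }
  }
};
```

Each call to the function object executes exactly one segment of the body, then returns; the next call picks up where the last one left off.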

Let's rewrite async_echo as a coroutine:

template <typename Handler>
struct echo_coro
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  coroutine coro;

  void operator()(
      error_code ec = error_code(),
      size_t length = 0)
  {
    reenter (coro)
    {
    entry:
      while (!ec)
      {
        yield socket.async_read_some(
            buffer(working_buffer), *this);

        if (ec) break;

        yield async_write(socket,
            buffer(working_buffer, length), *this);
      }

      handler(ec);
    }
  }
};

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler)
{
  echo_coro<Handler> coro = { socket, working_buffer, handler };
  coro();
}

The code is much shorter and clearer than the function object version. For an SMTP email operation the savings would be even greater, but I'll leave that as an exercise for the reader.
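To give a flavour of the SMTP case, here is a rough, non-compilable sketch of how such a coroutine's body might read, one yield per protocol exchange (the command strings come from the SMTP protocol itself; the buffer and variable names are hypothetical):

```cpp
// Hypothetical sketch only: the body of an SMTP-sending coroutine.
// Reply-code checks and error handling elided.
reenter (coro)
{
entry:
  yield async_read_until(socket, reply, "\r\n", *this);        // 220 greeting
  yield async_write(socket, buffer(helo_command), *this);      // HELO
  yield async_read_until(socket, reply, "\r\n", *this);        // 250 ok
  yield async_write(socket, buffer(mail_from_command), *this); // MAIL FROM
  yield async_read_until(socket, reply, "\r\n", *this);
  yield async_write(socket, buffer(rcpt_to_command), *this);   // RCPT TO
  yield async_read_until(socket, reply, "\r\n", *this);
  yield async_write(socket, buffer(data_command), *this);      // DATA
  yield async_read_until(socket, reply, "\r\n", *this);        // 354 go ahead
  yield async_write(socket, buffer(message_body), *this);
  yield async_read_until(socket, reply, "\r\n", *this);        // 250 accepted
  yield async_write(socket, buffer(quit_command), *this);      // QUIT
  handler(ec);
}
```

Each yield line is one of the dozen or so lower-level operations; in the function-object style, every one of them would need its own handler struct.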

Now you might think this next point is obvious, but I'm going to say it anyway and put it in bold because it's important:

The fact that the composed operation is implemented as a coroutine is entirely transparent to the caller.

What does this mean? It means:

  • You can write your composed operations as coroutines, or not, as you choose.
  • You can combine those composed operations still further using coroutines (or not).

And so on and so on, up through as many layers of abstraction as you think you can reasonably fit into your program.

An alternative approach

One aspect of the implementation above still bothers me a little: repetition. Specifically, the repetition of the operation's template parameter list and the arguments (socket, working_buffer and handler) when defining the coroutine's function object.

Here's an alternative design that implements the composed operation in a single function:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler,
    // coroutine state:
    coroutine coro = coroutine(),
    error_code ec = error_code(),
    size_t length = 0)
{
  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          bind(&async_echo<Handler>,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));

      if (ec) break;

      yield async_write(socket,
          buffer(working_buffer, length),
          bind(&async_echo<Handler>,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
    }

    handler(ec);
  }
}

(N.B. box() wraps the handler with another function object to prevent evaluation of the handler as a nested bind expression.)
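The problem box() guards against can be demonstrated with a type trait: bind recognises a bind expression among its arguments, via is_bind_expression, and evaluates it instead of passing it through as a value. A minimal, hypothetical sketch of a box-style wrapper, using std::bind:

```cpp
#include <functional>

// A bind expression passed to an outer bind is eagerly evaluated;
// wrapping it in an ordinary function object hides that property.
// This 'boxed'/'box' pair is a hypothetical minimal sketch, not the
// box() from the post.
template <typename F>
struct boxed
{
  F f;
  template <typename... Args>
  void operator()(Args&&... args) { f(static_cast<Args&&>(args)...); }
};

template <typename F>
boxed<F> box(F f) { return boxed<F>{f}; }

bool box_hides_bindness()
{
  auto inner = std::bind([](int) {}, 1);
  // raw bind expressions are recognised (and would be invoked) by an
  // outer bind; the boxed wrapper is treated as a plain argument
  return std::is_bind_expression<decltype(inner)>::value
      && !std::is_bind_expression<decltype(box(inner))>::value;
}
```

Boost's bind offers protect() for the same purpose; the key point is simply that the wrapper is an ordinary callable type.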

Of course, we've just traded one type of repetition for another: the bind expressions to create the completion handlers. At this point, I think it's a matter of taste which approach you use.

Lambdas == code liposuction

It's left to C++0x lambdas to make the coroutine-in-one-function approach the clear winner in brevity, and perhaps not in the way you first expect. Combined with auto, you can use lambdas as local functions to eliminate repeated code:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler,
    // coroutine state:
    coroutine coro = coroutine(),
    error_code ec = error_code(),
    size_t length = 0)
{
  auto resume = [&]()
  {
    return bind(&async_echo<Handler>,
        ref(socket), working_buffer,
        box(handler), coro, _1, _2);
  };

  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          resume());

      if (ec) break;

      yield async_write(socket,
          buffer(working_buffer, length),
          resume());
    }

    handler(ec);
  }
}

What we end up with is a composed operation in one function, a concise coroutine to specify the asynchronous control flow, and a single definition of how to reenter the coroutine.

Phew. I think we're done.

21 comments:

Chila said...

Cool!

Chila said...

What do you think about this boost::coroutine?

http://www.crystalclearsoftware.com/soc/coroutine

chris said...

The proposed boost::coroutine library provides stackful coroutines (vs the stackless ones I showed).

The advantage of having a stack is that you can yield from a nested function, which means you can layer non-async-aware APIs (e.g. a boost.spirit parser) over the top of async calls.

However, one of the disadvantages is that you have a stack :)

You can't transparently implement composed operations using stackful coroutines because you have to pass the coroutine's "self" reference to the function. (Well actually you could do it by creating a new coroutine stack for each composed operation, but that could be quite expensive.)

Chila said...

Understood. Thanks Chris. Asio rulez!

Eric Muyser said...

Lambdas ARE code liposuction, but if your compiler is capable of lambdas, why wouldn't you just use those instead of coroutines? ie. something like...

socket.async_read_some(buffer(working_buffer), [&](tcp::socket& socket, error_code ec)
{
  async_write(socket, buffer(working_buffer, length), [&](tcp::socket& socket, error_code ec)
  {
    handler(ec);
  });
});

chris said...

In my opinion, coroutines are more readable. More important, however, is that they are capable of complex control flow, such as loops. Lambdas can't do loops of async operations within a single function.

Eric Muyser said...

Oh, I personally find lambdas more readable, since they are akin to inline functions, and closures of other languages. If you declare them by name, they appear almost exactly the same as normal functions.

What do you mean lambdas can't perform loops of asynchronous operations? Do you mean:

tcp::socket socket(io_service_);
tcp::acceptor acceptor(io_service_);

auto onAccept = [&](tcp::socket& socket, error_code ec)
{
  handler(ec);

  acceptor.async_accept(socket, onAccept);
};

acceptor.async_accept(socket, onAccept);

Same as how your coroutine example goes from top to bottom over and over (looping). I don't think any form of recursion (or "loops") work at the moment, but it's in the standard proposal.

Still, there's a few caveats with c++0x lambdas. I'm just trying to understand the benefits of coroutines in comparison.

chris said...

This code captures the lambda variable ("onAccept") by reference. This means you will have a dangling reference in the completion handler as soon as you leave the scope where the async operation was started.

For it to work, you need to store a copy of the lambda object, but a lambda cannot capture a copy of itself.

Eric Muyser said...

Oh right, I gotcha. A lambda can't copy itself by value in the initializer list (only by reference), but from within itself it can copy itself can't it? Which is where you would be calling the async operation (passing itself, ie. a copy). How would there be a dangling reference then?

std::function<void()> h1;

{ //begin scope
  std::function<void()> h2 = [&]()
  {
    cout << "Hello world!" << endl;

    h1 = h2;
  };

  h2();
} //end scope

h1();

So I might not be understanding because I don't see what's wrong with that. h2 is the lambda, h1 is the async_accept/etc argument. Either way I imagine there would be some additional overhead using lambdas. Although I guess that's the case either way.

chris said...

In your snippet, both of these places:

h2(); <--- here
} //end scope

h1(); <--- and here

will perform "h1 = h2;". Obviously you need to do some additional housekeeping inside the lambda to make this work.

I'd be very interested to see a more fully worked example for an async_accept loop using this approach. And, for bonus points, whether you can do a lambda-based implementation of the async_echo composed operation (working as shown in the original blog post).

Eric Muyser said...

Hey Chris! Thanks for the response. Now I understand what you mean. You're right about the problem calling itself. Actually you can call the lambda from itself fine, even when you leave the scope. You just can't pass it as an argument and have it called from somewhere else. Which is why my last example worked, I put the h1 = h2 part to show it works when the scope ends, but I didn't pass it to a function, like you would with async_accept - it would fail from within io_service's run().

Here's async_echo:

//template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    //Handler handler,
    boost::function<void()> handler,
    error_code ec = error_code(),
    size_t length = 0)
{
  socket.async_read_some(buffer(working_buffer), [&socket, &working_buffer, &length, handler](error_code const& error, int total)
  {
    async_write(socket, buffer(working_buffer, length), [&socket, &working_buffer, handler](error_code const& error, int total)
    {
      handler();
    });
  });
}

Apparently there's a known bug with lambdas atm that they don't work with templated dependent types. I'm sure they'll fix that. Just to show it works though I used boost::function (or std::function).

So yeah, you can't have a gimmick all-in-one lambda server, you need to separate your loop function.

I had an example finished but it segfaults due to that lambda glitch. You think they'll fix these things? (2 big issues)

Thanks :)

starfish said...

Hello, I think I've found a bug in asio, compiled for win64. The following code compiles, but generates an access violation when run:

std::string message = "This is a server message!";
boost::asio::buffer (message);

starfish said...

Btw, I think the lambda reference problem discussed above can be solved by making the onAccept static.

Anonymous said...
This comment has been removed by a blog administrator.
Eric Muyser said...

starfish, you're right but you can't pass the lambda any initialization variables (use []), you'd have to bind those to onAccept when passed to async_accept.

pachanga said...

I really like your stackless coroutine implementation, there is only one thing which makes me a bit nervous - exception handling inside coroutine body. It's impossible to wrap "yield" call with try...catch block(you'll get "jump to case label enters try block" error) which is really disturbing.

OvermindDL1 said...

However, in regards to exception handling, the yield function can become standalone so you can do the async call (within a try/catch) before the yield, then call yield on the next line (followed by ; or {}).

chris said...

Note that try/catch followed by a yield on the next line doesn't do quite the right thing (see new post). However, you could probably write:

yield try
{
...
}
catch (exception& e)
{
...
}

Dave Abrahams said...

Hi Chris,

IMO, the first rewrite *is* awesome, but your cures for repetition are in this case far worse than the disease.

Anonymous said...

I find the echo_coro incredibly funny. the operator() and the coroutines more or less exactly mimic the corresponding Simula67 features. As you probably know Simula influenced C++, but '67 is a looong time ago :-)

Anonymous said...

In your last example, I think it would be a lot more readable to have resume be the actual callback, like so:

auto resume = [&](const error_code& error, size_t bytes_transferred)
{
  async_echo(socket, working_buffer, handler, coro, error, bytes_transferred);
};


Your comment about needing "box" indicates to me that boost::bind is implementing dynamic scoping of _1, _2 etc. That sounds incredibly bad. The "usefulness" of this in the documentation relates to function composition where lexical scoping would work just fine. I'm probably confused, but this magic scares me.

The "ref" function also looks incredibly magic. I don't even understand the boost documentation except that it is "useful".

Also, one advantage you haven't mentioned is that your coroutines give extra guarantees. For example, I've seen questions on stackoverflow related to people calling async_write before the handler is invoked. Using coroutines guarantee that that cannot happen. That in itself is a good reason to use them.

Have you packaged this up somewhere for public consumption?