Showing posts with label coroutines. Show all posts

Wednesday, March 31, 2010

A potted guide to stackless coroutines

Keen-eyed Asio users may have noticed that Boost 1.42 includes a new example, HTTP Server 4, that shows how to use stackless coroutines in conjunction with asynchronous operations. This follows on from the coroutines I explored in the previous three posts, but with a few improvements. In particular:

  • the pesky entry pseudo-keyword is gone; and
  • a new fork pseudo-keyword has been added.

The result bears a passing resemblance to C#'s yield and friends. This post aims to document my stackless coroutine API, but before launching into a long and wordy explanation, here's a little picture to liven things up:

[ Image: diagram generated from the source code using asioviz. ]

class coroutine

Every coroutine needs to store its current state somewhere. For that we have a class called coroutine:

class coroutine
{
public:
  coroutine();
  bool is_child() const;
  bool is_parent() const;
  bool is_complete() const;
};

Coroutines are copy-constructible and assignable, and the space overhead is a single int. They can be used as a base class:

class session : coroutine
{
  ...
};

or as a data member:

class session
{
  ...
  coroutine coro_;
};

or even bound in as a function argument using bind() (see previous post). It doesn't really matter as long as you maintain a copy of the object for as long as you want to keep the coroutine alive.
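
Here is a minimal sketch of what such a class might look like inside, to show why copies matter. The encoding of the int (0 for "not started", negative for "child", -1 for "complete") and the public value_ member are assumptions made for illustration, and copies_are_independent() is a hypothetical helper; the interface above does not promise any of this.

```cpp
#include <cassert>

// Sketch only: the meaning given to value_ is an assumption for illustration.
class coroutine
{
public:
  coroutine() : value_(0) {}
  bool is_child() const { return value_ < 0; }
  bool is_parent() const { return !is_child(); }
  bool is_complete() const { return value_ == -1; }

  int value_;  // public here for brevity; the whole state is this one int
};

// Hypothetical helper: each copy owns its own state.
inline bool copies_are_independent()
{
  coroutine original;         // starts at the beginning, as a "parent"
  coroutine copy = original;  // copying duplicates the saved state
  copy.value_ = -1;           // pretend the copy ran to completion
  return original.is_parent() && !original.is_complete()
      && copy.is_complete() && sizeof(coroutine) == sizeof(int);
}
```

Because each copy has its own state, whichever copy you pass along with the next operation is the one that continues the coroutine.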

reenter

The reenter macro is used to define the body of a coroutine. It takes a single argument: a pointer or reference to a coroutine object. For example, if the base class is a coroutine object you may write:

reenter (this)
{
  ... coroutine body ...
}

and if it is a data member or other variable you can write:

reenter (coro_)
{
  ... coroutine body ...
}

When reenter is executed at runtime, control jumps to the location of the last yield or fork.

The coroutine body may also be a single statement. This lets you save a few keystrokes by writing things like:

reenter (this) for (;;)
{
  ...
}

Limitation: The reenter macro is implemented using a switch, so you must take care when using local variables within the coroutine body. A local variable must not be defined in a position where reentering the coroutine could jump past its definition.
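
To see the limitation without any macros, here is a hand-written switch that behaves like a tiny coroutine. The counter type and its state_ and i_ members are hypothetical names; this is an illustration of the switch technique, not the actual expansion of reenter.

```cpp
#include <cassert>

// Hypothetical hand-expanded coroutine; state_ plays the role of the
// coroutine's int, and -1 is used here to mean "finished".
struct counter
{
  int state_;
  int i_;  // a member rather than a local, so it survives reentry

  counter() : state_(0), i_(0) {}

  int operator()()
  {
    switch (state_)
    {
    case 0:
      // int i = 0;  // ill-formed: "case 1" below would jump past this
      //             // initialization when the function is reentered
      for (i_ = 0; i_ < 3; ++i_)
      {
        state_ = 1;  // remember where to resume
        return i_;
    case 1: ;        // resume point, inside the loop body
      }
    }
    state_ = -1;     // loop finished (or already finished earlier)
    return -1;
  }
};
```

Successive calls return 0, 1, 2 and then -1 from there on. Moving the loop variable into the object, or confining locals to a compound statement that contains no resume point, keeps the compiler happy.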

yield statement

This form of the yield keyword is often used with asynchronous operations:

yield socket_->async_read_some(buffer(*buffer_), *this);

This divides into four logical steps:

  1. yield saves the current state of the coroutine.
  2. The statement initiates the asynchronous operation.
  3. The resume point is defined immediately following the statement.
  4. Control is transferred to the end of the coroutine body.

When the asynchronous operation completes, the function object is invoked and reenter causes control to transfer to the resume point. It is important to remember to carry the coroutine state forward with the asynchronous operation. In the above snippet, the current class is a function object with a coroutine object as base class or data member.
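
The four steps can be imitated by hand. In this sketch a global queue stands in for the io_service, and async_fake_read, reader and run_reader are hypothetical names invented for the example; the switch is written out manually rather than produced by the macros.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <vector>

// Stand-in for an io_service: completion handlers queued for later.
static std::deque<std::function<void()> > pending;

// Hypothetical asynchronous operation: "completes" later with a length.
template <typename Handler>
void async_fake_read(std::size_t n, Handler handler)
{
  pending.push_back([handler, n]() mutable { handler(n); });
}

struct reader
{
  int state_;  // the coroutine state
  std::shared_ptr<std::vector<std::size_t> > seen_;  // shared by all copies

  reader() : state_(0), seen_(std::make_shared<std::vector<std::size_t> >()) {}

  void operator()(std::size_t length = 0)
  {
    switch (state_)
    {
    case 0:
      state_ = 1;                 // 1. save the state first...
      async_fake_read(7, *this);  // 2. ...so the copy of *this carries it
      break;                      // 4. leave the coroutine body
    case 1:                       // 3. the resume point
      seen_->push_back(length);
      state_ = -1;                // finished
      break;
    }
  }
};

// Hypothetical driver: start the coroutine, then run queued handlers.
inline std::vector<std::size_t> run_reader()
{
  reader r;
  r();  // runs up to the first "yield"
  while (!pending.empty())
  {
    std::function<void()> h = pending.front();
    pending.pop_front();
    h();  // invokes the copy, which resumes at the resume point
  }
  return *r.seen_;
}
```

Note that the state is saved before the operation is started. If the order were reversed, the copy handed to the operation would restart from the beginning instead of resuming.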

The statement may also be a compound statement, and this permits us to define local variables with limited scope:

yield
{
  mutable_buffers_1 b = buffer(*buffer_);
  socket_->async_read_some(b, *this);
}

yield return expression ;

This form of yield is often used in generators or coroutine-based parsers. For example, the function object:

struct interleave : coroutine
{
  istream& is1;
  istream& is2;
  char operator()(char c)
  {
    reenter (this) for (;;)
    {
      yield return is1.get();
      yield return is2.get();
    }
  }
};

defines a trivial coroutine that interleaves the characters from two input streams.

This type of yield divides into three logical steps:

  1. yield saves the current state of the coroutine.
  2. The resume point is defined immediately following the semicolon.
  3. The value of the expression is returned from the function.
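
As a runnable approximation of these three steps, the sketch below uses two simplified stand-in macros, SKETCH_REENTER and SKETCH_YIELD_RETURN. They are assumptions made for this example (in the classic switch-and-return style), not the macros described in this post, and the operator() signature and interleave_demo() helper are likewise adjusted for the demonstration.

```cpp
#include <cassert>
#include <istream>
#include <sstream>
#include <string>

// Simplified stand-ins (assumptions), not the macros described in this post.
#define SKETCH_REENTER(c) switch ((c).state) case 0:
#define SKETCH_YIELD_RETURN(c, expr) \
  do { (c).state = __LINE__; return (expr); case __LINE__: ; } while (0)

struct interleave
{
  std::istream& is1;
  std::istream& is2;
  int state;  // saved resume point; 0 means "start from the top"

  interleave(std::istream& a, std::istream& b) : is1(a), is2(b), state(0) {}

  int operator()()
  {
    SKETCH_REENTER(*this) for (;;)
    {
      SKETCH_YIELD_RETURN(*this, is1.get());  // save state, return a value
      SKETCH_YIELD_RETURN(*this, is2.get());  // the next call resumes here
    }
    // Only reached if state holds no known resume point.
    return std::char_traits<char>::eof();
  }
};

// Hypothetical driver: pull six characters from the generator.
inline std::string interleave_demo()
{
  std::istringstream a("ace"), b("bdf");
  interleave next(a, b);
  std::string out;
  for (int i = 0; i < 6; ++i)
    out += static_cast<char>(next());
  return out;
}
```

Each yield must sit on its own source line here, because the stand-in uses __LINE__ as the resume label.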

yield ;

This form of yield is equivalent to the following steps:

  1. yield saves the current state of the coroutine.
  2. The resume point is defined immediately following the semicolon.
  3. Control is transferred to the end of the coroutine body.

This form might be applied when coroutines are used for cooperative threading and scheduling is explicitly managed. For example:

struct task : coroutine
{
  ...
  void operator()()
  {
    reenter (this)
    {
      while (... not finished ...)
      {
        ... do something ...
        yield;
        ... do some more ...
        yield;
      }
    }
  }
  ...
};
...
task t1, t2;
for (;;)
{
  t1();
  t2();
}
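
For something you can actually compile, here is a small round-robin sketch along the same lines. The coroutine struct and the CORO_REENTER and CORO_YIELD macros are simplified stand-ins written for this example (they return from the function rather than breaking out of the block), and the task fields and run_round_robin() are hypothetical.

```cpp
#include <cassert>
#include <string>

// Simplified stand-ins (assumptions), not the class and macros from this post.
struct coroutine
{
  int state;
  coroutine() : state(0) {}
  bool is_complete() const { return state == -1; }
};

#define CORO_REENTER(c) switch ((c).state) case 0:
#define CORO_YIELD(c) \
  do { (c).state = __LINE__; return; case __LINE__: ; } while (0)

struct task : coroutine
{
  char name;
  int remaining;
  std::string* log;

  task(char n, int r, std::string* l) : name(n), remaining(r), log(l) {}

  void operator()()
  {
    CORO_REENTER(*this)
    {
      while (remaining > 0)
      {
        *log += name;       // ... do something ...
        --remaining;
        CORO_YIELD(*this);  // give the other task a turn
      }
      state = -1;           // ran off the end: mark as complete
    }
  }
};

// Hypothetical scheduler: alternate between two tasks until both finish.
inline std::string run_round_robin()
{
  std::string log;
  task t1('a', 2, &log), t2('b', 3, &log);
  while (!t1.is_complete() || !t2.is_complete())
  {
    t1();  // calling a completed task does nothing
    t2();
  }
  return log;
}
```

With two units of work for the first task and three for the second, the log comes out as "ababb".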

yield break ;

The final form of yield is adopted from C# and is used to explicitly terminate the coroutine. This form consists of two steps:

  1. yield sets the coroutine state to indicate termination.
  2. Control is transferred to the end of the coroutine body.

Once terminated, calls to is_complete() return true and the coroutine cannot be reentered.

Note that a coroutine may also be implicitly terminated if the coroutine body is exited without a yield, e.g. by return, throw or by running to the end of the body.

fork statement ;

The fork pseudo-keyword is used when "forking" a coroutine, i.e. splitting it into two (or more) copies. One use of fork is in a server, where a new coroutine is created to handle each client connection:

reenter (this)
{
  do
  {
    socket_.reset(new tcp::socket(io_service_));
    yield acceptor->async_accept(*socket_, *this);
    fork server(*this)();
  } while (is_parent());
  ... client-specific handling follows ...
}

The logical steps involved in a fork are:

  1. fork saves the current state of the coroutine.
  2. The statement creates a copy of the coroutine and either executes it immediately or schedules it for later execution.
  3. The resume point is defined immediately following the semicolon.
  4. For the "parent", control immediately continues from the next line.

The functions is_parent() and is_child() can be used to differentiate between parent and child. You would use these functions to alter subsequent control flow.

Note that fork doesn't do the actual forking by itself. It is your responsibility to write the statement so that it creates a clone of the coroutine and calls it. The clone can be called immediately, as above, or scheduled for delayed execution using something like io_service::post().
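
The following hand-expanded sketch walks through those steps with a plain switch. Marking the child with a negative state value is an assumption made for this illustration, and worker, log_ and run_fork_demo() are hypothetical names; the real macro expansion is not shown here.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct worker
{
  int state_;                      // negative is taken to mean "child"
  std::vector<std::string>* log_;  // shared between parent and child

  explicit worker(std::vector<std::string>* log) : state_(0), log_(log) {}

  bool is_child() const { return state_ < 0; }
  bool is_parent() const { return !is_child(); }

  void operator()()
  {
    switch (state_)
    {
    case 0:
      state_ = -7;       // 1. save the state, marked as the child's
      worker(*this)();   // 2. the "statement": clone and run immediately
      state_ = 7;        // 4. the parent carries on from the next line
      goto after_fork;
    case -7:             // 3. the resume point, where the child enters
    after_fork:
      log_->push_back(is_parent() ? "parent" : "child");
    }
  }
};

// Hypothetical driver: run one worker and report who logged, in order.
inline std::vector<std::string> run_fork_demo()
{
  std::vector<std::string> log;
  worker w(&log);
  w();
  return log;
}
```

Because the clone is called immediately, the child logs first and the parent second. Scheduling the clone for later would reverse that order.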

Wednesday, August 12, 2009

Composed operations, coroutines and code makeover

In the previous two posts, I showed some nifty macros for doing clean and simple stackless coroutines with asio. Hold on to your hats, because in this post we'll see what these coroutines can really do for your asio programs.

A design goal of asio is to provide a basis for further levels of abstraction. One of the ways to develop abstractions on top of asio is to create what I like to call composed operations. These are simply operations that are made up of calls to other lower-level operations. Asio already includes some composed operations to address common network programming problems: async_read and async_write to deal with short reads and writes; and async_read_until to perform delimited reads.

As an example, let's say we want to write a composed operation that echoes all data received on a socket until an error occurs. The way I have done this in the past (and the way composed operations like async_read_until are written) is to implement the operation as a set of function objects:

template <typename Handler>
struct echo_read_handler
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  void operator()(error_code ec, size_t length);
};

template <typename Handler>
struct echo_write_handler
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  void operator()(error_code ec, size_t length);
};

// A read has completed: write the same bytes back.
template <typename Handler>
void echo_read_handler<Handler>::operator()(
    error_code ec, size_t length)
{
  if (!ec)
  {
    echo_write_handler<Handler> write_handler =
      { socket, working_buffer, handler };

    async_write(socket,
        buffer(working_buffer, length),
        write_handler);
  }
  else
    handler(ec);
}

// A write has completed: start the next read.
template <typename Handler>
void echo_write_handler<Handler>::operator()(
    error_code ec, size_t /*length*/)
{
  if (!ec)
  {
    echo_read_handler<Handler> read_handler =
      { socket, working_buffer, handler };

    socket.async_read_some(
        buffer(working_buffer),
        read_handler);
  }
  else
    handler(ec);
}

and a convenience function which acts as the public interface for the abstraction:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler)
{
  echo_read_handler<Handler> read_handler =
    { socket, working_buffer, handler };

  socket.async_read_some(
      buffer(working_buffer),
      read_handler);
}

Not very encouraging if you want to write your own abstractions, is it? Now imagine you've been asked to develop a composed operation to send an email using SMTP. That would involve about a dozen lower level operations, so even I probably wouldn't bother if I had to use a function object approach.

Coroutines to the rescue

In the previous two posts we already saw how to combine stackless coroutines with asio's asynchronous operations, simply by prepending the yield "keyword". I'm sure you know where this is going... We can also use a coroutine to implement a composed operation.

Let's rewrite async_echo as a coroutine:

template <typename Handler>
struct echo_coro
{
  tcp::socket& socket;
  mutable_buffer working_buffer;
  Handler handler;
  coroutine coro;
  void operator()(
      error_code ec = error_code(),
      size_t length = 0)
  {
    reenter (coro)
    {
    entry:
      while (!ec)
      {
        yield socket.async_read_some(
            buffer(working_buffer), *this);

        if (ec) break;

        yield async_write(socket,
            buffer(working_buffer, length), *this);
      }

      handler(ec);
    }
  }
};

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler)
{
  echo_coro<Handler> coro = { socket, working_buffer, handler };
  coro();
}

The code is much shorter and clearer than the function object version. For an SMTP email operation the savings would be so much more, but I'll leave that as an exercise for the reader.

Now you might think this next point is obvious, but I'm going to say it anyway and put it in bold because it's important:

The fact that the composed operation is implemented as a coroutine is entirely transparent to the caller.

What does this mean? It means:

  • You can write your composed operations as coroutines, or not, as you choose.
  • You can combine those composed operations still further using coroutines (or not).

And so on and so on, up through as many layers of abstraction as you think you can reasonably fit into your program.

An alternative approach

One aspect of the implementation above still bothers me a little: repetition. Specifically, the repetition of the operation's template parameter list and the arguments (socket, working_buffer and handler) when defining the coroutine's function object.

Here's an alternative design that implements the composed operation in a single function:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler,
    // coroutine state:
    coroutine coro = coroutine(),
    error_code ec = error_code(),
    size_t length = 0)
{
  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          bind(&async_echo<Handler>,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));

      if (ec) break;

      yield async_write(socket,
          buffer(working_buffer, length),
          bind(&async_echo<Handler>,
            ref(socket), working_buffer,
            box(handler), coro, _1, _2));
    }

    handler(ec);
  }
}

(N.B. box() wraps the handler with another function object to prevent evaluation of the handler as a nested bind expression.)
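
Since box() is not defined in this post, here is one way such a wrapper could look, shown with std::bind, which also evaluates nested bind expressions eagerly. The boxed type, its conversion operator, and the finish() and run_boxed_demo() helpers are all assumptions invented for this sketch; the box() used above may well be written differently.

```cpp
#include <cassert>
#include <functional>

// Hypothetical wrapper: hides a handler from bind's nested-bind evaluation
// and converts back to the handler when passed as a function argument.
template <typename Handler>
struct boxed
{
  Handler handler;
  operator Handler() const { return handler; }
};

template <typename Handler>
boxed<Handler> box(Handler handler)
{
  boxed<Handler> b = { handler };
  return b;
}

// Stand-in for the tail end of a composed operation.
template <typename Handler>
void finish(Handler handler, int ec)
{
  handler(ec);
}

// Hypothetical driver: the user's handler is itself a bind expression.
inline int run_boxed_demo()
{
  int seen = 0;
  auto handler = std::bind(
      [](int* out, int ec) { *out = ec; },
      &seen, std::placeholders::_1);
  typedef decltype(handler) handler_type;

  // Without box(), bind would call handler(5) itself and try to pass
  // its (void) result to finish(), which does not compile.
  auto resume = std::bind(&finish<handler_type>,
      box(handler), std::placeholders::_1);

  resume(5);
  return seen;
}
```

The wrapper is only needed when the user's handler happens to be a bind expression, but a generic composed operation cannot know that in advance.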

Of course, we've just traded one type of repetition for another: the bind expressions to create the completion handlers. At this point, I think it's a matter of taste which approach you use.

Lambdas == code liposuction

It's left to C++0x lambdas to make the coroutine-in-one-function approach the clear winner in brevity, and perhaps not in the way you first expect. Combined with auto, you can use lambdas as local functions to eliminate repeated code:

template <typename Handler>
void async_echo(
    tcp::socket& socket,
    mutable_buffer working_buffer,
    Handler handler,
    // coroutine state:
    coroutine coro = coroutine(),
    error_code ec = error_code(),
    size_t length = 0)
{
  // Local function that builds the completion handler, capturing the
  // coroutine state as it is at the time of the call.
  auto resume = [&]()
  {
    return bind(&async_echo<Handler>,
        ref(socket), working_buffer,
        box(handler), coro, _1, _2);
  };

  reenter (coro)
  {
  entry:
    while (!ec)
    {
      yield socket.async_read_some(
          buffer(working_buffer),
          resume());

      if (ec) break;

      yield async_write(socket,
          buffer(working_buffer, length),
          resume());
    }

    handler(ec);
  }
}

What we end up with is a composed operation in one function, a concise coroutine to specify the asynchronous control flow, and a single definition of how to reenter the coroutine.

Phew. I think we're done.

Monday, August 10, 2009

Secret sauce revealed

In my previous post, I showed a little program using stackless coroutines with asio. Obviously there's no yield keyword in C++, so without further ado here's the preprocessor magic that makes it possible:

class coroutine
{
public:
  coroutine() : value_(0) {}
private:
  friend class coroutine_ref;
  int value_;
};

class coroutine_ref
{
public:
  coroutine_ref(coroutine& c) : value_(c.value_) {}
  coroutine_ref(coroutine* c) : value_(c->value_) {}
  operator int() const { return value_; }
  int operator=(int v) { return value_ = v; }
private:
  int& value_;
};

#define reenter(c) \
  switch (coroutine_ref _coro_value = c)

#define entry \
  extern void you_forgot_to_add_the_entry_label(); \
  bail_out_of_coroutine: break; \
  case 0

#define yield \
  if ((_coro_value = __LINE__) == 0) \
  { \
    case __LINE__: ; \
    (void)&you_forgot_to_add_the_entry_label; \
  } \
  else \
    for (bool _coro_bool = false;; \
         _coro_bool = !_coro_bool) \
      if (_coro_bool) \
        goto bail_out_of_coroutine; \
      else

Unlike the preprocessor-based coroutines presented here, the above macros let you yield from a coroutine without having to return from a function. Instead, the yield simply breaks you out of the reenter block. That trick is what allows us to write a server in one function.

Yes, yes, I know. An echo server in a single function is a bit of a gimmick. The following snippet may give a better idea of how the coroutines can be used:

class session : coroutine
{
public:
  session(tcp::acceptor& acceptor)
    : acceptor_(acceptor),
      socket_(new tcp::socket(acceptor.get_io_service())),
      data_(new array<char, 1024>)
  {
  }

  void operator()(
      error_code ec = error_code(),
      size_t length = 0)
  {
    reenter (this)
    {
    entry:
      for (;;)
      {
        yield acceptor_.async_accept(
            *socket_, *this);

        while (!ec)
        {
          yield socket_->async_read_some(
              buffer(*data_), *this);

          if (ec) break;

          yield async_write(*socket_,
              buffer(*data_, length), *this);
        }

        socket_->close();
      }
    }
  }

private:
  tcp::acceptor& acceptor_;
  shared_ptr<tcp::socket> socket_;
  shared_ptr<array<char, 1024> > data_;
};

Compared to the usual boost::bind-based approach of doing callbacks, the control flow is all in one place and easy to follow.

But wait, there's more... In the next post I'll reveal the real power you get when you combine stackless coroutines and asio.

Wednesday, July 29, 2009

Wife says: "I can't believe it works"

Just a teaser:

int main()
{
  try
  {
    using asio::ip::tcp;
    using namespace boost::lambda;

    asio::io_service io_service;
    tcp::acceptor acceptor(io_service,
        tcp::endpoint(tcp::v4(), 54321));

    const int max_clients = 100;
    coroutine coro[max_clients];
    std::auto_ptr<tcp::socket> socket[max_clients];
    asio::error_code ec[max_clients];
    std::size_t length[max_clients];
    boost::array<char, 1024> data[max_clients];

    // Kick off all the coroutines.
    int n = -1;
    for (int i = 0; i < max_clients; ++i)
    {
      socket[i].reset(new tcp::socket(io_service));
      io_service.post(
          unlambda((
            var(n) = i
          )));
    }

    for (; io_service.run_one() > 0; n = -1)
    {
      if (n != -1)
      {
        reenter (coro[n])
        {
        entry:
          for (;;)
          {
            // Wait for a client to connect.
            yield acceptor.async_accept(
                *socket[n],
                unlambda((
                  var(n) = n,
                  var(ec[n]) = _1
                )));

            // Echo at will.
            while (!ec[n])
            {
              yield socket[n]->async_read_some(
                  asio::buffer(data[n]),
                  unlambda((
                    var(n) = n,
                    var(ec[n]) = _1,
                    var(length[n]) = _2
                  )));

              if (!ec[n])
              {
                yield asio::async_write(
                    *socket[n],
                    asio::buffer(data[n], length[n]),
                    unlambda((
                      var(n) = n,
                      var(ec[n]) = _1
                    )));
              }
            }

            // Clean up before accepting next client.
            socket[n]->close();
          }
        }
      }
    }
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

One function. One fully asynchronous server. Bog standard C++.