Asynchronous Agents Library - Intro to Message Blocks
In my previous post I talked about the agent class. Now I will introduce the Agents Library’s message blocks, how to use them, and the fundamentals of what they do. I will cover some basics that apply to all of the message blocks, introduce the messaging APIs, and then specifically explain three blocks: unbounded_buffer, overwrite_buffer, and single_assignment.
Message Blocks
In the Agents Library we have created a set of interfaces and defined a protocol for message blocks to communicate and exchange messages. Message blocks are intended to be used to establish defined communication protocols between isolated components and develop concurrent applications based on data flow. The message blocks provided by the Agents Library can be used in conjunction with the agents class itself or separately.
For more information on data flow - https://en.wikipedia.org/wiki/Dataflow
ISource and ITarget Interfaces
ISource and ITarget are base interfaces message blocks work with. They declare functions for handling linking and exchanging messages. A source block implements ISource, a target block ITarget, and a propagator block, a block that is a source and a target, implements both. In this blog I will only be discussing the functions needed to use the message blocks provided by the Agents Library. A more comprehensive knowledge of the functions in the ISource and ITarget interfaces are required when creating your own message blocks; this will be discussed in a later post.
Linking/Unlinking and Creating Messaging Networks
Often it is desirable to link message blocks together to create a network. One such reason for building messaging networks is to create data flow pipelines. The following three functions, declared on the ISource interface, are used to connect source blocks to target blocks:
void link_target(ITarget<_Type> * _PTarget); – adds a link to the specified target. This causes the source block to offer any of its messages to the target until it is unlinked.
void unlink_target(ITarget<_Type> * _PTarget); – removes an existing link with the specified target, once unlinked no more messages will be presented to the target.
void unlink_targets(); – removes all links with any target blocks.
Here is a simple example connecting and then disconnecting two unbounded_buffers:
unbounded_buffer<int> buffer1, buffer2;
buffer1.link_target(&buffer2);
...
buffer2.unlink_target(&buffer2);
Some blocks have restrictions on the number of targets allowed; invalid_link_target exception is thrown in these cases.
Basic Message Propagation
Messages are exchanged between blocks using light weight tasks (LWTs) on a scheduler, because of this message propagation works cooperatively with any other work in the same scheduler. This means long running tasks that never block or yield can slow down or cease the forward progress of message delivery. Thus is the nature of working in a cooperative environment.
All of the message blocks built into the Agents Library guarantee in-order message delivery. It is possible to create your own blocks which do not preserve order, however all of built in ones do.
Messaging APIs - send, asend, receive, and try_receive
Once creation of messaging networks and propagation of messages is understood the only other thing needed to start programming is how to insert and remove messages directly from individual blocks.
Two global functions are used to create and insert messages:
bool send(ITarget<_Type> &_Trg, const _Type &_Data);
bool asend(ITarget<_Type> &_Trg, const _Type &_Data);
Each of these takes a target and the data to transmit. Send synchronously originates a message with a target, whereas asend asynchronously does. This means a send call will block until the target either takes the message or declines it. Send returns true if the message was delivered and false otherwise. Asend will not block until the target takes the message, it offers the message and immediately returns. A return value of true means the target has accepted the message and will eventually take it, otherwise false means the target either declined the message or postponed the decision on whether or not to take it until later.
Likewise here are the two global functions for removing or extracting messages:
template <class _Type>
_Type receive(ISource<_Type> & _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);
template <class _Type>
bool try_receive(ISource<_Type> & _Src, _Type & _value);
Receive takes a source block to extract a message from and an optional timeout, the extracted value is the return value. If a message is currently not available in the source then receive will block until one is, optionally a timeout also may be specified. Correspondingly try_receive will only obtain a message if the source has one at that instance, otherwise it returns immediately. A return value of true on try_receive indicates a message was received, false means one was not.
All of these functions when blocking do so cooperatively with the Concurrency Runtime. To learn more about working cooperatively with the Concurrency Runtime take a look at the series of posts on Synchronization with the Concurrency Runtime.
unbounded_buffer
Unbounded_buffer is one of the most basic messaging blocks; it acts very similar to a queue. As its name suggests unbounded_buffer can store any number of messages, limited only by memory, collected from its source links (links to blocks that have the unbounded_buffer as a target). Unbounded_buffer always accepts all messages offered to it. Messages propagated to an unbounded_buffer are collected into a queue and then offered one at a time to each of its targets. Each message in an unbounded_buffer will only be given to one of its targets based on link ordering. This means targets of an unbounded_buffer compete for messages.
Unbounded_buffer provides two utility functions:
bool enqueue(_Type const& _Item);
_Type dequeue();
Each of these is equivalent to send and receive respectively, and basically are wrappers around them.
unbounded_buffer<int> buffer;
// These are equivalent.
buffer.enqueue(1);
send(buffer, 1);
// And so are these.
int value = buffer.dequeue();
int value = receive(buffer);
Unbounded_buffers are excellent for producer/consumer patterns. In a previous post Introduction to Asynchronous Agents Library the FindString agents sample makes use of unbounded_buffers to communicate between the individual agents.
overwrite_buffer
Essentially overwrite_buffer is a simple broadcaster. Overwrite_buffer is a message block that holds one message at a time, very similar to a variable. Every time an overwrite_buffer receives a message it offers a copy of it to any of its targets and then stores the message internally, replacing any previously stored message. The important thing to note here is there is no competition for data. Every time a message comes into an overwrite_buffer it is offered to all of its targets, then afterwards it can be overwritten at any point.
Overwrite_buffer also provides two utility functions:
bool has_value() const ;
_Type value();
Has_value returns true or false indicating whether or not the overwrite_buffer has received its first message. Value is a wrapper around receive. Has_value can be used to check if overwrite_buffer has a message, if overwrite_buffer does then calling value or receive will not be a blocking call.
Uses of overwrite_buffer include tracking state, continuously monitoring status, or broadcasting messages. In the Agents Library the agent class takes advantage of this using an overwrite_buffer internally to track its state. Calling the start, cancel, done, and wait functions work with agent’s the internal overwrite_buffer.
single_assignment
Single_assignment behaves very similar to overwrite_buffer, except it will only accept one message from any of its sources. Once single_assignment accepts a message all subsequent offered messages will be declined. Just like overwrite_buffer, single_assignment gives a copy of the message to each of its targets; there is no competition for data.
Single_assignment provides the following two utility functions:
bool has_value() const ;
_Type const & value();
These perform exactly same as in overwrite_buffer.
Single_assignments are useful when a single value is read by many, similar to a const variable. A single_assignment can also be used to pick the first available message from a group of blocks. The offered message will be accepted and all others will be declined. Used in this form single_assignment can act as a choice or chooser from a group of blocks.
In following posts I will introduce more of the message blocks and provide sample applications.
Comments
Anonymous
July 09, 2009
The comment has been removedAnonymous
July 09, 2009
I discussed this question with .NET TPL team, my point was similar to yours - overload control is crucial for reliable async systems, their answer was something like: we provide the fastest queue implementation and generic interface for extensions, if you don't like our queues write your own. However note that if framework is unable to capture both sender and receiver of the message (seems to be the case with AAL), it can't just block sender because it's the deadly way to system-induced deadlocks, it can't silently drop messages (too unfriendly for naive user) and it can't return 'false' (who checks return values anyway?). So what's the options? Flexible and graceful overload control is possible only in pure agent-based systems where framework is able to capture both sender and receiver of the message.Anonymous
July 09, 2009
Yes I agree there are scenarios where a bounded_buffer is important. One question is what should the behavior be of the bounded_buffer when it reaches its capacity. What is the behavior you desire here? The simplest option is once full the bounded_buffer will decline any subsequent offered messages until it has more free space. This would mean the source block would have to later offer the message again or do something else with it. The full fledged solution would be something like bounded_buffer postpones any offered messages after its capacity is reached saving the offered message id. Then once more space in the bounded_buffer is avaliable it will try and reserve and consume the previously offered message. This requires tracking a message id per source. This solution results in no dropped messages anywhere, I'm assuming the second option would cover your case perfectly. It is possible at a later point we might add a bounded_buffer message block, however right now this is not in the dev10 timeframe. I'm planning in the future on doing a couple of blog posts showing how to create your own message blocks using our infrastructure. In these posts I will create whatever blocks seem to be the most important and missing from our current set. So, I'm interesting in hearing what you and other customers believe would be the most valuable ones to get samples of!Anonymous
July 15, 2009
Thanks for the comments. The way I handle this in my framework is to have the send function take an optional timeout, just like receive. When called with a zero timeout, the behavior would be like your simple option, requiring polling to continue. An infinite timeout would provide full flow control (but at the risk of deadlock without careful design if the data flow network is complex). A Finite timeout provides a compromise to be able to recover from transient deadlocks by backing off and retrying when necessary without compromising throughput in the general case. I'm not sure I understand how your full-fledged solution would work. If you postpone offered messages without somehow quenching the source then they will build up in memory somewhere, so it's not really any different to unbounded_buffer. How do you tell if asend has rejected (need to try again) or postponed (might be consumed later) the message when the same return code is used for both cases? Perhaps this will become clearer after your forthcoming posts...Anonymous
July 20, 2009
Yes with asend there is no way to differentiate between whether the target declined the message or postponed to possibly later consume. What you could do is use send, this will block until the target actually takes or declines the message. So if you do a send to a bounded_buffer that is full it would block until more space becomes avaliable. Doing this would prevent any build up and block the production of more data at the location it is created. Would this not work in your case? This past week I just created 3 new message blocks alternator, priority_buffer, and a bounded_buffer. The bounded_buffer works very similar to how I described above. I will try and post these new blocks when I get some time. Jon Baggott I would really be interested in chatting more with you about your scenarios and how they could be solved. If you go to the top of the page and pick "Email" under "This Blog" on the right of the page and fill in your email address I will get your contact information and we can talk in more detail.Anonymous
December 21, 2009
My English is so weak that I cannot understand this whole post. Could you please say shorter sentences for our non-english learner?