Asynchronous Agents Library - Intro to Message Blocks

In my previous post I talked about the agent class. Now I will introduce the Agents Library’s message blocks, how to use them, and the fundamentals of what they do. I will cover some basics that apply to all of the message blocks, introduce the messaging APIs, and then specifically explain three blocks: unbounded_buffer, overwrite_buffer, and single_assignment.

Message Blocks

In the Agents Library we have created a set of interfaces and defined a protocol for message blocks to communicate and exchange messages. Message blocks are intended to be used to establish defined communication protocols between isolated components and develop concurrent applications based on data flow. The message blocks provided by the Agents Library can be used in conjunction with the agents class itself or separately.

For more information on data flow - https://en.wikipedia.org/wiki/Dataflow

ISource and ITarget Interfaces

ISource and ITarget are base interfaces message blocks work with. They declare functions for handling linking and exchanging messages. A source block implements ISource, a target block ITarget, and a propagator block, a block that is a source and a target, implements both. In this blog I will only be discussing the functions needed to use the message blocks provided by the Agents Library. A more comprehensive knowledge of the functions in the ISource and ITarget interfaces are required when creating your own message blocks; this will be discussed in a later post.

Linking/Unlinking and Creating Messaging Networks

Often it is desirable to link message blocks together to create a network. One such reason for building messaging networks is to create data flow pipelines. The following three functions, declared on the ISource interface, are used to connect source blocks to target blocks:

void link_target(ITarget<_Type> * _PTarget); – adds a link to the specified target. This causes the source block to offer any of its messages to the target until it is unlinked.

void unlink_target(ITarget<_Type> * _PTarget); – removes an existing link with the specified target, once unlinked no more messages will be presented to the target.

      void unlink_targets(); – removes all links with any target blocks.

Here is a simple example connecting and then disconnecting two unbounded_buffers:

            unbounded_buffer<int> buffer1, buffer2;

      buffer1.link_target(&buffer2);

      ...

      buffer2.unlink_target(&buffer2);

Some blocks have restrictions on the number of targets allowed; invalid_link_target exception is thrown in these cases.

Basic Message Propagation

Messages are exchanged between blocks using light weight tasks (LWTs) on a scheduler, because of this message propagation works cooperatively with any other work in the same scheduler. This means long running tasks that never block or yield can slow down or cease the forward progress of message delivery. Thus is the nature of working in a cooperative environment.

All of the message blocks built into the Agents Library guarantee in-order message delivery. It is possible to create your own blocks which do not preserve order, however all of built in ones do.

Messaging APIs - send, asend, receive, and try_receive

Once creation of messaging networks and propagation of messages is understood the only other thing needed to start programming is how to insert and remove messages directly from individual blocks.

Two global functions are used to create and insert messages:

            bool send(ITarget<_Type> &_Trg, const _Type &_Data);

      bool asend(ITarget<_Type> &_Trg, const _Type &_Data);

Each of these takes a target and the data to transmit. Send synchronously originates a message with a target, whereas asend asynchronously does. This means a send call will block until the target either takes the message or declines it. Send returns true if the message was delivered and false otherwise. Asend will not block until the target takes the message, it offers the message and immediately returns. A return value of true means the target has accepted the message and will eventually take it, otherwise false means the target either declined the message or postponed the decision on whether or not to take it until later.

Likewise here are the two global functions for removing or extracting messages:

template <class _Type>

_Type receive(ISource<_Type> & _Src, unsigned int _Timeout = COOPERATIVE_TIMEOUT_INFINITE);

template <class _Type>

bool try_receive(ISource<_Type> & _Src, _Type & _value);

Receive takes a source block to extract a message from and an optional timeout, the extracted value is the return value. If a message is currently not available in the source then receive will block until one is, optionally a timeout also may be specified. Correspondingly try_receive will only obtain a message if the source has one at that instance, otherwise it returns immediately. A return value of true on try_receive indicates a message was received, false means one was not.

All of these functions when blocking do so cooperatively with the Concurrency Runtime. To learn more about working cooperatively with the Concurrency Runtime take a look at the series of posts on Synchronization with the Concurrency Runtime.

unbounded_buffer

Unbounded_buffer is one of the most basic messaging blocks; it acts very similar to a queue. As its name suggests unbounded_buffer can store any number of messages, limited only by memory, collected from its source links (links to blocks that have the unbounded_buffer as a target). Unbounded_buffer always accepts all messages offered to it. Messages propagated to an unbounded_buffer are collected into a queue and then offered one at a time to each of its targets. Each message in an unbounded_buffer will only be given to one of its targets based on link ordering. This means targets of an unbounded_buffer compete for messages.

Unbounded_buffer provides two utility functions:

            bool enqueue(_Type const& _Item);

      _Type dequeue();

Each of these is equivalent to send and receive respectively, and basically are wrappers around them.

unbounded_buffer<int> buffer;

      // These are equivalent.

      buffer.enqueue(1);

      send(buffer, 1);

      // And so are these.

      int value = buffer.dequeue();

      int value = receive(buffer);

Unbounded_buffers are excellent for producer/consumer patterns. In a previous post Introduction to Asynchronous Agents Library the FindString agents sample makes use of unbounded_buffers to communicate between the individual agents.

overwrite_buffer

Essentially overwrite_buffer is a simple broadcaster. Overwrite_buffer is a message block that holds one message at a time, very similar to a variable. Every time an overwrite_buffer receives a message it offers a copy of it to any of its targets and then stores the message internally, replacing any previously stored message. The important thing to note here is there is no competition for data. Every time a message comes into an overwrite_buffer it is offered to all of its targets, then afterwards it can be overwritten at any point.

Overwrite_buffer also provides two utility functions:

bool has_value() const ;

_Type value();

Has_value returns true or false indicating whether or not the overwrite_buffer has received its first message. Value is a wrapper around receive. Has_value can be used to check if overwrite_buffer has a message, if overwrite_buffer does then calling value or receive will not be a blocking call.

Uses of overwrite_buffer include tracking state, continuously monitoring status, or broadcasting messages. In the Agents Library the agent class takes advantage of this using an overwrite_buffer internally to track its state. Calling the start, cancel, done, and wait functions work with agent’s the internal overwrite_buffer.

single_assignment

Single_assignment behaves very similar to overwrite_buffer, except it will only accept one message from any of its sources. Once single_assignment accepts a message all subsequent offered messages will be declined. Just like overwrite_buffer, single_assignment gives a copy of the message to each of its targets; there is no competition for data.

Single_assignment provides the following two utility functions:

            bool has_value() const ;

      _Type const & value();

These perform exactly same as in overwrite_buffer.

Single_assignments are useful when a single value is read by many, similar to a const variable. A single_assignment can also be used to pick the first available message from a group of blocks. The offered message will be accepted and all others will be declined. Used in this form single_assignment can act as a choice or chooser from a group of blocks.

In following posts I will introduce more of the message blocks and provide sample applications.