Writing a Custom Message Block – Part 1: Introduction

The Asynchronous Agents Library within the Concurrency Runtime provides a set of basic message blocks which can be used to create a message-passing network. In most cases, these blocks are flexible enough, and can be composed together, to provide all the functionality your message-passing network needs. However, there are cases in which the provided blocks do not have the exact behavior you want, and a custom message block is desired. In this post, I will discuss the process for creating your own custom message block.

What kind of message block are you writing?

The first thing to decide when creating a message block is what kind of block you want. Is it a source block, in which case it only offers messages but does not receive them, or is it a target block, which can only receive messages for processing, but does not offer them out to anyone else? It could also be a combination of the two, a block that both receives from sources and offers to targets.

The Asynchronous Agents Library contains two interfaces, ISource and ITarget, which provide the API for source and target blocks, respectively. While you are free to create your own block from these interfaces, we highly recommend you use one of the three base classes we provide: source_block, target_block, and propagator_block. These are described below:

source_block

A block that is only a source of messages. This block offers messages to other blocks, but cannot receive messages.

target_block

A block that is only a target of messages. This block receives messages from other blocks, but cannot offer them.

propagator_block

A block that is both a source and target of messages. This block can both receive messages from a source and propagate messages out to other blocks.

We recommend using these base classes because they significantly ease the process of developing a custom message block. They take care of all the error checking, safe unlinking and the required locks so they free you up to focus purely on the desired behavior of your message block.

All three base classes are templated on their network link registry type. This defines how many source or target links a block can have. The Asynchronous Agents Library contains two such link registries: single_link_registry and multi_link_registry, which allow a single link or multiple links, respectively. For example, if you wanted to create a message block that is purely a source block and can link to only one target, you would declare your message block as:

class example : public source_block<single_link_registry<ITarget<_Type>>>

Similarly, you could specify the block to use multi_link_registry to allow multiple links. The two are distinguished only because a single_link_registry can be slightly optimized compared with a registry that supports multiple links.
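To make the single-versus-multiple distinction concrete, here is a minimal stand-alone model. The types toy_single_link and toy_multi_link are hypothetical stand-ins written only for this illustration; they are not the library's registries and do not reproduce their interface or error handling. They only show why one slot can be cheaper than a container:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a registry that allows at most one link.
template <class Link>
class toy_single_link
{
public:
    // Returns false when a link is already present (or the link is null).
    bool add(Link * link)
    {
        if (link == nullptr || m_link != nullptr)
            return false;
        m_link = link;
        return true;
    }

    std::size_t count() const { return m_link != nullptr ? 1 : 0; }

private:
    Link * m_link = nullptr;   // a single slot, no container needed
};

// Hypothetical stand-in for a registry that allows any number of links.
template <class Link>
class toy_multi_link
{
public:
    bool add(Link * link)
    {
        if (link == nullptr)
            return false;
        m_links.push_back(link);
        return true;
    }

    std::size_t count() const { return m_links.size(); }

private:
    std::vector<Link *> m_links;   // grows as links are added
};
```

In this model, a second add on toy_single_link is simply refused, while toy_multi_link keeps accepting links.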

I will now focus on creating source and target blocks, and how they can be customized for different behavior. A propagator block would have the combination of the two.

Writing a source_block

A source_block is a block that offers messages to its targets. Thus, it must deal with target blocks calling back to it to take messages. The following six methods must be defined in your derived class and can be customized to change the behavior of your block:

virtual void propagate_to_any_targets(message<_Type> * _PMessage)

This method is the main propagation routine for a source_block. This is where your block should process the received message and offer messages to its targets. This method is called automatically when calling one of two methods: async_send or sync_send, which are used to asynchronously or synchronously propagate a message out, respectively. These two methods are present on every block and serve to initiate either an asynchronous or a synchronous propagation for the block.

virtual message<_Type> * accept_message(runtime_object_identity _MsgId)

This method is called from a target block after it has been offered a message from this source_block. The call to accept must be made within the source’s call to target::propagate, so it is part of the message-passing handshake. The method is used to take ownership of the offered message. A common decision to make in accept_message is whether your block returns the actual offered message or a copy of it. For example, our unbounded_buffer block passes the specific message offered to its targets, and only one will receive that message. Our overwrite_buffer, on the other hand, returns copies to each of its targets, so all targets can get a version of the message.

virtual bool reserve_message(runtime_object_identity _MsgId)

This method is called from a target block after it has been offered a message from this source_block and wants to reserve the message to consume it later. The function should return true if you want to allow this message to be reserved, false otherwise.

virtual message<_Type> * consume_message(runtime_object_identity _MsgId)

This method is called from a target block after it has successfully reserved a message and now wishes to consume it. As with accept_message, a common decision to make here is whether to return the specific message or a copy of the message.

virtual void release_message(runtime_object_identity _MsgId)

This method is called from a target block after it has successfully reserved a message and now does not wish to consume it. Mostly cleanup, and possibly error checking, should be done here.

virtual void resume_propagation()

This method is automatically called when a reserved message is consumed or a message is released. Here, you should decide how to proceed with propagation. In general an async_send(NULL) should be called, which will simply force the block to restart propagation without a new message.
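The reserve, consume, release and resume calls form a small state machine, which can be easier to follow in code. The sketch below is a single-threaded, stand-alone model: toy_source, its plain int ids and payloads, and its counters are all hypothetical names made up for this illustration. It is not how the base classes implement the handshake; it only mirrors the order of calls described above.

```cpp
#include <cassert>

// Hypothetical, single-threaded stand-in for the source side of the
// reservation handshake. It offers exactly one message.
class toy_source
{
public:
    toy_source(int id, int payload) : m_id(id), m_payload(payload) {}

    // A target asks to hold the offered message for later.
    bool reserve(int id)
    {
        if (!m_has_message || m_reserved || id != m_id)
            return false;
        m_reserved = true;
        return true;
    }

    // The target holding the reservation takes the message.
    // Returns true and writes the payload on success.
    bool consume(int id, int & out)
    {
        if (!m_reserved || id != m_id)
            return false;
        assert(m_has_message);   // a reservation implies a message
        out = m_payload;
        m_reserved = false;
        m_has_message = false;
        resume_propagation();
        return true;
    }

    // The target holding the reservation gives the message back.
    void release(int id)
    {
        if (!m_reserved || id != m_id)
            return;
        m_reserved = false;
        resume_propagation();
    }

    int  resumes() const     { return m_resumes; }
    bool has_message() const { return m_has_message; }

private:
    // Stands in for restarting propagation; here it only counts calls.
    void resume_propagation() { ++m_resumes; }

    int  m_id;
    int  m_payload;
    bool m_has_message = true;
    bool m_reserved    = false;
    int  m_resumes     = 0;
};
```

In this model a released message stays with the source and can be reserved again, while a consumed message is gone; both paths end by resuming propagation.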

Writing a target_block

A target_block is a block that can accept messages offered from its sources. Thus, it must deal with messages being passed to it from sources. There are two methods that can be overridden to handle the messages: propagate_message and send_message. Only propagate_message must be overridden in the derived class; by default, send_message does not allow synchronous propagation.

virtual message_status propagate_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)

This method is called after a message, _PMessage, has been asynchronously offered from source _PSource and has successfully passed any filter that you may have added to your block. The message has not yet been accepted, so ownership still resides with the source _PSource. Thus, this target_block can now be customized to do whatever it wants with the message and source. Typically, you want to call

    _PSource->accept(_PMessage->msg_id(), this);

in order to take ownership of this message from the source. However, the implementation of this method is totally dependent on the desired behavior for this block.

virtual message_status send_message(message<_Type> * _PMessage, ISource<_Type> * _PSource)

This method is exactly the same as propagate_message except that the message _PMessage has been synchronously offered from source _PSource.

Combining the two: Writing a propagator_block

The main use of a propagator_block is its ability to receive a message, like a target_block, then manipulate that message and send it out, like a source_block. Thus, creating a propagator_block is simply the combination of the methods defined above. The propagation of a received message can be done by using either of two methods: async_send and sync_send.

The async_send(message * _PMessage) method will spin up a light-weight task in the Concurrency Runtime which will asynchronously call propagate_to_any_targets(_PMessage) to manipulate the messages as you wish according to your custom block, and offer the message out to its targets.

Similarly, the sync_send(message * _PMessage) method will also offer the message out to its targets, but does so synchronously, in line.
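The difference between the two paths is only when and where propagate_to_any_targets runs. The following stand-alone sketch models that with a plain task list in place of the runtime's light-weight tasks. Everything in it (toy_block, run_pending_tasks, the int payloads) is a hypothetical stand-in for illustration, not the library's implementation:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for a block's two send paths.
class toy_block
{
public:
    // Asynchronous path: schedule the propagation and return immediately.
    void async_send(int payload)
    {
        m_pending.push_back([this, payload] { propagate_to_any_targets(payload); });
    }

    // Synchronous path: run the propagation in line, on the caller.
    void sync_send(int payload)
    {
        propagate_to_any_targets(payload);
    }

    // Stands in for the scheduler getting around to the queued tasks.
    void run_pending_tasks()
    {
        while (!m_pending.empty())
        {
            std::function<void()> task = std::move(m_pending.front());
            m_pending.pop_front();
            task();
        }
    }

    const std::vector<int> & delivered() const { return m_delivered; }

private:
    // Stands in for the block's propagation routine; it only records
    // the payloads in the order they were propagated.
    void propagate_to_any_targets(int payload)
    {
        m_delivered.push_back(payload);
    }

    std::deque<std::function<void()>> m_pending;
    std::vector<int> m_delivered;
};
```

In this model, a payload passed to async_send is not delivered until run_pending_tasks is called, whereas sync_send delivers before it returns.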

Example: Squaring block

As a simple example, let’s look at how I would write a simple message block that accepts any message from a source, squares the received message payload, then propagates out a message with the square to its targets.

First, since this block both receives in and sends out messages, I need to make it a propagator_block. I’ll define the class as:

    template<class _Type = int>
    class squaring_block :
          public propagator_block<multi_link_registry<ITarget<_Type>>,
                                  multi_link_registry<ISource<_Type>>>

This class declaration specifies that my block is a propagator_block that can link to any number of targets and be linked from any number of sources.

Next, I’ll need some place to store my messages as they arrive in this block. I’ll need to store them in case, for example, the input throughput exceeds the output throughput, or in case a target reserves one of the offered messages and we need to stall the network pipeline until it is consumed or released. To do this, let’s just include a simple queue that holds our messages:

    std::queue<message<_Type> *> messageQueue;

Depending on your block, you may not need to store a queue of messages, or any messages at all. It all depends on the functionality you desire.

First, I’ll specify the constructor and destructor for the block:

    squaring_block()
    {
        initialize_source_and_target();
    }

    ~squaring_block()
    {
        remove_network_links();
    }

The call to initialize_source_and_target is a base class method for propagator_block that initializes the block and allows you to specify which Scheduler or ScheduleGroup this block uses. For simplicity, and in most cases, we can just call this method with no arguments. (The source_block and target_block base classes have initialize_source and initialize_target methods, respectively.) The destructor must call remove_network_links, which is another base class method. This method is critical in that it ensures the links to and from this block are properly removed before the object is destroyed. Otherwise, asynchronous propagations could continue into deleted memory.

Now, let’s move on to the interesting aspects about customizing the block. As a target, this squaring_block is going to receive messages offered to it. A source block will do this by calling squaring_block::propagate. Since we are deriving from propagator_block, all the necessary error handling has already been done, and we just need to specify what happens after we get a message that we actually want in the propagate_message function (which is called by propagate):

    virtual message_status propagate_message(message<_Type> * _PMessage,
                                              ISource<_Type> * _PSource)
    {
        message_status result = accepted;

        _PMessage = _PSource->accept(_PMessage->msg_id(), this);

        if (_PMessage != NULL)
        {
            async_send(_PMessage);
        }
        else
        {
            result = missed;
        }

        return result;
    }

The code here simply tries to accept a message from the source by calling _PSource->accept; then, if a message was returned, it calls async_send(_PMessage) on that message. As soon as the accept call has returned, ownership of the message has transferred from the source to our squaring_block; however, our squaring_block has not yet processed or stored away the message and prepared it for transferring to any of our targets. That process is initiated by async_send.

Similar to propagate_message, there is a send_message function which can be specified. This function runs on the synchronous path, while propagate_message is for the asynchronous one. For the purposes of this block, we only need to change the implementation to call sync_send instead of async_send; everything else is the same.

Propagation of messages

Now how about processing and offering the message out to the squaring_block’s targets? This is done via the source methods that were outlined earlier. First, let’s specify the propagate_to_any_targets method. This is the method that is called in a light-weight task created by the call to async_send during propagate_message and is where all the main processing should occur.

    virtual void propagate_to_any_targets(message<_Type> * _PMessage)
    {
        if (_PMessage != NULL)
        {
            // Square the input, delete the original message
            _Type outputPayload = _PMessage->payload * _PMessage->payload;
            message<_Type> * newMessage = new message<_Type>(outputPayload);
            delete _PMessage;

            messageQueue.push(newMessage);

            if (messageQueue.size() > 1)
                return;
        }

        while (messageQueue.size() > 0)
        {
            message_status status = declined;

            for (target_iterator _Iter = _M_connectedTargets.begin();
                 *_Iter != NULL; ++_Iter)
            {
                ITarget<_Target_type> * _PTarget = *_Iter;
                status = _PTarget->propagate(messageQueue.front(), this);

                // Ownership of message changed. Do not propagate this
                // message to any other target.
                if (status == accepted)
                    break;
            }

            // If status is anything other than accepted, then the current
            // message was not propagated out. Nothing after it can be
            // propagated out.
            if (status != accepted)
                break;
        }
    }

Let’s step through that code slowly and investigate how I made this block behave how I wanted it to. The first part of the function is focused on the processing of the message:

    // Square the input, delete the original message
    _Type outputPayload = _PMessage->payload * _PMessage->payload;
    message<_Type> * newMessage = new message<_Type>(outputPayload);
    delete _PMessage;

    messageQueue.push(newMessage);

    if (messageQueue.size() > 1)
        return;

First, upon receiving a message, I square the input, providing the desired functionality of this block. Next, I delete the old message, since this squaring_block is the current owner of it, but its lifetime will end with this block; i.e. there will be no further propagations of that specific message out of this block. Then, I push the new message onto the queue of stored messages. If the queue now holds more than one message, an earlier message is still being offered by this block, so I return and leave the new one queued behind it; otherwise, I can begin propagation.

One subtle note about message processing: while I technically could have done the processing of the message by squaring the input in propagate_message rather than in propagate_to_any_targets, it is better practice to do the latter. This is because propagate_message is called from your block’s source, thus it is running in a task initiated by your source. Doing processing in propagate_to_any_targets is better because it is your block’s propagation task.

At this point, everything else in the propagate_to_any_targets method has to do with the actual propagation to targets. The message has already been processed and is ready for offering. The main while loop takes each message, one at a time starting with the head, and offers it to each of the targets, also one at a time. By using our standard multi_link_registry and our base classes, I’m able to use our included iterators to search through the targets with a simple for loop and propagate out to my targets:

    for (target_iterator _Iter = _M_connectedTargets.begin();
         *_Iter != NULL; ++_Iter)
    {
        ITarget<_Target_type> * _PTarget = *_Iter;
        status = _PTarget->propagate(messageQueue.front(), this);

        // Ownership of message changed. Do not propagate this
        // message to any other target.
        if (status == accepted)
            break;
    }

This takes the message at the head of the queue and propagates it out to the targets. After each propagate call returns, we check to see what the result is. If it was accepted, that means the ownership has changed and we can break out and offer the next message in our queue. Otherwise, we should propagate it to the next target we have. If no target accepts the message, propagation should stall until that message is accepted or consumed. We cannot propagate other messages in the queue; otherwise, messages would be propagated out of order.
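The ordering rule is the part that is easiest to get wrong, so here is the same loop shape as a stand-alone model. The names toy_status, toy_target and offer_in_order are hypothetical and exist only for this sketch; targets are modelled as plain callables. One simplification to keep in mind: in this model the loop itself removes an accepted payload, whereas in the real block the target removes it by calling back into accept_message.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

enum toy_status { toy_accepted, toy_declined };

// A hypothetical target: a callable that accepts or declines a payload.
typedef std::function<toy_status(int)> toy_target;

// Offers queued payloads strictly in order and returns how many were
// handed off. Stops at the first payload that no target accepts.
inline int offer_in_order(std::queue<int> & pending,
                          const std::vector<toy_target> & targets)
{
    int handed_off = 0;

    while (!pending.empty())
    {
        toy_status status = toy_declined;

        // Offer the head to each target in turn until one accepts it.
        for (const toy_target & target : targets)
        {
            status = target(pending.front());
            if (status == toy_accepted)
                break;
        }

        // The head is stuck, so everything behind it must wait.
        if (status != toy_accepted)
            break;

        pending.pop();
        ++handed_off;
    }

    return handed_off;
}
```

For example, with payloads 2, 3 and 4 queued and a single target that accepts only even values, only 2 is handed off: 3 blocks the head of the queue, and 4 waits behind it even though the target would have taken it.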

Now that we’ve created a block that can offer messages out, we need to handle what happens when a target block calls back and tries to take the offered message. This is handled in the accept_message method:

    virtual message<_Type> * accept_message(runtime_object_identity _MsgId)
    {
        message<_Type> * msg = NULL;

        // Guard against an empty queue before looking at the head
        if (!messageQueue.empty() && messageQueue.front()->msg_id() == _MsgId)
        {
            msg = messageQueue.front();
            messageQueue.pop();
        }

        return msg;
    }

This method just checks to see if the message the target is trying to accept is the same as the current one that my squaring_block is holding and trying to propagate. If so, it dequeues it from my block and returns it to the target. Note: as soon as it returns the message, ownership of the message has changed hands. We can no longer safely touch that message pointer.
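The ownership hand-off can be made explicit in a stand-alone model by using std::unique_ptr, so that the compiler enforces the "do not touch it afterwards" rule. The library itself passes raw message pointers, as in the code above; toy_message and toy_buffer are hypothetical types written only for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical stand-in for a message with an id and a payload.
struct toy_message
{
    int id;
    int payload;
};

// Hypothetical stand-in for the block's message queue.
class toy_buffer
{
public:
    void push(int id, int payload)
    {
        m_queue.push(std::unique_ptr<toy_message>(new toy_message{id, payload}));
    }

    // Hands the head message to the caller only if the id matches.
    // Returns an empty pointer for an empty queue or a mismatched id.
    std::unique_ptr<toy_message> accept(int id)
    {
        if (m_queue.empty() || m_queue.front()->id != id)
            return nullptr;

        // Moving out of the queue transfers ownership to the caller.
        std::unique_ptr<toy_message> msg = std::move(m_queue.front());
        m_queue.pop();
        return msg;
    }

    std::size_t size() const { return m_queue.size(); }

private:
    std::queue<std::unique_ptr<toy_message>> m_queue;
};
```

After a successful accept in this model, the buffer no longer holds any pointer to the message, which is the same guarantee the squaring_block has to provide by convention.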

At this point, we have a block that can participate in both synchronous and asynchronous message passing! For completeness, there are four more functions to handle reserving and consuming of messages. However, as this post is long enough already, I’ll simplify this block by disallowing reserving and consuming of messages. In my next post, I’ll dive into the details of how to handle proper reserving of messages. Disallowing reservations can be done by:

    virtual bool reserve_message(runtime_object_identity _MsgId)
    {
       return false;
    }

    virtual message<_Type> * consume_message(runtime_object_identity _MsgId)
    {
        return NULL;
    }

    virtual void release_message(runtime_object_identity _MsgId)
    {
    }

    virtual void resume_propagation()
    {
    }

So that’s it! That’s a rather lengthy, but in-depth, introduction to writing custom message blocks! Please let me know if anything is still confusing, or if there are any interesting custom message blocks that you come up with! Look for my next post soon on allowing reservations.