Writing a Custom Message Block – Part 2: Reserving and Consuming

In my last blog post, I covered an introduction to writing a custom message block. This covered the basic functions required to work with other message blocks in a message-passing network.

To be fully compliant with all message blocks, there are four more functions that must be defined to support reserving, consuming and releasing. Depending on your custom block and the behavior you desire, this could be as simple as disallowing reservations and returning false when someone tries to reserve. However, if you disallow reservations, there are certain blocks (like join) which you would not be able to link with properly.

For this example, I’ll continue using the squaring_block example introduced in the previous post, but extend it to allow a target of the block to reserve any message it was offered.

Let’s start with reserve_message:

    virtual bool reserve_message(runtime_object_identity _MsgId)
    {
        // Allow reservation if this is the current message
        return messageQueue.front()->msgId() == _MsgId;
    }

That’s a simple method! All I want to do is allow the reservation if the message is still the one currently being offered, which we can check by comparing message IDs. Obviously, this reserve_message method can be customized however you want. You could allow reservations only on every other reserve call, only on Tuesdays, or whatever you like. We’ll take the simple approach here. By returning true, we are specifying that the target holds a reservation on the current message, and we will not transfer ownership of it to another block. The base propagator_block automatically sets two member variables, _M_pReservedFor and _M_reservedId, indicating the block currently holding a reservation and the ID of the reserved message, respectively.
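To make the "customize it however you want" point concrete, here is a minimal sketch of the every-other-call policy. It is a toy and not Agents Library code: ToyOffer, queuedIds and reserveCalls are names I made up for illustration, and plain ints stand in for runtime_object_identity. Unlike the method above, it also guards against an empty queue.

```cpp
#include <cassert>
#include <deque>

// Hypothetical stand-in for a block's state; not an Agents Library type.
struct ToyOffer {
    std::deque<int> queuedIds;   // ids of queued messages; the head is the one on offer
    unsigned reserveCalls = 0;   // number of reservation attempts seen so far

    // Grant a reservation only for the head message, and only on
    // every other call (the 1st, 3rd, 5th, ... attempt).
    bool reserve_message(int msgId) {
        ++reserveCalls;
        if (queuedIds.empty() || queuedIds.front() != msgId)
            return false;
        return reserveCalls % 2 == 1;
    }
};

// Exercises the policy: odd-numbered attempts on the head succeed.
inline void demo_policy() {
    ToyOffer offer;
    offer.queuedIds = {7, 8};
    assert(offer.reserve_message(7));    // 1st call, head message
    assert(!offer.reserve_message(7));   // 2nd call is refused by the policy
    assert(offer.reserve_message(7));    // 3rd call succeeds again
}
```

In a real block the same counter would simply live next to messageQueue as another member.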

For this block, the consume_message method is just as simple:

    virtual message<_Type>* consume_message(runtime_object_identity _MsgId)
    {
        // By default, accept the message
        return accept_message(_MsgId);
    }

There isn’t anything different we need to do for consume than simply accepting the message. The rest of the base propagator_block class code does all the error checks for you to ensure that you are the one that reserved a message and so forth.

The release_message method is also rather simple:

    virtual void release_message(runtime_object_identity _MsgId)
    {
        // The head message is the one reserved.
        if (messageQueue.front()->msgId() != _MsgId)
        {
            throw message_not_found();
        }
    }

Again, the base propagator_block class takes care of the heavy lifting. It handles the error checks and ensures that if you release a message, it will be repropagated to the rest of your target blocks. Now you can see why we recommend you use our base source_block, target_block, and propagator_block classes! All we have to specify in our release_message implementation is the one error check the base class is unable to do – whether this message is still the one we’re offering. We can’t do this in the base class because we don’t know how you’re storing the message – if at all.
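If it helps to see the whole reserve, consume and release handshake in one place, here is a small standalone model of it. This is only a sketch under my own assumptions: ToySource and ToyMessage are hypothetical names, the checks that the base propagator_block performs for you are folded into the toy class, and std::runtime_error stands in for message_not_found.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <stdexcept>

// Toy stand-ins; none of these names are Agents Library types.
struct ToyMessage { int id; int payload; };

class ToySource {
public:
    void offer(ToyMessage m) { queue_.push(m); }

    // Succeeds only for the head message, and only if nobody holds it already.
    bool reserve(int id) {
        if (reserved_ || queue_.empty() || queue_.front().id != id)
            return false;
        reserved_ = true;
        return true;
    }

    // Hands the reserved head message to the caller and ends the reservation.
    ToyMessage consume(int id) {
        check_reserved_head(id);
        ToyMessage m = queue_.front();
        queue_.pop();
        reserved_ = false;
        return m;
    }

    // Ends the reservation but keeps the message queued so it can be offered again.
    void release(int id) {
        check_reserved_head(id);
        reserved_ = false;
    }

    std::size_t pending() const { return queue_.size(); }

private:
    // Throws unless the given id is the head message and is currently reserved.
    void check_reserved_head(int id) const {
        if (!reserved_ || queue_.empty() || queue_.front().id != id)
            throw std::runtime_error("message not found");
    }

    std::queue<ToyMessage> queue_;
    bool reserved_ = false;
};

// Walks one message through reserve -> release -> reserve -> consume.
inline void demo_handshake() {
    ToySource src;
    src.offer({1, 9});
    assert(src.reserve(1));
    assert(!src.reserve(1));       // already reserved
    src.release(1);
    assert(src.pending() == 1);    // a released message stays queued
    assert(src.reserve(1));
    ToyMessage m = src.consume(1);
    assert(m.payload == 9 && src.pending() == 0);
}
```

The point to notice is the asymmetry: consume removes the message from the queue, while release leaves it in place so it can be offered to the next target.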

The last method you need to override when developing your custom message block is resume_propagation. In the previous paragraph, I mentioned that the base classes automatically take care of repropagating your released message. This happens by calling resume_propagation. Both consume and release invoke this method because both are called after a reservation has occurred and the message pipeline has stalled. The purpose of this method is to start propagation up again.

    virtual void resume_propagation()
    {
        // If there are any messages in the buffer, propagate them out
        if (messageQueue.size() > 0)
        {
            async_send(NULL);
        }
    }

Here, if I have any messages buffered, I simply call async_send(NULL). If you look back at propagate_to_any_targets, a NULL pointer passed in skips the message-processing step, so propagation begins with the head message!

One final change from my previous blog post, the propagate_to_any_targets method needs to be slightly altered to be made aware of reservations:

    virtual void propagate_to_any_targets(message<_Type> * _PMessage)
    {
        if (_PMessage != NULL)
        {
            // Square the input, delete the original message
            _Type outputPayload = _PMessage->payload * _PMessage->payload;
            message<_Type> * newMessage = new message<_Type>(outputPayload);
            delete _PMessage;

            messageQueue.push(newMessage);

            if (messageQueue.size() > 1)
                return;
        }

        if (_M_pReservedFor != NULL)
        {
            return;
        }

        while (messageQueue.size() > 0)
        {
            message_status _Status = declined;
            for (target_iterator _Iter = _M_connectedTargets.begin();
                 *_Iter != NULL; ++_Iter)
            {
                ITarget<_Target_type> * _PTarget = *_Iter;
                _Status = _PTarget->propagate(messageQueue.front(), this);

                // Ownership of message changed. Do not propagate this
                // message to any other target.
                if (_Status == accepted)
                    break;

                if (_M_pReservedFor != NULL)
                {
                    return;
                }
            }

            // If status is anything other than accepted, then the current
            // message was not propagated out. Nothing after it can be
            // propagated out.

            if (_Status != accepted)
                break;
        }
    }

The only things I’ve added to this code are two copies of this check:

    if (_M_pReservedFor != NULL)
    {
        return;
    }

Each one simply returns (and ceases propagating messages) if someone holds a reservation on the currently offered message. Propagation stays stopped until the reserving block either consumes the message or releases the reservation, either of which will cause a new propagation task to start.
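To see the stall-and-resume behavior in isolation, here is a deliberately simplified model. Everything in it is an assumption on my part: ToyPipeline and its members are hypothetical names, a single always-accepting target is implied, a bool plays the role of _M_pReservedFor != NULL, and propagation restarts synchronously, whereas the real block schedules it with async_send.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Toy model; ToyPipeline and its members are hypothetical names.
struct ToyPipeline {
    std::deque<int> queue;        // pending payloads; the head is offered first
    std::vector<int> delivered;   // payloads an (always accepting) target took
    bool reserved = false;        // stands in for _M_pReservedFor != NULL

    // Offer messages in order until the queue drains or a reservation stalls us.
    void propagate() {
        while (!queue.empty()) {
            if (reserved)
                return;                      // stalled: wait for consume or release
            delivered.push_back(queue.front());
            queue.pop_front();
        }
    }

    void reserve_head() { reserved = true; }

    // Takes the reserved head message, clears the reservation,
    // then restarts propagation (the resume_propagation step).
    int consume_head() {
        assert(reserved && !queue.empty());
        int value = queue.front();
        queue.pop_front();
        reserved = false;
        propagate();
        return value;
    }
};

// Shows that nothing is delivered while reserved, and the rest flows after consume.
inline void demo_stall() {
    ToyPipeline p;
    p.queue = {1, 4, 9};
    p.reserve_head();
    p.propagate();
    assert(p.delivered.empty());             // reservation blocks all propagation
    int taken = p.consume_head();
    assert(taken == 1);
    assert(p.delivered.size() == 2);         // 4 and 9 went out after the consume
}
```

Because messages go out strictly in order, a reservation on the head message holds back everything queued behind it, which is why both consume and release have to kick propagation off again.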

That’s it! With this reserving and consuming code, in addition to the code in my previous blog post, you have a fully functional message block that can interact with all of the other blocks already present in the Asynchronous Agents Library. If you have any questions, or there are other topics you would like covered, please let me know.