A Broker Pipeline Component for End-To-End Ordered Delivery

Introduction

Recently I had the chance to work with a health care company that makes extensive use of the BizTalk Accelerator for HL7 to process a large number of messages containing clinical and administrative information. Health care companies need a reliable application platform that can ensure medical data is transmitted securely and reliably over their internal networks. They typically rely on the HL7 (Health Level 7) industry standard to transmit medical data (e.g. patient information) to one or many backend systems and need a messaging system that can guarantee ordered message delivery. In addition, their application platform must provide a reliable queuing mechanism that can gracefully handle overload and backlog conditions. When implementing an HL7 solution on the BizTalk Server platform, ordered message delivery can be achieved by combining the sequential convoy singleton orchestration pattern with ordered-delivery compliant adapters, such as the MLLP (Minimal Lower Layer Protocol), MQSeries or MSMQ adapters. A sequential convoy is a set of related messages that have a predefined order and that join together to achieve a required result, and a singleton orchestration can be used to guarantee that incoming documents are processed in strict sequential order.
For more information on ordered message delivery and convoys, see the following articles:

  • "Sequential Convoys" topic on the MSDN site.
  • "Parallel Convoys" topic on the MSDN site.
  • "BizTalk Server 2004 Convoy Deep Dive" topic on the MSDN site.
  • "Working with Convoy Scenarios" topic on the MSDN site.
  • "Ordered Delivery of Messages" topic on the MSDN site.
  • "Sequential Convoy" sample on the BizTalk Gurus site.
  • "Improving Ordered Message Delivery in BizTalk Server 2006 R2 During Concurrent Processing" whitepaper on the Microsoft site.
  • "Implement End-to-End Ordered Delivery Using Microsoft BizTalk Server and SQL Server Service Broker" whitepaper on the TechNet Wiki site.

Problem Statement

One of the customer requirements was to receive and process incoming messages end-to-end in sequential order. In addition, incoming messages had to be routed to a pool of backend systems based on their type (context-based routing) and on the information contained in the payload (content-based routing). Therefore, the development team of the health care company created a distinct sequential convoy for each backend system. Each orchestration used a logical receive port configured to support ordered delivery and a Listen shape inside a loop to wait for one of several message types and branch on the kind of inbound document. Unfortunately, this pattern didn't work, because using multiple Receive shapes within a convoy orchestration does not guarantee ordered message delivery. Hence, the customer adopted another strategy: they reverted to a convoy orchestration with a single Receive shape that subscribed to all message types and discarded the documents that were not intended for the related backend system. However, this solution was very costly in terms of performance, because each message was delivered to a large number of convoy orchestrations, but only a few of them actually processed the document in question.

Solution

To solve the problem and allow inbound documents to be dispatched only to the intended convoy orchestrations, I suggested an alternative architectural approach where the target systems are determined on the receive location based on the rules defined in a Business Rule Engine policy. The idea behind my solution is pretty straightforward. I created a Property Schema which defines a promoted property for each backend system. Then I created a convoy orchestration for each of these systems. Each orchestration contains a single Receive shape within a loop that is able to receive any kind of message. To implement this pattern, I declared the inbound message as type XmlDocument. Each convoy orchestration uses a filter expression of the form <backend system promoted property> exists to receive only those messages that are actually destined for the related backend system. The core component in my solution is a pipeline component called BrokerPipelineComponent that runs within the receive location. This object invokes a Business Rule Engine policy to determine the subscribers of the current message. The business rules defined in the policy use the message content and context to decide which properties, one for each target system, the component must promote. To validate the effectiveness of my ideas, I created an end-to-end solution depicted in the following picture.

Architecture
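
The effect of the "<backend system promoted property> exists" filter can be illustrated with a small, self-contained simulation. This is only a sketch and does not use any BizTalk type: the class name ExistsFilterSketch, the Match method and the property names SupplierA and SupplierB are assumptions made for the example, and the real matching is performed by the MessageBox subscription engine.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for illustration only; these are not BizTalk types.
public static class ExistsFilterSketch
{
    // Returns the subscribers whose "<property> exists" filter is satisfied
    // by the set of properties promoted on a message.
    public static List<string> Match(
        IDictionary<string, string> subscriptions,   // subscriber name -> required property
        ISet<string> promotedProperties)
    {
        return subscriptions
            .Where(s => promotedProperties.Contains(s.Value))
            .Select(s => s.Key)
            .OrderBy(name => name, StringComparer.Ordinal)
            .ToList();
    }

    public static void Main()
    {
        var subscriptions = new Dictionary<string, string>
        {
            { "SupplierA orchestration", "SupplierA" },   // assumed property names
            { "SupplierB orchestration", "SupplierB" }
        };

        // Suppose the policy decided to promote only the SupplierA property.
        var promoted = new HashSet<string> { "SupplierA" };

        foreach (string subscriber in Match(subscriptions, promoted))
        {
            Console.WriteLine(subscriber);   // only the SupplierA orchestration is listed
        }
    }
}
```

A message that carries no routing property matches no convoy at all, which is exactly what removes the cost of delivering every document to every orchestration.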

Message Flow:

  1. A unit test running inside Visual Studio 2010 submits a configurable number of XML messages of 3 different types (Dvd, Cd, BluRay) to a transactional MSMQ queue. The 3 document types are represented by 3 different XML schemas in a BizTalk library called Schemas. The Schemas project also contains a PropertySchema where I defined a custom context property for each convoy orchestration.
  2. A WCF-NetMsmq receive location reads the XML messages from the queue.
  3. The XML Disassembler component within the BrokerReceivePipeline pipeline promotes the MessageType context property. Then the BrokerPipelineComponent applies the business rules contained in the BRE policy to determine the list of context properties to promote, one for each intended sink for the message. The name of the policy to apply can be configured in the component properties. The component can also cache the rule outcome for each MessageType. In this case, the BRE policy is executed only the first time that a certain message type is encountered, whereas on subsequent occasions the rule outcome is retrieved from the in-process cache. This mechanism speeds up the subscription matching and message dispatching process. Finally, the TracePipelineComponent can be enabled to write the message content and context to the standard output. This makes it possible to monitor the behavior of the BrokerPipelineComponent using a tool such as DebugView.
  4. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  5. The inbound request starts a new instance of the SupplierA and/or SupplierB convoy orchestrations which, in my sample, handle the messages destined to a couple of distinct partners. As mentioned above, each convoy orchestration uses a filter expression of the form <backend system promoted property> exists to receive only those messages that are effectively destined to the related backend system. Both orchestrations share the same structure and in particular use a Call Orchestration shape to invoke the Tracer orchestration. The latter writes the document content to the standard output, applies a transformation map to it and sends the new message to a solicit-response send port.
  6. The orchestration publishes the message to the MessageBox (BizTalkMsgBoxDb).
  7. The message is retrieved by a solicit-response WCF-SQL send port. There are 2 distinct send ports, called SupplierA and SupplierB respectively, that simulate the communication with 2 different backend systems.
  8. Each send port invokes a distinct stored procedure that writes the information contained in the inbound document to the table associated with the related backend system. These tables are called SupplierA and SupplierB and are both located in the same SQL Server database called OrderedDeliveryBroker.
  9. The send port publishes the response message to the MessageBox (BizTalkMsgBoxDb).
  10. The response message is returned to the convoy orchestration, which then waits for the next message.
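
The caching behavior described in step 3 can be sketched as follows. This is not the component's code: RuleOutcomeCache and FakePolicy are hypothetical names, the delegate merely stands in for the BRE policy, and the sketch uses a ConcurrentDictionary where the component itself uses a static Dictionary protected by a lock. It also assumes that the rule outcome depends only on the message type, since a cached outcome cannot reflect rules that inspect the payload.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch: caches the routing outcome per message type.
public sealed class RuleOutcomeCache
{
    private readonly ConcurrentDictionary<string, string[]> cache =
        new ConcurrentDictionary<string, string[]>();
    private readonly Func<string, string[]> evaluatePolicy;

    public RuleOutcomeCache(Func<string, string[]> evaluatePolicy)
    {
        if (evaluatePolicy == null)
        {
            throw new ArgumentNullException("evaluatePolicy");
        }
        this.evaluatePolicy = evaluatePolicy;
    }

    // Returns the cached outcome, evaluating the policy only on a cache miss.
    // Under contention GetOrAdd may run the delegate more than once for the
    // same key, so the delegate should be free of side effects.
    public string[] GetTargets(string messageType)
    {
        return cache.GetOrAdd(messageType, evaluatePolicy);
    }
}

public static class RuleOutcomeCacheDemo
{
    public static int PolicyCalls;

    // Stand-in for the BRE policy: maps a message type to target properties.
    public static string[] FakePolicy(string messageType)
    {
        PolicyCalls++;
        return messageType.EndsWith("#Cd", StringComparison.Ordinal)
            ? new[] { "SupplierA" }
            : new[] { "SupplierA", "SupplierB" };
    }

    public static void Main()
    {
        var cache = new RuleOutcomeCache(FakePolicy);
        cache.GetTargets("urn:example#Cd");
        cache.GetTargets("urn:example#Cd");    // served from the cache
        cache.GetTargets("urn:example#Dvd");
        Console.WriteLine(PolicyCalls);        // prints 2
    }
}
```

As with the component's own cache, the entries live in the host process, so a policy change becomes visible only after the process is restarted.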

In the following sections we'll examine the artifacts that compose the end-to-end application.

Message Queue

The following picture shows the configuration of the transactional MSMQ queue used by the BizTalk application to receive the inbound documents sent by the unit test running in Visual Studio 2010.

OrderedDeliveryBroker Queue

MSMQQueueDefinition

Schemas

The OrderedDeliveryBroker application receives 3 different types of message that have the following format:

Cd Message

 <Cd xmlns="https://schemas.appfabric.cat.microsoft.com/ordereddeliverybroker/2011/cd">
   <Id>112</Id>
   <Name>The fat of the land</Name>
   <Price>4.99</Price>
 </Cd>

Dvd Message

 <Dvd xmlns="https://schemas.appfabric.cat.microsoft.com/ordereddeliverybroker/2011/dvd">
   <Id>113</Id>
   <Name>Star Wars</Name>
   <Price>7.99</Price>
 </Dvd>

BluRay Message

 <BluRay xmlns="https://schemas.appfabric.cat.microsoft.com/ordereddeliverybroker/2011/bluray">
   <Id>111</Id>
   <Name>Inception</Name>
   <Price>9.99</Price>
 </BluRay>
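
BizTalk identifies each of these documents by its MessageType, which is composed of the target namespace, a # character and the name of the root element. The following sketch shows how that value can be derived from a message outside BizTalk. The class and method names are hypothetical, and in the actual solution the XML Disassembler computes and promotes this property.

```csharp
using System;
using System.Xml.Linq;

// Hypothetical helper for illustration; not part of the sample solution.
public static class MessageTypeSketch
{
    // Builds "<namespace>#<root element name>", or just the root element
    // name when the document has no namespace.
    public static string GetMessageType(string xml)
    {
        XElement root = XDocument.Parse(xml).Root;
        string ns = root.Name.NamespaceName;
        return ns.Length == 0
            ? root.Name.LocalName
            : ns + "#" + root.Name.LocalName;
    }

    public static void Main()
    {
        const string cd =
            "<Cd xmlns=\"https://schemas.appfabric.cat.microsoft.com/ordereddeliverybroker/2011/cd\">" +
            "<Id>112</Id><Name>The fat of the land</Name><Price>4.99</Price></Cd>";

        // Writes the namespace of the Cd schema followed by "#Cd".
        Console.WriteLine(GetMessageType(cd));
    }
}
```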

The Schemas project also contains a PropertySchema that defines 2 custom context properties, one for each target backend system:

PropertySchema

Receive Location

To retrieve messages from the MSMQ queue I defined a WCF-NetMsmq receive location. The picture below displays the URI of the queue.

WCF-NetMsmq Receive Location Transport Properties

WCF-NetMsmqReceiveLocation01

The following picture shows that the Transactional and Ordered Processing features have been turned on to increase the application reliability and guarantee ordered message delivery.

Binding Configuration

WCF-NetMsmqReceiveLocation02

The following picture shows the configuration of the BrokerReceivePipeline. In particular, note the name of the Policy used by the BrokerPipelineComponent.

BrokerReceivePipeline Configuration

BrokerReceivePipeline

Orchestrations

The following picture shows the SupplierA convoy orchestration. The SupplierB orchestration shares the same structure. In particular, the Ordered Delivery property of the SupplierReceivePort is set to true to ensure that incoming messages are processed in sequential order.

SupplierA Orchestration

SupplierAOrchestration

The following picture shows the structure of the Tracer orchestration invoked by the SupplierA and SupplierB orchestrations. This orchestration writes the document content to the standard output, applies a transformation map to the inbound message passed as an argument and sends the resulting message to a solicit-response send port.

Tracer Orchestration

TracerOrchestration

Send Ports

The OrderedDeliveryBroker application uses 2 distinct WCF-Custom send ports to write messages to the SupplierA and SupplierB tables in a custom SQL Server database. Both send ports are configured to use the sqlBinding. The picture below shows the configuration of one of the 2 send ports. In particular, note that the send port is configured to invoke a stored procedure called supplierA_InsertLogRetry.

OrderedDeliveryBroker.SupplierA.WCF-SQL.SendPort Configuration

WCF-CustomSendPort

Database

The following picture shows the OrderedDeliveryBroker database and in particular the structure of the SupplierA and SupplierB tables used to simulate two distinct backend systems.

Database

Business Rule Engine Policy

The following picture shows the OrderedDeliveryBroker policy that defines the routing rules used to dispatch messages just to the intended convoy orchestrations.

OrderedDeliveryBroker Policy

Policy

BrokerPipelineComponent

The BrokerPipelineComponent is the core component of the application. It invokes the BRE policy configured on the receive location and then promotes the list of context properties returned by the rule set, so that the inbound document is dispatched only to the intended sinks. For your convenience, I have included the code of the BrokerPipelineComponent below. For brevity, I omitted the code of the ContextDictionary class used to define rules in the BRE engine and return results to the BrokerPipelineComponent.
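
Since the ContextDictionary class is not listed here, the following sketch may help to picture the contract between the component and the rules. It is purely hypothetical and is not the author's class: SketchContextDictionary, PropertyToPromote, ApplyCdRule and the urn:example:routing-properties namespace are invented for the example, and the rule is written as a plain C# method rather than as a BRE rule. The only details taken from the component's own comments are the key format propertyNamespace#propertyName and the existence of a ToPromote list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch; the real ContextDictionary class is not shown in the article.
public sealed class PropertyToPromote
{
    public string Name { get; set; }
    public string Namespace { get; set; }
    public object Value { get; set; }
}

public sealed class SketchContextDictionary : Dictionary<string, object>
{
    private readonly List<PropertyToPromote> toPromote = new List<PropertyToPromote>();

    // Filled in by the rules, read by the pipeline component.
    public List<PropertyToPromote> ToPromote
    {
        get { return toPromote; }
    }

    // Keys follow the propertyNamespace#propertyName convention.
    public static string MakeKey(string propertyNamespace, string propertyName)
    {
        return propertyNamespace + "#" + propertyName;
    }
}

public static class ContextDictionaryDemo
{
    public const string SystemNamespace =
        "http://schemas.microsoft.com/BizTalk/2003/system-properties";
    public const string RoutingNamespace = "urn:example:routing-properties"; // assumed

    // Stand-in for a single rule: Cd messages are destined for SupplierA.
    public static void ApplyCdRule(SketchContextDictionary facts)
    {
        object messageType;
        string key = SketchContextDictionary.MakeKey(SystemNamespace, "MessageType");
        if (facts.TryGetValue(key, out messageType) &&
            messageType is string &&
            ((string)messageType).EndsWith("#Cd", StringComparison.Ordinal))
        {
            facts.ToPromote.Add(new PropertyToPromote
            {
                Name = "SupplierA",
                Namespace = RoutingNamespace,
                Value = true
            });
        }
    }

    public static void Main()
    {
        var facts = new SketchContextDictionary();
        facts[SketchContextDictionary.MakeKey(SystemNamespace, "MessageType")] = "urn:example#Cd";

        ApplyCdRule(facts);

        // The pipeline component would promote each of these on the message context.
        foreach (PropertyToPromote property in facts.ToPromote)
        {
            Console.WriteLine(property.Namespace + "#" + property.Name);
        }
    }
}
```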

BrokerPipelineComponent Class

 
 #region Copyright//=======================================================================================//Microsoft Windows Server AppFabric Customer Advisory Team (CAT)  //// This sample is supplemental to the technical guidance published on the community// blog at https://www.appfabriccat.com/. // // Author: Paolo Salvatori//=======================================================================================// Copyright © 2011 Microsoft Corporation. All rights reserved.// // THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER // EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE IMPLIED WARRANTIES OF // MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. YOU BEAR THE RISK OF USING IT.//=======================================================================================#endregion#region Using Directivesusing System;using System.Data;using System.Data.SqlClient;using System.Resources;using System.Drawing;using System.Diagnostics;using System.EnterpriseServices;using System.Collections;using System.Collections.Generic;using System.Reflection;using System.ComponentModel;using System.Linq;using System.Text;using System.IO;using System.Xml;using System.Xml.Serialization;using System.Security.Principal;using System.Runtime.InteropServices;using Microsoft.BizTalk.Message.Interop;using Microsoft.BizTalk.Component.Interop;using Microsoft.BizTalk.Streaming;using Microsoft.RuleEngine;using Microsoft.AppFabric.CAT.OrderedDeliveryBroker.PipelineComponents.Properties;using Microsoft.AppFabric.CAT.OrderedDeliveryBroker.Common;#endregionnamespace Microsoft.AppFabric.CAT.OrderedDeliveryBroker.PipelineComponents{    public enum QueryResolverType    {        Configuration,        Policy    }    /// <summary>    /// This component can be used to invoke a BRE policy composed of multiple rules.    
/// In particular, the component performs the following actions:    ///  - The pipeline component creates an instance of the ContextDictionary class and populates this collection     ///    with the name and value of the context properties. In particular    ///    the key of each item has the following format: propertyNamespace#propertyName.    ///  - The pipeline component creates a TypedXmlDocument to pass the content of the message to the BRE policy    ///  - Invokes the BRE policy indicated by the Policy property within pipeline configuration.    ///  - The rules which compose the policy use the information contained in the ContextDictionary,    ///    for example the MessageType context property, to decide which context properties to promote.    ///  - The rules adds the context properties to promote to the ContextDictionary.ToPromote list.    ///  - Finally, the pipeline component reads and promotes the properties     ///    contained in the ContextDictionary.ToPromote.    ///  - If the value of the CacheEnabled property is true, then the pipeline component stores the couple    ///    (messagetype, list of context properties to promote) in the subscriberDictionary static dictionary.    ///    This technique allows to avoid calling the BRE policy the subsequent times that a message of a certain    ///    type is received and processed by the pipeline component and this can speed up the execution of the    ///    pipeline component. The downside of using an in-process cache is that the host process     ///    running the receive location should be restarted in case the BRE policy changes.    
/// </summary>    /// <remarks>    ///</remarks>    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]    [ComponentCategory(CategoryTypes.CATID_Any)]    [ComponentCategory(CategoryTypes.CATID_Validate)]    [Guid("39B81A89-BB0B-4E3D-98E4-98DA381752E1")]    public class BrokerPipelineComponent : NewBaseCustomTypeDescriptor,                                         IBaseComponent,                                         Microsoft.BizTalk.Component.Interop.IComponent,                                         Microsoft.BizTalk.Component.Interop.IPersistPropertyBag,                                         IComponentUI    {        #region Private Constants        private const string TraceEnabledPropertyName = "TraceEnabledPropertyName";        private const string TraceEnabledPropertyDescription = "TraceEnabledPropertyDescription";        private const string CacheEnabledPropertyName = "CacheEnabledPropertyName";        private const string CacheEnabledPropertyDescription = "CacheEnabledPropertyDescription";        private const string BufferSizePropertyName = "BufferSizePropertyName";        private const string BufferSizePropertyDescription = "BufferSizePropertyDescription";        private const string ThresholdSizePropertyName = "ThresholdSizePropertyName";        private const string ThresholdSizePropertyDescription = "ThresholdSizePropertyDescription";        private const string PolicyPropertyName = "PolicyPropertyName";        private const string PolicyPropertyDescription = "PolicyPropertyDescription";        private const int DefaultBufferSize = 0x4000;        private const int DefaultThresholdSize = 0x100000;        #endregion        #region Private Fields        private int bufferSize = DefaultBufferSize;        private int thresholdSize = DefaultThresholdSize;        private bool traceEnabled = false;        private bool cacheEnabled = false;        private string policy = null;        #endregion        #region Private Static Fields        private 
static Dictionary<string, List<ContextProperty>> subscriberDictionary =                                                                       new Dictionary<string, List<ContextProperty>>();        #endregion        #region Private Static Fields        private static ResourceManager resourceManager = new ResourceManager(Resources.ResourcesClass,                                                                              Assembly.GetExecutingAssembly());        private Type forwardOnlyStreamType = typeof(ForwardOnlyEventingReadStream);        #endregion        #region Public Constructors        /// <summary>        /// Constructor initializes base class to allow custom names and description for component properies        /// </summary>        public BrokerPipelineComponent()            :            base(resourceManager)        {        }        #endregion        #region Public Properties        /// <summary>        /// Gets or sets the size of the buffer used to read the message part.        /// </summary>            [PropertyName(BufferSizePropertyName)]        [PropertyDescription(BufferSizePropertyDescription)]        public int BufferSize        {            get            {                return bufferSize;            }            set            {                bufferSize = value;            }        }        /// <summary>        /// Gets or sets the VirtualStream threshold size.        /// </summary>            [PropertyName(ThresholdSizePropertyName)]        [PropertyDescription(ThresholdSizePropertyDescription)]        public int ThresholdSize        {            get            {                return thresholdSize;            }            set            {                thresholdSize = value;            }        }        /// <summary>        /// Gets or sets a Boolean value indicating whether tracing is enabled.        
/// </summary>            [PropertyName(TraceEnabledPropertyName)]        [PropertyDescription(TraceEnabledPropertyDescription)]        public bool TraceEnabled        {            get            {                return traceEnabled;            }            set            {                traceEnabled = value;            }        }        /// <summary>        /// Gets or sets a Boolean value indicating whether caching is enabled.        /// </summary>            [PropertyName(CacheEnabledPropertyName)]        [PropertyDescription(CacheEnabledPropertyDescription)]        public bool CacheEnabled        {            get            {                return cacheEnabled;            }            set            {                cacheEnabled = value;            }        }        /// <summary>        /// Gets or sets the name of the BRE Policy to invoke.        /// </summary>            [PropertyName(PolicyPropertyName)]        [PropertyDescription(PolicyPropertyDescription)]        public string Policy        {            get            {                return policy;            }            set            {                policy = value;            }        }        #endregion        #region IBaseComponent        /// <summary>        /// Name of the component.        /// </summary>        [Browsable(false)]        public string Name        {            get            {                return Resources.BrokerPipelineComponentName;            }        }        /// <summary>        /// Version of the component.        /// </summary>        [Browsable(false)]        public string Version        {            get            {                return Resources.BrokerPipelineComponentVersion;            }        }        /// <summary>        /// Description of the component.        
/// </summary>        [Browsable(false)]        public string Description        {            get            {                return Resources.BrokerPipelineComponentDescription;            }        }        #endregion        #region IComponent        /// <summary>        /// Implements IComponent.Execute method. For details, see the description of the class.        /// </summary>        /// <param name="context">Pipeline context</param>        /// <param name="inputMessage">Input message.</param>        /// <returns>Processed input message with appended or prepended data.</returns>        /// <remarks>        /// IComponent.Execute method is used to initiate the processing of the message in pipeline component.        /// </remarks>        public IBaseMessage Execute(IPipelineContext context, IBaseMessage message)        {            try            {                // Check that the context is not null                if (context == null)                {                    throw new ArgumentException(Resources.ContextIsNullMessage);                }                // Check that the message is not null                if (message == null)                {                    throw new ArgumentException(Resources.InboundMessageIsNullMessage);                }                IBaseMessagePart bodyPart = null;                bodyPart = message.BodyPart;                // Check that the message part is not null                if (bodyPart == null)                {                    TraceHelper.WriteLine(string.Format(Resources.MessageDoesNotContainsThePartCalledMessage,                                                         Resources.Body));                    return message;                }                Stream stream = bodyPart.GetOriginalDataStream();                // Check that the original data stream of the body part is not null                if (stream == null)                {                    
TraceHelper.WriteLine(string.Format(Resources.MessagePartStreamIsNullMessage, Resources.Body));                    return message;                }                // Executes the component logic only if the message stream can be read                if (stream.CanRead)                {                    VirtualStream virtualStream = null;                    ReadOnlySeekableStream readOnlySeekableStream = null;                    // if the stream is not seekable, wrap it with a ReadOnlySeekableStream object                    if (!stream.CanSeek)                    {                        virtualStream = new VirtualStream(bufferSize, thresholdSize);                        readOnlySeekableStream = new ReadOnlySeekableStream(stream, virtualStream, bufferSize);                        context.ResourceTracker.AddResource(readOnlySeekableStream);                        stream = readOnlySeekableStream;                    }                    IBaseMessageContext messageContext = message.Context;                    // This method invokes the BRE policy and promotes context properties used to determine                    // the subscribers of the actual message.                    
FindSubscribers(stream, ref messageContext, context);                    stream.Seek(0, SeekOrigin.Begin);                    bodyPart.Data = stream;                }                else                {                    TraceHelper.WriteLine(context,                                           Resources.StreamDoesNotSupportReadMessage,                                           EventLogEntryType.Warning);                }            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);                TraceHelper.WriteLineIf(traceEnabled,                                        context,                                        ex.Message,                                        EventLogEntryType.Error);                throw;            }            return message;        }        #endregion        #region IPersistPropertyBag        /// <summary>        /// Gets class ID of component for usage from unmanaged code.        /// </summary>        /// <param name="classid">Class ID of the component.</param>        public void GetClassID(out Guid classid)        {            classid = new System.Guid("39B81A89-BB0B-4E3D-98E4-98DA381752E1");        }        /// <summary>        /// Not implemented.        /// </summary>        public void InitNew()        {        }        /// <summary>        /// Loads configuration property for component.        
/// </summary>        /// <param name="pb">Configuration property bag.</param>        /// <param name="errlog">Error status (not used in this code).</param>        public void Load(IPropertyBag propertyBag, Int32 errlog)        {            try            {                object data;                data = ReadPropertyBag(propertyBag, Resources.BufferSizePropertyName);                if (data != null &&                    data is int)                {                    bufferSize = (int)data;                    if (bufferSize <= 0)                    {                        bufferSize = DefaultBufferSize;                    }                }                data = ReadPropertyBag(propertyBag, Resources.ThresholdSizePropertyName);                if (data != null &&                    data is int)                {                    thresholdSize = (int)data;                    if (thresholdSize <= 0)                    {                        thresholdSize = DefaultThresholdSize;                    }                }                data = ReadPropertyBag(propertyBag, Resources.TraceEnabledPropertyName);                if (data != null &&                    data is bool)                {                    traceEnabled = (bool)data;                }                data = ReadPropertyBag(propertyBag, Resources.CacheEnabledPropertyName);                if (data != null &&                    data is bool)                {                    cacheEnabled = (bool)data;                }                data = ReadPropertyBag(propertyBag, Resources.PolicyPropertyName);                if (data != null &&                    data is string)                {                    policy = data as string;                }            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);                if (traceEnabled)                {                    TraceHelper.WriteLine(ex.Message, 
EventLogEntryType.Error);                }                throw;            }        }        /// <summary>        /// Saves the current component configuration into the property bag.        /// </summary>        /// <param name="pb">Configuration property bag.</param>        /// <param name="fClearDirty">Not used.</param>        /// <param name="fSaveAllProperties">Not used.</param>        public void Save(IPropertyBag propertyBag, Boolean clearDirty, Boolean saveAllProperties)        {            try            {                WritePropertyBag(propertyBag, Resources.BufferSizePropertyName, (object)bufferSize);                WritePropertyBag(propertyBag, Resources.ThresholdSizePropertyName, (object)thresholdSize);                WritePropertyBag(propertyBag, Resources.TraceEnabledPropertyName, (object)traceEnabled);                WritePropertyBag(propertyBag, Resources.CacheEnabledPropertyName, (object)cacheEnabled);                WritePropertyBag(propertyBag, Resources.PolicyPropertyName, (object)policy);            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);                if (traceEnabled)                {                    TraceHelper.WriteLine(ex.Message, EventLogEntryType.Error);                }                throw;            }        }        /// <summary>        /// Reads property value from property bag.        
/// </summary>        /// <param name="pb">Property bag.</param>        /// <param name="propName">Name of property.</param>        /// <returns>Value of the property.</returns>        private object ReadPropertyBag(IPropertyBag propertyBag, string propertyName)        {            object val = null;            try            {                propertyBag.Read(propertyName, out val, 0);            }            catch (ArgumentException)            {                return val;            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);                TraceHelper.WriteLineIf(traceEnabled, ex.Message, EventLogEntryType.Error);                throw;            }            return val;        }        /// <summary>        /// Writes property values into a property bag.        /// </summary>        /// <param name="pb">Property bag.</param>        /// <param name="propName">Name of property.</param>        /// <param name="val">Value of property.</param>        private void WritePropertyBag(IPropertyBag propertyBag, string propertyName, object value)        {            try            {                propertyBag.Write(propertyName, ref value);            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);                TraceHelper.WriteLineIf(traceEnabled, ex.Message, EventLogEntryType.Error);                throw;            }        }        #endregion        #region IComponentUI        /// <summary>        /// Component icon to use in BizTalk Editor.        /// </summary>        [Browsable(false)]        public IntPtr Icon        {            get            {                return Resources.Icon.Handle;            }        }        /// <summary>        /// The Validate method is called by the BizTalk Editor during the build         /// of a BizTalk project.        
/// </summary>        /// <param name="obj">Project system.</param>        /// <returns>        /// A list of error and/or warning messages encounter during validation        /// of this component.        /// </returns>        public IEnumerator Validate(object obj)        {            IEnumerator enumerator = null;            ArrayList list = new ArrayList();            try            {                if (bufferSize <= 0)                {                    list.Add(Resources.BufferSizeCannotBeLessOrEqualToZeroBytesMessage);                }                if (thresholdSize <= 0)                {                    list.Add(Resources.ThresholdSizeCannotBeLessOrEqualToZeroBytesMessage);                }                if (list.Count > 0)                {                    enumerator = list.GetEnumerator();                }            }            catch (Exception ex)            {                ExceptionHelper.HandleException(Resources.TracePipelineComponentName, ex);                TraceHelper.WriteLineIf(traceEnabled, ex.Message, EventLogEntryType.Error);            }            return enumerator;        }        #endregion        #region Private Methods        /// <summary>        /// Invokes the BRE policy and promotes context properties defined by the policy to         /// determine the subscribers of the actual message based on its content and context.        
        /// </summary>
        /// <param name="stream">The original data stream of the message body part.</param>
        /// <param name="messageContext">The message context.</param>
        /// <param name="context">The pipeline context.</param>
        private void FindSubscribers(Stream stream, ref IBaseMessageContext messageContext, IPipelineContext context)
        {
            try
            {
                // Check that the input data are not null
                if (stream != null &&
                    messageContext != null &&
                    context != null)
                {
                    List<ContextProperty> subscriberList = null;
                    string messageType = null;

                    // If the cache is enabled, the component reads the MessageType from the context
                    // and checks whether an entry for the actual message type is already contained in the
                    // subscriberDictionary. If so, the component retrieves the list of context properties
                    // associated with the message type (one for each subscriber) and promotes them.
                    // Note: the cache can be used only when the following requirements are met:
                    //  - The pipeline component is placed in the Validate or ResolveParty
                    //    stage of a receive pipeline
                    //  - The receive pipeline contains at least a disassembler component that
                    //    determines and promotes the MessageType
                    if (cacheEnabled)
                    {
                        messageType = messageContext.Read("MessageType",
                                                          "http://schemas.microsoft.com/BizTalk/2003/system-properties") as string;
                        if (!string.IsNullOrEmpty(messageType) &&
                            subscriberDictionary.ContainsKey(messageType))
                        {
                            lock (subscriberDictionary)
                            {
                                if (subscriberDictionary.TryGetValue(messageType, out subscriberList))
                                {
                                    for (int i = 0; i < subscriberList.Count; i++)
                                    {
                                        messageContext.Promote(subscriberList[i].Name,
                                                               subscriberList[i].Namespace,
                                                               subscriberList[i].Value);
                                    }
                                    return;
                                }
                            }
                        }
                    }

                    object value = null;
                    string name = null;
                    string nameSpace = null;
                    string[] parts = null;
                    string key = null;

                    // Create and populate a ContextDictionary object with context properties
                    ContextDictionary contextDictionary = new ContextDictionary(traceEnabled);
                    for (int i = 0; i < messageContext.CountProperties; i++)
                    {
                        value = messageContext.ReadAt(i, out name, out nameSpace);
                        contextDictionary.Add(string.Format("{0}#{1}", nameSpace, name), value);
                    }

                    // Determine the DocumentType of the current message
                    string documentType = string.Empty;
                    string schemaStrongName = messageContext.Read("SchemaStrongName",
                                                                  "http://schemas.microsoft.com/BizTalk/2003/system-properties") as string;
                    if (!string.IsNullOrEmpty(schemaStrongName))
                    {
                        documentType = schemaStrongName.Split(',').ElementAtOrDefault<string>(0);
                    }
                    XmlTextReader reader = new XmlTextReader(stream);

                    // Create a TypedXmlDocument object to pass the message content to the BRE policy
                    TypedXmlDocument document = new TypedXmlDocument(documentType, reader);

                    // Invoke the BRE policy
                    using (Policy brePolicy = new Policy(policy))
                    {
                        brePolicy.Execute(new object[] { document, contextDictionary });
                    }

                    // If the BRE policy added properties to the ToPromote collection of
                    // the ContextDictionary object, the method promotes these properties.
                    // If the cache is enabled, the code saves the pair (MessageType, list of
                    // context properties to promote) in the subscriberDictionary static collection.
                    if (contextDictionary.ToPromote != null &&
                        contextDictionary.ToPromote.Count > 0)
                    {
                        if (cacheEnabled)
                        {
                            subscriberList = new List<ContextProperty>();
                        }
                        for (int i = 0; i < contextDictionary.ToPromote.Count; i++)
                        {
                            // Each key has the form "namespace#name"
                            key = contextDictionary.ToPromote[i];
                            parts = key.Split('#');
                            if (parts != null &&
                                parts.Length == 2 &&
                                !string.IsNullOrEmpty(parts[0]) &&
                                !string.IsNullOrEmpty(parts[1]) &&
                                contextDictionary.ContainsKey(key))
                            {
                                if (cacheEnabled)
                                {
                                    subscriberList.Add(new ContextProperty(parts[1],
                                                                           parts[0],
                                                                           contextDictionary[key]));
                                }
                                messageContext.Promote(parts[1], parts[0], contextDictionary[key]);
                            }
                        }
                        if (cacheEnabled &&
                            !string.IsNullOrEmpty(messageType) &&
                            !subscriberDictionary.ContainsKey(messageType) &&
                            subscriberList.Count > 0)
                        {
                            lock (subscriberDictionary)
                            {
                                if (!subscriberDictionary.ContainsKey(messageType))
                                {
                                    try
                                    {
                                        subscriberDictionary.Add(messageType, subscriberList);
                                    }
                                    catch (Exception)
                                    {
                                    }
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                ExceptionHelper.HandleException(Resources.BrokerPipelineComponentName, ex);
                TraceHelper.WriteLineIf(traceEnabled,
                                        context,
                                        ex.Message,
                                        EventLogEntryType.Error);
                throw;
            }
        }

        #endregion
    }

    public class ContextProperty
    {
        #region Private Fields

        private string name;
        private string nameSpace;
        private object propertyValue;

        #endregion

        #region Public Constructors

        public ContextProperty()
        {
            this.name = null;
            this.nameSpace = null;
            this.propertyValue = null;
        }

        public ContextProperty(string name, string nameSpace, object propertyValue)
        {
            this.name = name;
            this.nameSpace = nameSpace;
            this.propertyValue = propertyValue;
        }

        #endregion

        #region Public Properties

        public string Name
        {
            get
            {
                return this.name;
            }
            set
            {
                this.name = value;
            }
        }

        public string Namespace
        {
            get
            {
                return this.nameSpace;
            }
            set
            {
                this.nameSpace = value;
            }
        }

        public object Value
        {
            get
            {
                return this.propertyValue;
            }
            set
            {
                this.propertyValue = value;
            }
        }

        #endregion
    }
}
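
The FindSubscribers method in the listing does two things that can be looked at in isolation: it splits each "namespace#name" key returned by the policy, and it caches the resulting list of properties per message type behind a lock. The following is only a minimal sketch of those two steps, not the code of the component. It swaps the locked dictionary for a ConcurrentDictionary, which assumes .NET Framework 4.0 or later, and the names CachedProperty and SubscriberCache are hypothetical stand-ins introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for the ContextProperty class of the article.
public sealed class CachedProperty
{
    public CachedProperty(string name, string nameSpace, object value)
    {
        Name = name;
        Namespace = nameSpace;
        Value = value;
    }

    public string Name { get; private set; }
    public string Namespace { get; private set; }
    public object Value { get; private set; }
}

// Hypothetical helper: not part of the companion code.
public static class SubscriberCache
{
    // One list of properties to promote per message type.
    private static readonly ConcurrentDictionary<string, IList<CachedProperty>> cache =
        new ConcurrentDictionary<string, IList<CachedProperty>>();

    // Splits a "namespace#name" key into its two parts.
    // Returns false when the key does not contain exactly two non-empty parts.
    public static bool TryParseKey(string key, out string nameSpace, out string name)
    {
        nameSpace = null;
        name = null;
        if (string.IsNullOrEmpty(key))
        {
            return false;
        }
        string[] parts = key.Split('#');
        if (parts.Length != 2 || parts[0].Length == 0 || parts[1].Length == 0)
        {
            return false;
        }
        nameSpace = parts[0];
        name = parts[1];
        return true;
    }

    // Returns the cached list for the message type, or calls 'resolve'
    // (for example, a method that runs the policy) and caches its result.
    public static IList<CachedProperty> GetOrAdd(
        string messageType,
        Func<string, IList<CachedProperty>> resolve)
    {
        return cache.GetOrAdd(messageType, resolve);
    }
}
```

Note that ConcurrentDictionary.GetOrAdd may call the resolve delegate more than once when several threads miss the cache at the same time, although only one result is stored. This is acceptable only if resolving the subscribers has no side effects; otherwise the explicit lock used in the listing is the safer choice.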

 

Testing the Solution

To test the application, you can proceed as follows:

- Build, deploy and configure the OrderedDeliveryBroker solution.

- Open the Test View as illustrated in the picture below.

- Run the SendMessage unit test. The test method creates a blend of Cd, Dvd and BluRay messages and assigns an increasing Id to the documents. This makes it possible to verify that the messages are received in the same chronological order in which they were submitted to the MSMQ queue.

UnitTest

- Open SQL Server Management Studio and execute a query to verify that the SupplierA and SupplierB tables contain the expected data in the right chronological order.

SQLQuery
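
The idea behind the test steps above can be sketched in a few lines: generate a blend of document kinds with an increasing Id, then check that the Ids read back from a supplier table never go backwards. The sketch below is an assumption-laden illustration, not the SendMessage test from the companion code. It does not talk to MSMQ or SQL Server, the class OrderedDeliveryCheck is hypothetical, and a plain list of Ids stands in for the rows returned by the query.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: not part of the companion code.
public static class OrderedDeliveryCheck
{
    private static readonly string[] Kinds = { "Cd", "Dvd", "BluRay" };

    // Builds 'count' (Id, kind) pairs that cycle through the three document
    // kinds and carry an Id that increases by one, starting at 1.
    public static List<KeyValuePair<int, string>> CreateBlend(int count)
    {
        List<KeyValuePair<int, string>> messages = new List<KeyValuePair<int, string>>();
        for (int id = 1; id <= count; id++)
        {
            string kind = Kinds[(id - 1) % Kinds.Length];
            messages.Add(new KeyValuePair<int, string>(id, kind));
        }
        return messages;
    }

    // Returns true when the Ids are strictly increasing, that is, when the
    // documents were stored in the same order in which they were submitted.
    public static bool IsInSubmissionOrder(IEnumerable<int> receivedIds)
    {
        int? previous = null;
        foreach (int id in receivedIds)
        {
            if (previous.HasValue && id <= previous.Value)
            {
                return false;
            }
            previous = id;
        }
        return true;
    }
}
```

Because each supplier receives only a subset of the documents, the check requires strictly increasing Ids rather than consecutive ones: gaps are expected, while a lower Id following a higher one indicates that ordering was lost.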

Conclusions

In this article we have seen how to create a broker pipeline component and a Business Rule Engine policy to dispatch messages to a pool of convoy orchestrations. Here you can download the companion code for this article. As always, your feedback is more than welcome!