BizTalk Custom Receive Pipeline Component to Batch Messages


Introduction:


 


Pipelines in BizTalk are used to do specific processing on incoming and outgoing messages. Each pipeline has some stages in which stage specific components can be employed to do certain tasks.


 


“Disassemble” is the second stage of the receive pipeline which is primarily used to disassemble incoming messages, validate schema and promote properties. To disassemble (break) messages it uses an envelope schema and it breaks the message record-wise. There is no provision to break a set of records (batches).


To break messages as per a configurable batch size, we need to write a custom receive pipeline. For example –


If batch size is configured to 5 and incoming message is –


 


<ns0:sample xmlns:ns0=’http://MGSIBREDemo.LoanRequest’>


<data1><name>name1</name></data1>


<data1><name>name2</name></data1>


<data1><name>name3</name></data1>


<data1><name>name4</name></data1>


<data1><name>name5</name></data1>


<data1><name>name6</name></data1>


<data1><name>name7</name></data1>


<data1><name>name8</name></data1>


<data1><name>name9</name></data1>


<data1><name>name10</name></data1>


</ns0:sample>


 


Custom receive pipeline will break this message into 2 messages each having 5 records –


 


<ns0:sample xmlns:ns0=’http://MGSIBREDemo.LoanRequest’>


<data1><name>name1</name></data1>


<data1><name>name2</name></data1>


<data1><name>name3</name></data1>


<data1><name>name4</name></data1>


<data1><name>name5</name></data1>


</ns0:sample>


 


<ns0:sample xmlns:ns0=’http://MGSIBREDemo.LoanRequest’>


<data1><name>name6</name></data1>


<data1><name>name7</name></data1>


<data1><name>name8</name></data1>


<data1><name>name9</name></data1>


<data1><name>name10</name></data1>


</ns0:sample>


 


 


Sample Code:


 


Following is a high level guidance and code snippet (.net 2.0 based) to write such pipeline –


 


1. Create a class library project.


 


2. Add reference to Microsoft.BizTalk.Pipeline.dll.


 


3. Rename default class1.cls to DisassemblePipeline.cs.


 


4. Use following code


 


using System;


using System.Collections.Generic;


using System.Text;


using System.IO;


using System.Xml;


using System.ComponentModel;


using Microsoft.BizTalk.Component.Interop;


using Microsoft.BizTalk.Message.Interop;


namespace MessageBatchPipelineCompoent


{


[ComponentCategory(CategoryTypes.CATID_PipelineComponent)]


[ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]


[System.Runtime.InteropServices.Guid(“6118B8F0-8684-4ba2-87B4-8336D70BD4F7”)]


public class DisassemblePipeline : IBaseComponent,


IDisassemblerComponent,


IComponentUI,


IPersistPropertyBag


{


//Used to hold disassembled messages


private System.Collections.Queue qOutputMsgs = new System.Collections.Queue();


private string systemPropertiesNamespace = @”http://schemas.microsoft.com/BizTalk/2003/system-properties”;


/// <summary>


/// Batch size used to batch records


/// </summary>


private int _BatchSize;


public int BatchSize


{


get { return _BatchSize; }


set { _BatchSize = value; }


}


/// <summary>


/// Default constructor


/// </summary>


public DisassemblePipeline()


{


}


/// <summary>


/// Description of pipeline


/// </summary>


public string Description


{


get


{


return “Component to batch (break) large message into multiple small messages”;


}


}


/// <summary>


/// Name of pipeline


/// </summary>


public string Name


{


get


{


return “MessageBatchComponent”;


}


}


/// <summary>


/// Pipeline version


/// </summary>


public string Version


{


get


{


return “1.0.0.0”;


}


}


 


/// <summary>


/// Returns collecton of errors


/// </summary>


public System.Collections.IEnumerator Validate(object projectSystem)


{


return null;


}


/// <summary>


/// Returns icon of pipeline


/// </summary>


public System.IntPtr Icon


{


get


{


return new System.IntPtr();


}


}


 


/// <summary>


/// Class GUID


/// </summary>


public void GetClassID(out Guid classID)


{


classID = new Guid(“ACC3F15A-C389-4a5d-8F8E-2A951CDC4C19”);


}


/// <summary>


/// InitNew


/// </summary>


public void InitNew()


{


}


/// <summary>


/// Load property from property bag


/// </summary>


public void Load(IPropertyBag propertyBag, int errorLog)


{


object val = null;


try


{


propertyBag.Read(“BatchSize”, out val, 0);


}


catch (Exception ex)


{


throw new ApplicationException(“Error reading propertybag: “ + ex.Message);


}


if (val != null)


_BatchSize = (int)val;


else


_BatchSize = 1;


}


/// <summary>


/// Write property to property bag


/// </summary>


public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)


{


object val = (object)BatchSize;


propertyBag.Write(“BatchSize”, ref val);


}


/// <summary>


/// Disassembles (breaks) message into small messages as per batch size


/// </summary>


public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)


{


string originalDataString;


try


{


//fetch original message


Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();


byte[] bufferOriginalMessage = new byte[originalMessageStream.Length];


originalMessageStream.Read(bufferOriginalMessage, 0, Convert.ToInt32(originalMessageStream.Length));


originalDataString = System.Text.ASCIIEncoding.ASCII.GetString(bufferOriginalMessage);


}


catch (Exception ex)


{


throw new ApplicationException(“Error in reading original message: “ + ex.Message);


}


XmlDocument originalMessageDoc = new XmlDocument();


StringBuilder messageString;


try


{


//load original message


originalMessageDoc.LoadXml(originalDataString);


//fetch namespace and root element


string namespaceURI = originalMessageDoc.DocumentElement.NamespaceURI;


string rootElement = originalMessageDoc.DocumentElement.Name;


//start batching messages


int counter = 0;


messageString = new StringBuilder();


messageString.Append(“<“ + rootElement + ” xmlns:ns0='” + namespaceURI + “‘>”);


foreach (XmlNode childNode in originalMessageDoc.DocumentElement.ChildNodes)


{


counter = counter + 1;


if (counter > BatchSize)


{


messageString.Append(“</” + rootElement + “>”);


//Queue message


CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);


counter = 1;


messageString.Remove(0, messageString.Length);


messageString.Append(“<“ + rootElement + ” xmlns:ns0='” + namespaceURI + “‘>”);


messageString.Append(childNode.OuterXml);


}


else


{


messageString.Append(childNode.OuterXml);


}


}


messageString.Append(“</” + rootElement + “>”);


CreateOutgoingMessage(pContext, messageString.ToString(), namespaceURI, rootElement);


}


catch (Exception ex)


{


throw new ApplicationException(“Error in writing outgoing messages: “ + ex.Message);


}


finally


{


messageString = null;


originalMessageDoc = null;


}


}


/// <summary>


/// Used to pass output messages`to next stage


/// </summary>


public IBaseMessage GetNext(IPipelineContext pContext)


{


if (qOutputMsgs.Count > 0)


return (IBaseMessage)qOutputMsgs.Dequeue();


else


return null;


}


/// <summary>


/// Queue outgoing messages


/// </summary>


private void CreateOutgoingMessage(IPipelineContext pContext, String messageString, string namespaceURI, string rootElement)


{


IBaseMessage outMsg;


try


{


//create outgoing message


outMsg = pContext.GetMessageFactory().CreateMessage();


outMsg.AddPart(“Body”, pContext.GetMessageFactory().CreateMessagePart(), true);


outMsg.Context.Promote(“MessageType”, systemPropertiesNamespace, namespaceURI + “#” + rootElement.Replace(“ns0:”, “”));


byte[] bufferOoutgoingMessage = System.Text.ASCIIEncoding.ASCII.GetBytes(messageString);


outMsg.BodyPart.Data = new MemoryStream(bufferOoutgoingMessage);


qOutputMsgs.Enqueue(outMsg);


}


catch (Exception ex)


{


throw new ApplicationException(“Error in queueing outgoing messages: “ + ex.Message);


}


}


 


}


}


5. Sign and then build project.


6. Copy MessageBatchPipelineCompoent.dll to C:\Program Files\Microsoft BizTalk Server 2006\Pipeline Components.


7. Pipeline component is now ready to use. In BTS pipeline project, add this pipeline component dll in toolbar and then use in disassemble stage.


 


Usage Scenario


 


Sometimes in BizTalk orchestrations/messaging, incoming messages come with huge chunks of records where as a part of business process each record is processed individually one by one. Publishing this huge record set in message box and further their processing consumes a lot of resources because orchestration takes a longer time to process each record individually.


 


Using this custom pipeline, huge messages can be broken into multiple messages of smaller records. And when published in a message box, orchestrations can run in parallel to process more than one message (broken ones) at the same time. Result is better performance and small processing time.


 


If process area (host) is scaled out, then broken messages are processed in different machines to give more scalable and fast execution.


 

Comments (4)

  1. Anonymous says:

    Why don’t you use envelopes?

  2. brajens says:

    if you use envelop and document schema then you break message per record (as per document schema). This custom pipeline breaks message in batch of records. And this batch size is configurable at design time. if you wish, you can extend this pipleline code to have properties of document and envelop schema and work same as default disassemble component with batch size property available.

  3. Anonymous says:

    i’m not able to call enterprise library 2.0 exception policy inside the custom pipeline component even though i created appdomain in the btsntsvc.config

  4. JigneshSurati says:

    Getting below error after deployment and message is receive at the receive port:

    The published message could not be routed because no subscribers were found. This error occurs if the subscribing orchestration or send port has not been enlisted, or if some of the message properties necessary for subscription evaluation have not been promoted. Please use the Biztalk Administration console to troubleshoot this failure.

    Resolution: Added filter condition on send port BTS.MessageType = bauschlomb.biztalk.bts_debatch_pipeline.debatchschema

    But does not work with Receiving and Sending with Orchestration.