BizTalk Custom Receive Pipeline Component to Batch Messages

Introduction:

Pipelines in BizTalk are used to do specific processing on incoming and outgoing messages. Each pipeline has some stages in which stage specific components can be employed to do certain tasks.

“Disassemble” is the second stage of the receive pipeline. It is primarily used to disassemble incoming messages, validate them against a schema and promote properties. To disassemble (break up) a message it uses an envelope schema and splits the message record by record; there is no built-in provision for splitting it into sets of records (batches).

To break messages up according to a configurable batch size, we need to write a custom receive pipeline component. For example -

If batch size is configured to 5 and incoming message is -

<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>

<data1><name>name1</name></data1>

<data1><name>name2</name></data1>

<data1><name>name3</name></data1>

<data1><name>name4</name></data1>

<data1><name>name5</name></data1>

<data1><name>name6</name></data1>

<data1><name>name7</name></data1>

<data1><name>name8</name></data1>

<data1><name>name9</name></data1>

<data1><name>name10</name></data1>

</ns0:sample>

Custom receive pipeline will break this message into 2 messages each having 5 records –

<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>

<data1><name>name1</name></data1>

<data1><name>name2</name></data1>

<data1><name>name3</name></data1>

<data1><name>name4</name></data1>

<data1><name>name5</name></data1>

</ns0:sample>

<ns0:sample xmlns:ns0='https://MGSIBREDemo.LoanRequest'>

<data1><name>name6</name></data1>

<data1><name>name7</name></data1>

<data1><name>name8</name></data1>

<data1><name>name9</name></data1>

<data1><name>name10</name></data1>

</ns0:sample>

Sample Code:

The following is high-level guidance and a code snippet (.NET 2.0 based) for writing such a pipeline component -

 

1. Create a class library project.

 

2. Add reference to Microsoft.BizTalk.Pipeline.dll.

 

3. Rename the default Class1.cs to DisassemblePipeline.cs.

 

4. Use following code

 

using System;

using System.Collections.Generic;

using System.Text;

using System.IO;

using System.Xml;

using System.ComponentModel;

using Microsoft.BizTalk.Component.Interop;

using Microsoft.BizTalk.Message.Interop;

namespace MessageBatchPipelineCompoent
{
    /// <summary>
    /// Disassembling pipeline component that splits an incoming XML message into
    /// several smaller messages. Every output message has the same root element as
    /// the input and carries at most <see cref="BatchSize"/> child elements (records).
    /// </summary>
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_DisassemblingParser)]
    [System.Runtime.InteropServices.Guid("6118B8F0-8684-4ba2-87B4-8336D70BD4F7")]
    public class DisassemblePipeline : IBaseComponent,
        IDisassemblerComponent,
        IComponentUI,
        IPersistPropertyBag
    {
        /// <summary>Name under which the batch size is stored in the property bag.</summary>
        private const string BatchSizePropertyName = "BatchSize";

        /// <summary>Batch size used when none is configured or the configured value is below 1.</summary>
        private const int DefaultBatchSize = 1;

        // Namespace of the BizTalk system property schema. It uses the http:// scheme;
        // an https:// URI would name a different property namespace, so the promoted
        // MessageType would not be recognised.
        private const string systemPropertiesNamespace = @"http://schemas.microsoft.com/BizTalk/2003/system-properties";

        // Holds the disassembled messages until the pipeline collects them through GetNext.
        private readonly Queue<IBaseMessage> qOutputMsgs = new Queue<IBaseMessage>();

        private int _BatchSize;

        /// <summary>
        /// Maximum number of records (child elements of the root) per output message.
        /// Values below 1 are treated as 1 when a message is disassembled.
        /// </summary>
        public int BatchSize
        {
            get { return _BatchSize; }
            set { _BatchSize = value; }
        }

        /// <summary>
        /// Default constructor.
        /// </summary>
        public DisassemblePipeline()
        {
        }

        /// <summary>
        /// Description of the component.
        /// </summary>
        public string Description
        {
            get { return "Component to batch (break) large message into multiple small messages"; }
        }

        /// <summary>
        /// Name of the component.
        /// </summary>
        public string Name
        {
            get { return "MessageBatchComponent"; }
        }

        /// <summary>
        /// Version of the component.
        /// </summary>
        public string Version
        {
            get { return "1.0.0.0"; }
        }

        /// <summary>
        /// Design-time validation hook. This component performs no validation.
        /// </summary>
        /// <param name="projectSystem">Not used.</param>
        /// <returns>Always null, i.e. no errors are reported.</returns>
        public System.Collections.IEnumerator Validate(object projectSystem)
        {
            return null;
        }

        /// <summary>
        /// Icon of the component. No icon is supplied, so a zero handle is returned.
        /// </summary>
        public System.IntPtr Icon
        {
            get { return System.IntPtr.Zero; }
        }

        /// <summary>
        /// Returns the class identifier used for persistence.
        /// </summary>
        /// <param name="classID">Receives the class GUID.</param>
        public void GetClassID(out Guid classID)
        {
            classID = new Guid("ACC3F15A-C389-4a5d-8F8E-2A951CDC4C19");
        }

        /// <summary>
        /// Nothing to initialise for a new component instance.
        /// </summary>
        public void InitNew()
        {
        }

        /// <summary>
        /// Loads <see cref="BatchSize"/> from the property bag, falling back to 1 when
        /// the property is not stored.
        /// </summary>
        /// <param name="propertyBag">Property bag to read from.</param>
        /// <param name="errorLog">Not used.</param>
        /// <exception cref="ApplicationException">The property bag could not be read.</exception>
        public void Load(IPropertyBag propertyBag, int errorLog)
        {
            object val = null;

            try
            {
                propertyBag.Read(BatchSizePropertyName, out val, 0);
            }
            catch (ArgumentException)
            {
                // BizTalk property bags report a property that was never saved with an
                // ArgumentException (the SDK samples handle it the same way). That is not
                // an error here: fall through to the default.
                val = null;
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error reading propertybag: " + ex.Message, ex);
            }

            if (val != null)
            {
                // Convert rather than unbox so that a value persisted as another numeric
                // type or as text is still accepted.
                _BatchSize = Convert.ToInt32(val, System.Globalization.CultureInfo.InvariantCulture);
            }
            else
            {
                _BatchSize = DefaultBatchSize;
            }
        }

        /// <summary>
        /// Writes <see cref="BatchSize"/> to the property bag.
        /// </summary>
        /// <param name="propertyBag">Property bag to write to.</param>
        /// <param name="clearDirty">Not used.</param>
        /// <param name="saveAllProperties">Not used.</param>
        public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
        {
            object val = (object)BatchSize;
            propertyBag.Write(BatchSizePropertyName, ref val);
        }

        /// <summary>
        /// Disassembles (breaks) the incoming message into smaller messages of at most
        /// <see cref="BatchSize"/> records each and queues them for <see cref="GetNext"/>.
        /// </summary>
        /// <remarks>
        /// Only child elements of the root count as records. Other child nodes (comments,
        /// processing instructions, text) are kept in the batch that is open when they are
        /// encountered. A root element without children yields a single message with an
        /// empty root element.
        /// </remarks>
        /// <param name="pContext">Pipeline context used to create the output messages.</param>
        /// <param name="pInMsg">Message whose body is split.</param>
        /// <exception cref="ApplicationException">
        /// The body could not be read or parsed, or an output message could not be created.
        /// </exception>
        public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            XmlDocument originalMessageDoc = new XmlDocument();

            // Messages come from outside the system: do not let a DOCTYPE pull in
            // external DTDs or entities while parsing.
            originalMessageDoc.XmlResolver = null;

            try
            {
                // Parse straight from the stream. The XML parser detects the encoding from
                // the BOM / XML declaration and reads until end of stream, so the body does
                // not have to be ASCII and the stream does not have to report its length.
                Stream originalMessageStream = pInMsg.BodyPart.GetOriginalDataStream();
                originalMessageDoc.Load(originalMessageStream);
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error in reading original message: " + ex.Message, ex);
            }

            try
            {
                XmlElement root = originalMessageDoc.DocumentElement;

                // BizTalk message types are "namespace#root", or just the root name when
                // the root element is in no namespace.
                string messageType = root.NamespaceURI.Length == 0
                    ? root.LocalName
                    : root.NamespaceURI + "#" + root.LocalName;

                // A freshly created component has BatchSize 0; never split on less than 1.
                int effectiveBatchSize = Math.Max(BatchSize, DefaultBatchSize);

                List<XmlNode> batchNodes = new List<XmlNode>();
                int recordsInBatch = 0;

                foreach (XmlNode childNode in root.ChildNodes)
                {
                    bool isRecord = childNode.NodeType == XmlNodeType.Element;

                    // Close the current batch only when another record arrives and the
                    // batch is already full.
                    if (isRecord && recordsInBatch == effectiveBatchSize)
                    {
                        CreateOutgoingMessage(pContext, SerializeBatch(root, batchNodes), messageType);
                        batchNodes.Clear();
                        recordsInBatch = 0;
                    }

                    batchNodes.Add(childNode);

                    if (isRecord)
                    {
                        recordsInBatch++;
                    }
                }

                // Remaining records, or an empty root when the input had no children.
                CreateOutgoingMessage(pContext, SerializeBatch(root, batchNodes), messageType);
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error in writing outgoing messages: " + ex.Message, ex);
            }
        }

        /// <summary>
        /// Hands the queued output messages to the next pipeline stage, one per call.
        /// </summary>
        /// <param name="pContext">Not used.</param>
        /// <returns>The next queued message, or null when the queue is empty.</returns>
        public IBaseMessage GetNext(IPipelineContext pContext)
        {
            if (qOutputMsgs.Count > 0)
            {
                return qOutputMsgs.Dequeue();
            }

            return null;
        }

        /// <summary>
        /// Serializes one batch as UTF-8 XML: a root element with the same prefix, local
        /// name and namespace as the original root, containing the given nodes.
        /// </summary>
        /// <param name="root">Root element of the original message.</param>
        /// <param name="nodes">Child nodes that belong to this batch.</param>
        /// <returns>UTF-8 bytes without BOM and without XML declaration.</returns>
        private static byte[] SerializeBatch(XmlElement root, IList<XmlNode> nodes)
        {
            XmlWriterSettings settings = new XmlWriterSettings();
            settings.Encoding = new UTF8Encoding(false);
            settings.OmitXmlDeclaration = true;

            using (MemoryStream buffer = new MemoryStream())
            {
                using (XmlWriter writer = XmlWriter.Create(buffer, settings))
                {
                    // Let the writer emit the namespace declaration that matches the
                    // original root (prefixed, default or none) instead of assuming "ns0".
                    writer.WriteStartElement(root.Prefix, root.LocalName, root.NamespaceURI);

                    foreach (XmlNode node in nodes)
                    {
                        node.WriteTo(writer);
                    }

                    writer.WriteFullEndElement();
                }

                // The writer is disposed (and therefore flushed) before the bytes are taken.
                return buffer.ToArray();
            }
        }

        /// <summary>
        /// Creates an output message with the given body, promotes its message type and
        /// queues it.
        /// </summary>
        /// <param name="pContext">Pipeline context providing the message factory.</param>
        /// <param name="messageBytes">UTF-8 encoded XML body.</param>
        /// <param name="messageType">Value promoted as the MessageType system property.</param>
        /// <exception cref="ApplicationException">The message could not be created or queued.</exception>
        private void CreateOutgoingMessage(IPipelineContext pContext, byte[] messageBytes, string messageType)
        {
            try
            {
                IBaseMessage outMsg = pContext.GetMessageFactory().CreateMessage();
                outMsg.AddPart("Body", pContext.GetMessageFactory().CreateMessagePart(), true);
                outMsg.Context.Promote("MessageType", systemPropertiesNamespace, messageType);

                // The body carries no XML declaration, so state its encoding on the part.
                outMsg.BodyPart.Charset = "UTF-8";
                outMsg.BodyPart.Data = new MemoryStream(messageBytes);

                qOutputMsgs.Enqueue(outMsg);
            }
            catch (Exception ex)
            {
                throw new ApplicationException("Error in queueing outgoing messages: " + ex.Message, ex);
            }
        }
    }
}

5. Sign and then build project.

6. Copy MessageBatchPipelineCompoent.dll to C:\Program Files\Microsoft BizTalk Server 2006\Pipeline Components.

7. The pipeline component is now ready to use. In a BizTalk pipeline project, add this pipeline component DLL to the Toolbox and then use it in the Disassemble stage.

 

Usage Scenario

Sometimes in BizTalk orchestrations/messaging, an incoming message arrives with a huge number of records, whereas the business process handles each record individually, one by one. Publishing such a huge record set to the MessageBox and then processing it consumes a lot of resources, because a single orchestration takes a long time to work through every record.

Using this custom pipeline component, huge messages can be broken into multiple messages that each contain fewer records. Once these are published to the MessageBox, orchestrations can run in parallel and process several of the smaller messages at the same time. The result is better performance and a shorter processing time.

If the processing host is scaled out, the smaller messages are processed on different machines, giving more scalable and faster execution.