How To Boost Message Transformations Using the XslCompiledTransform class Extended


Problem Statement

Some of you posted the following feedback regarding my post How To Boost Message Transformations Using the XslCompiledTransform class:

"Very nice post. I just have a question. How would you handle multiple input messages in a map? I have a map that has 2 input source messages that i would like to unit test. The BTS expression shape has transform(outmsg) = map(msg1, msg2), but i have not yet found a way to do it in a C# class."

Solution 

Well, before answering to this question, I'll briefly explain how BizTalk Server handles maps with multiple input messages. BizTalk Server uses a trick to manage this situation. In fact, when you create a transformation map with multiple source document, BizTalk uses an envelope to wrap the individual input messages. For those of you who love to disassemble BizTalk code with Reflector, the envelope is created at runtime by the private class called CompositeStreamReader that can be found within the assembly Microsoft.XLANGs.Engine. In particular, the ConstructCompositeOutline method uses the code reported in the table below

public void ConstructCompositeOutline()
{
    ...
    XmlTextWriter writer = new XmlTextWriter(this.outlineStream, ...);
    writer.WriteStartElement("Root", "http://schemas.microsoft.com/BizTalk/2003/aggschema");
    for (int i = 0; i < this.readerCount; i++)
    {
        writer.WriteStartElement("InputMessagePart_" + i.ToString(), "");
        writer.WriteComment("_");
        writer.WriteEndElement();
    }
    writer.WriteEndElement();
    writer.Flush();
    ...
}

 

to create an envelope which targetNamespace is equal to 'http://schemas.microsoft.com/BizTalk/2003/aggschema' and that contains as many InputMessagePart elements as the incoming documents to the map.

<ns0:Root xmlns:ns0='http://schemas.microsoft.com/BizTalk/2003/aggschema'>
    <InputMessagePart_0>
        -
    </InputMessagePart_0>
    <InputMessagePart_1>
        -
    </InputMessagePart_1>
</ns0:Root>

 

Therefore, I decided to extend the code of my classes XslCompiledTransformHelper and XslTransformHelper to handle the case of maps with multiple input messages. In particular, I developed a new class called CompositeStream to wrap an array of Stream objects, one for each input message to the map, an return the above envelope. The implementation of this custom class is quite smart, because instead of copying the bytes of the input streams within a new buffer or a new stream, the Read method just makes up the content of the envelope with the data of the inbound streams to return a composite message with the format expected by the map. When the inbound streams are significantly large,  this approach allows saving time and memory for copying data from the inbound streams to a new object, regardless if this latter is a buffer or a stream. The code of the CompositeStream class is shown in the table below:

CompositeStream Class

 

#region Copyright
//-------------------------------------------------
// Author:  Paolo Salvatori
// Email:   paolos@microsoft.com
// History: 2010-04-07 Created
//-------------------------------------------------
#endregion

#region Using References
using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
using System.Configuration;
using System.Xml;
using System.Xml.Xsl;
using System.Diagnostics;
using Microsoft.XLANGs.BaseTypes;
using Microsoft.XLANGs.Core;
using Microsoft.BizTalk.Streaming;
using Microsoft.BizTalk.CAT.Samples.DynamicTransforms.Helpers.Properties;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.DynamicTransforms.Helpers
{
    public class CompositeStream : Stream
    {
        #region Private Types
        enum State
        {
            Start,
            Overflow,
            Stream,
            End
        }
        #endregion

        #region Private Constants
        private const string DefaultPrefix = "babo";
        private const string StartRoot = "<{0}:Root xmlns:{0}='http://schemas.microsoft.com/BizTalk/2003/aggschema'>";
        private const string EndRoot = "</{0}:Root>";
        private const string StartInputMessagePart = "<InputMessagePart_{0}>";
        private const string EndInputMessagePart = "</InputMessagePart_{0}>";
        #endregion

        #region Private Fields
        private int currentStream = 0;
        private int currentIndex = 0;
        private string prefix;
        private State state;
        private byte[] overflowBuffer;
        private Stream[] streams;
        private bool endOfDocument = false;
        #endregion

        #region Public Constructors
        public CompositeStream(Stream[] streams)
        {
            this.prefix = DefaultPrefix;
            this.streams = streams;
            this.state = State.Start;
        }

        public CompositeStream(Stream[] streams, string prefix)
        {
            this.prefix = prefix;
            this.streams = streams;
            this.state = State.Start;
        }
        #endregion

        #region Public Properties
        public override bool CanRead
        {
            get 
            {
                return true;
            }
        }

        public override bool CanSeek
        {
            get
            {
                return true;
            }
        }

        public override bool CanTimeout
        {
            get
            {
                return false;
            }
        }

        public override bool CanWrite
        {
            get
            {
                return false;
            }
        }

        public override long Length
        {
            get
            {
                int prefixLength = prefix.Length;
                long length = 76 + 3 * prefixLength;
                if (streams != null &&
                    streams.Length > 0)
                {
                    string index;
                    for (int i = 0; i < streams.Length; i++)
                    {
                        if (streams[i].CanSeek)
                        {
                            index = i.ToString();
                            length += streams[i].Length + 39 + 2 * index.Length;
                        }
                        else
                        {
                            throw new NotImplementedException(); 
                        }
                    }
                }
                return length;
            }
        }

        public override long Position
        {
            get 
            { 
                if (state == State.Start)
                {
                    return 0L;
                }
                else
                {
                    throw new NotImplementedException(); 
                }
            }
            set
            {
                if (value == 0L)
                {
                    ResetStream();
                }
                else
                {
                    throw new NotImplementedException();
                } 
            }
        }
        #endregion

        #region Public Methods
        public override int Read(byte[] buffer, int offset, int count)
        {
            int bytesWritten = 0;
            int bytesRead = 0;
            int length = 0;
            byte[] localBuffer;
            StringBuilder builder;

            if (state == State.End)
            {
                return 0;
            }
            while (bytesWritten < count &&
                   state != State.End)
            {
                switch (state)
                {
                    case State.Start:
                        builder = new StringBuilder(128);
                        builder.AppendFormat(StartRoot, prefix);
                        if (streams != null &&
                            streams.Length > 0)
                        {
                            builder.AppendFormat(StartInputMessagePart, currentStream);
                        }
                        localBuffer = Encoding.UTF8.GetBytes(builder.ToString());

                        if (localBuffer.Length <= count)
                        {
                            Array.Copy(localBuffer, 0, buffer, offset, localBuffer.Length);
                            bytesWritten += localBuffer.Length;
                            offset += bytesWritten;
                            state = State.Stream;
                        }
                        else
                        {
                            Array.Copy(localBuffer, 0, buffer, offset, count);
                            overflowBuffer = localBuffer;
                            currentIndex = count;
                            state = State.Overflow;
                            return count;
                        }
                        break;
                    case State.Overflow:
                        length = overflowBuffer.Length - currentIndex;
                        if (length <= count)
                        {
                            Array.Copy(overflowBuffer, currentIndex, buffer, offset, length);
                            bytesWritten += length;
                            offset += length;
                            overflowBuffer = null;
                            currentIndex = 0;
                            if (endOfDocument)
                            {
                                state = State.End;
                            }
                            else
                            {
                                state = State.Stream;
                            }
                        }
                        else
                        {
                            Array.Copy(overflowBuffer, currentIndex, buffer, offset, count);
                            currentIndex += count;
                            return count;
                        }
                        break;
                    case State.Stream:
                        length = count - bytesWritten;
                        bytesRead = streams[currentStream].Read(buffer, offset, length);
                        bytesWritten += bytesRead;
                        offset += bytesRead;
                        if (bytesWritten < count)
                        {
                            builder = new StringBuilder(128);
                            builder.AppendFormat(EndInputMessagePart, currentStream);
                            currentStream++;
                            if (currentStream < streams.Length)
                            {
                                builder.AppendFormat(StartInputMessagePart, currentStream);
                                localBuffer = Encoding.UTF8.GetBytes(builder.ToString());
                            }
                            else
                            {
                                builder.AppendFormat(EndRoot, prefix);
                                localBuffer = Encoding.UTF8.GetBytes(builder.ToString());
                                endOfDocument = true;
                            }
                            if (localBuffer.Length <= count - bytesWritten)
                            {
                                Array.Copy(localBuffer, 0, buffer, offset, localBuffer.Length);
                                bytesWritten += localBuffer.Length;
                                offset += localBuffer.Length;
                                if (endOfDocument)
                                {
                                    if (bytesWritten <= count)
                                    {
                                        state = State.End;
                                    }
                                    return bytesWritten;
                                }
                                break;
                            }
                            else
                            {
                                length = count - bytesWritten;
                                Array.Copy(localBuffer, 0, buffer, offset, length);
                                overflowBuffer = localBuffer;
                                currentIndex = length;
                                state = State.Overflow;
                                return count;
                            }
                        }
                        else
                        {
                            return count;
                        }
                        break;                        
                }
            }
            return bytesWritten;
        }

        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            return base.BeginRead(buffer, offset, count, callback, state);
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            return base.BeginWrite(buffer, offset, count, callback, state);
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            if (offset == 0 &&
                origin == SeekOrigin.Begin)
            {
                ResetStream();
            }
            else
            {
                throw new NotImplementedException();
            }
            return 0;
        }

        public override void Flush()
        {
            throw new NotImplementedException();
        }
        #endregion

        #region Private Methods
        private void ResetStream()
        {
            for (int i = 0; i < streams.Length; i++)
            {
                if (streams[i].CanSeek)
                {
                    streams[i].Seek(0, SeekOrigin.Begin);
                }
                else
                {
                    throw new NotImplementedException(); 
                }
            }
            state = State.Start;
            endOfDocument = false;
            currentStream = 0;
            currentIndex = 0;
        }
        #endregion
    }
}

 

Then I extended the XslCompiledTransformHelper class with a set of new methods that accept as parameter an array of objects of type Stream or XLANGMessage and use an instance of the CompositeStream class to apply a transformation map to these latter.

XslCompiledTransformHelper Class

#region Copyright
//-------------------------------------------------
// Author:  Paolo Salvatori
// Email:   paolos@microsoft.com
// History: 2010-01-26 Created
//-------------------------------------------------
#endregion

#region Using References
using System;
using System.IO;
using System.Text;
using System.Collections.Generic;
using System.Configuration;
using System.Xml;
using System.Xml.Xsl;
using System.Xml.XPath;
using System.Diagnostics;
using Microsoft.XLANGs.BaseTypes;
using Microsoft.XLANGs.Core;
using Microsoft.BizTalk.Streaming;
using Microsoft.BizTalk.CAT.Samples.DynamicTransforms.Helpers.Properties;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.DynamicTransforms.Helpers
{
    public class XslCompiledTransformHelper
    {
... public static XLANGMessage Transform(XLANGMessage[] messageArray, int[] partIndexArray, string mapFullyQualifiedName, string messageName, string partName, bool debug, int bufferSize, int thresholdSize) { try { if (messageArray != null && messageArray.Length > 0) { Stream[] streamArray = new Stream[messageArray.Length]; for (int i = 0; i < messageArray.Length; i++) { streamArray[i] = messageArray[i][partIndexArray[i]].RetrieveAs(typeof(Stream)) as Stream; } Stream response = Transform(streamArray, mapFullyQualifiedName, debug, bufferSize, thresholdSize); CustomBTXMessage customBTXMessage = null; customBTXMessage = new CustomBTXMessage(messageName, Service.RootService.XlangStore.OwningContext); customBTXMessage.AddPart(string.Empty, partName); customBTXMessage[0].LoadFrom(response); return customBTXMessage.GetMessageWrapperForUserCode(); } } catch (Exception ex) { ExceptionHelper.HandleException(Resources.XslCompiledTransformHelper, ex); TraceHelper.WriteLineIf(debug, null, ex.Message, EventLogEntryType.Error); throw; } finally { if (messageArray != null && messageArray.Length > 0) { for (int i = 0; i < messageArray.Length; i++) { if (messageArray[i] != null) { messageArray[i].Dispose(); } } } } return null; } public static Stream Transform(Stream[] streamArray, string mapFullyQualifiedName) { return Transform(streamArray, mapFullyQualifiedName, false, DefaultBufferSize, DefaultThresholdSize); } public static Stream Transform(Stream[] streamArray, string mapFullyQualifiedName, bool debug) { return Transform(streamArray, mapFullyQualifiedName, debug, DefaultBufferSize, DefaultThresholdSize); } public static Stream Transform(Stream[] streamArray, string mapFullyQualifiedName, bool debug, int bufferSize, int thresholdSize) { try { MapInfo mapInfo = GetMapInfo(mapFullyQualifiedName, debug); if (mapInfo != null) { CompositeStream compositeStream = null; try { VirtualStream virtualStream = new VirtualStream(bufferSize, thresholdSize); compositeStream = new CompositeStream(streamArray); XmlTextReader reader = new XmlTextReader(compositeStream); mapInfo.Xsl.Transform(reader, mapInfo.Arguments, virtualStream); virtualStream.Seek(0, SeekOrigin.Begin); return virtualStream; } finally { if (compositeStream != null) { compositeStream.Close(); } } } } catch (Exception ex) { ExceptionHelper.HandleException(Resources.XslCompiledTransformHelper, ex); TraceHelper.WriteLineIf(debug, null, ex.Message, EventLogEntryType.Error); throw; } return null; } ... } }

 

In a similar way, I extended the XslTransformHelper class to support maps with multiple input documents. For brevity, I omitted the code of this latter, but you can download the new version of both classes here.

Test

To test the new methods exposed by the classes XslCompiledTransformHelper and XslTransformHelper, I created three XML schemas:

  • Credentials
  • Address
  • Customer

In particular, the Credentials and Address schemas define two complex types used to build the Customer schema. Then I created a transformation map (see the picture below) called AddressAndCredentialsToCustomer that accepts 2 input messages, the first of type Credentials and the second of type Address, and returns a document of type Customer.

MultiSourceMap

Finally I created 2 Unit Test methods called:

  • TestXslTransformHelperWithMultipleInputs
  • TestXslCompiledTransformHelperWithMultipleInputs

to test the CompositeStream and the new methods exposed by the the classes XslCompiledTransformHelper and XslTransformHelper. You can change the entries contained in the appSettings section of the App.Config file within the UnitAndLoadTests project to test the 2 classes against your documents and multi-input-message maps.

Conclusions

I updated the original post to reflect the extensions I made. Here you can find a new version of my helper classes and the artifacts (schemas, maps, unit tests) I used to test them.

Note: I spent less than one day to write and test the new code. In particular, I conducted a basic code coverage of the CompositeStream class, but I didn’t test it with more than 2 streams or with large messages. If you find any error, please send me a repro and I’ll do my best, time permitting, to fix the code as soon as possible. Instead, should you find an error in my code and decide to sort it out by yourselves, please let me have the fixed version. 😉

Comments (0)

Skip to main content