A custom BizTalk 2004 disassembling pipeline component (3)


The real work happens in the implementation of IDisassemblerComponent. At this point, it is probably useful to review the definition of this interface at https://msdn.microsoft.com/library/en-us/sdk/htm/frlrfmicrosoftbiztalkcomponentinteropidisassemblercomponentmemberstopic.asp?frame=true. Basically, the messaging engine calls Disassemble() once per incoming document and then calls GetNext() repeatedly until GetNext() returns null. This is how one input message can be disassembled into more than one output message.
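
To make that calling sequence concrete, here is a minimal sketch that runs without the BizTalk assemblies. ISimpleDisassembler, SplittingDisassembler and EngineSketch are hypothetical names invented for this illustration, and messages are reduced to plain strings. The real engine passes IPipelineContext and IBaseMessage instead.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for IDisassemblerComponent, reduced to strings so the
// sketch compiles without any BizTalk reference.
public interface ISimpleDisassembler
{
    void Disassemble(string input);
    string GetNext(); // returns null when nothing is left
}

// Toy disassembler: splits the input on ';' and hands out the pieces one by one.
public sealed class SplittingDisassembler : ISimpleDisassembler
{
    private readonly Queue<string> pending = new Queue<string>();

    public void Disassemble(string input)
    {
        foreach (string piece in input.Split(';'))
        {
            if (piece.Length > 0)
            {
                pending.Enqueue(piece);
            }
        }
    }

    public string GetNext()
    {
        return pending.Count > 0 ? pending.Dequeue() : null;
    }
}

public static class EngineSketch
{
    // Mimics the sequence described above: one Disassemble() call,
    // then GetNext() until it returns null.
    public static List<string> Run(ISimpleDisassembler component, string input)
    {
        List<string> output = new List<string>();
        component.Disassemble(input);
        string next;
        while ((next = component.GetNext()) != null)
        {
            output.Add(next);
        }
        return output;
    }
}
```

Calling EngineSketch.Run(new SplittingDisassembler(), "a;b;c") collects three items in order. A component that always answers null from GetNext() simply produces no output messages.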

The approach here is to store all tables and their name/value pairs in a structure in memory. Then, when GetNext() is called, we rebuild the XML document we need from that structure. All tables are combined into a single XML document, so GetNext() returns one message and then null. Observe that we preserve the original message's context by copying its properties. This way, the new message looks as if we had actually received that XML message. If we did not copy the properties, we would get the runtime error “this message has no subscription”.

         

  public IBaseMessage GetNext(IPipelineContext pContext)
  {
    IBaseMessage newMsg = null;

    // We have something to return if there was at least one table built
    if ((tablesArray != null) && (tablesArray.Count >= 1))
    {
      // Creates a new message
      newMsg = pContext.GetMessageFactory().CreateMessage();

      // Get a new stream so we can prepare the XML and connect it to an XMLTextWriter
      MemoryStream outputStream = new MemoryStream();
      XmlTextWriter xmlWriter = new XmlTextWriter(outputStream, Encoding.UTF8);

      // Prepare the message: We combine all tables into one XML document
      // like the following:
      // <Tables>
      // <Table name = "<name>">
      // <Field name = "<field_name>" value="<fieldvalue>" />
      // [...]
      // </Table>
      // [...]
      // </Tables>
      xmlWriter.Formatting = Formatting.Indented;
      xmlWriter.WriteStartDocument(true);
      xmlWriter.WriteStartElement("", "Tables", "");

      foreach (object entry in tablesArray)
      {
        Table tableEntry = (Table) entry;
        xmlWriter.WriteStartElement("", "Table", "");
        xmlWriter.WriteAttributeString("name", tableEntry.Name);

        // Emit all name/value pairs
        foreach (object key in tableEntry.Keys)
        {
          xmlWriter.WriteStartElement("", "Field", "");
          xmlWriter.WriteAttributeString("name", key.ToString());
          xmlWriter.WriteAttributeString("value", tableEntry[(string) key].ToString());
          xmlWriter.WriteEndElement();
        }

        xmlWriter.WriteEndElement();
      }

      // Close the document
      xmlWriter.WriteEndElement();
      xmlWriter.WriteEndDocument();
      xmlWriter.Flush();

      // Rewind the stream so it is ready for use by the messaging engine
      outputStream.Seek(0, SeekOrigin.Begin);

      // Add one part to it. This part will be the "body"
      IBaseMessagePart newMsgPart = pContext.GetMessageFactory().CreateMessagePart();
      newMsgPart.Charset = "UTF-8";
      newMsgPart.ContentType = "text/xml";
      newMsgPart.Data = outputStream;
      newMsg.AddPart("body", newMsgPart, true);

      // Copy the original message context to the newly created message
      for (int iProp = 0; iProp < originalMsgContext.CountProperties; iProp++)
      {
        string strName;
        string strNSpace;
        object val = originalMsgContext.ReadAt(iProp, out strName, out strNSpace);

        // If the property has been promoted, respect the settings
        if (originalMsgContext.IsPromoted(strName, strNSpace))
          newMsg.Context.Promote(strName, strNSpace, val);
        else
          newMsg.Context.Write(strName, strNSpace, val);
      }

      // There is a new stream so let's make it visible to the resourceTracker
      pContext.ResourceTracker.AddResource(outputStream);

      // We returned this message so drop our internal table representation
      tablesArray = null;
    }

    return newMsg;
  }
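
The XML-building step of GetNext() can be tried in isolation. The following is a minimal sketch, assuming a simplified table type: SketchTable and TablesXmlSketch are hypothetical names introduced here and are not part of the component, which uses its own Table class. It writes the same Tables/Table/Field layout into a MemoryStream and rewinds it, as the method above does before handing the stream to the message part.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Xml;

// Hypothetical stand-in for the component's Table class.
public sealed class SketchTable
{
    public readonly string Name;
    public readonly List<KeyValuePair<string, string>> Fields =
        new List<KeyValuePair<string, string>>();

    public SketchTable(string name)
    {
        Name = name;
    }
}

public static class TablesXmlSketch
{
    // Writes <Tables><Table name="..."><Field name="..." value="..."/></Table></Tables>
    // into a MemoryStream and rewinds it so a consumer can read from the start.
    public static MemoryStream Build(IEnumerable<SketchTable> tables)
    {
        MemoryStream output = new MemoryStream();
        XmlTextWriter writer = new XmlTextWriter(output, Encoding.UTF8);
        writer.Formatting = Formatting.Indented;
        writer.WriteStartDocument(true);
        writer.WriteStartElement("Tables");

        foreach (SketchTable table in tables)
        {
            writer.WriteStartElement("Table");
            writer.WriteAttributeString("name", table.Name);
            foreach (KeyValuePair<string, string> field in table.Fields)
            {
                writer.WriteStartElement("Field");
                writer.WriteAttributeString("name", field.Key);
                writer.WriteAttributeString("value", field.Value);
                writer.WriteEndElement();
            }
            writer.WriteEndElement();
        }

        writer.WriteEndElement();
        writer.WriteEndDocument();

        // Flush rather than close: closing the writer would also close the stream.
        writer.Flush();
        output.Seek(0, SeekOrigin.Begin);
        return output;
    }
}
```

Loading the returned stream into an XmlDocument is a quick way to confirm that the output is well-formed before wiring the same logic into a pipeline component.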

 

Disassembling is shown below. Two things are important here: we clone the original message's context, and we use the “SetErrorInfo()” method to report errors found while disassembling the message.

  /// <summary>
  /// Called once per incoming document by the Messaging engine.
  /// </summary>
  /// <param name="pContext">IPipelineContext: context for this pipeline.</param>
  /// <param name="pInMsg">IBaseMessage: Base message received.</param>
  public void Disassemble(IPipelineContext pContext, IBaseMessage pInMsg)
  {
    // Consider this document only if it has parts
    if (pInMsg.PartCount >= 1)
    {
      IBaseMessagePart bodyPart = pInMsg.BodyPart;

      // Clone the message context so we can associate it with the new message later
      originalMsgContext = PipelineUtil.CloneMessageContext(pInMsg.Context);

      // Consider this only if this is not null
      if (bodyPart != null)
      {
        Stream docStream = bodyPart.GetOriginalDataStream();
        string lineRead = null;
        long lineNum = 1;

        // We got the original data stream (not a copy) so rewind the stream
        docStream.Seek(0, SeekOrigin.Begin);

        // Use a stream reader to facilitate reading line by line
        StreamReader streamReader = new StreamReader(docStream);

        // Parse all tables and all values in between them
        lineRead = streamReader.ReadLine();
        while (lineRead != null)
        {
          // Strip empty lines
          if (regExpBlankLine.Match(lineRead).Success)
          {
            // Did we encounter a Table definition entry?
            Match match = regExpTableParser.Match(lineRead);
            if (match.Success)
            {
              // Create a new table structure
              tablesArray.Add(new Table(match.Groups["TableName"].Value));
            }
            else
            {
              // We are parsing a name/value pair for the current table
              Match fieldMatch = regExpLineParser.Match(lineRead);
              if (fieldMatch.Success)
              {
                string fieldName = fieldMatch.Groups["FieldName"].Value;
                string fieldSep = fieldMatch.Groups["Separator"].Value;

                // NOTE: The value field is not exactly what one might want
                // for cases like "Status@C Completed". "C Completed" will be
                // seen as the value while one might want to have "Completed".
                // This could be fixed easily by changing the regular expression.
                string fieldVal = fieldMatch.Groups["FieldValue"].Value;

                // Insert into the current table
                ((Table) tablesArray[tablesArray.Count - 1])[fieldName] = fieldVal;
              }
              else
              {
                // Wrong file format: report error.
                string errMsg = resMgr.GetString(wrongFlatFileRsrcID)
                                      .Replace("%{LineNum}%", lineNum.ToString());
                pInMsg.SetErrorInfo(new Exception(errMsg));
                tablesArray = null;
                break;
              }
            }
          }

          // Get the next line
          lineRead = streamReader.ReadLine();
          lineNum++;
        }
      }
    }
  }
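
The line-by-line parsing can also be sketched on its own. The regular expressions used by the component are not shown in this part, so the two patterns below are assumptions invented for illustration: a table line is taken to look like "[TableName]" and a field line like "Name@Value". Only the group names (TableName, FieldName, Separator, FieldValue) come from the code above. ParsedTable and LineParserSketch are hypothetical names, and the sketch throws a FormatException where the component calls SetErrorInfo().

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;

// Hypothetical stand-in for the component's Table class.
public sealed class ParsedTable
{
    public readonly string Name;
    public readonly Dictionary<string, string> Fields = new Dictionary<string, string>();

    public ParsedTable(string name)
    {
        Name = name;
    }
}

public static class LineParserSketch
{
    // ASSUMPTION: both patterns are made up for this sketch; the component's
    // real expressions may differ.
    private static readonly Regex TableLine =
        new Regex(@"^\[(?<TableName>[^\]]+)\]\s*$");
    private static readonly Regex FieldLine =
        new Regex(@"^(?<FieldName>[^@]+)(?<Separator>@)(?<FieldValue>.*)$");

    public static List<ParsedTable> Parse(TextReader reader)
    {
        List<ParsedTable> tables = new List<ParsedTable>();
        string line;
        long lineNum = 0;

        while ((line = reader.ReadLine()) != null)
        {
            lineNum++;

            // Skip blank lines
            if (line.Trim().Length == 0)
            {
                continue;
            }

            // A table definition starts a new table
            Match tableMatch = TableLine.Match(line);
            if (tableMatch.Success)
            {
                tables.Add(new ParsedTable(tableMatch.Groups["TableName"].Value));
                continue;
            }

            // Anything else must be a name/value pair that belongs to a table
            Match fieldMatch = FieldLine.Match(line);
            if (!fieldMatch.Success || tables.Count == 0)
            {
                throw new FormatException("Unexpected content at line " + lineNum);
            }

            ParsedTable current = tables[tables.Count - 1];
            current.Fields[fieldMatch.Groups["FieldName"].Value] =
                fieldMatch.Groups["FieldValue"].Value;
        }

        return tables;
    }
}
```

With these assumed patterns, the line "Status@C Completed" yields the value "C Completed", which is the behavior the NOTE in the component describes. The check on tables.Count also rejects a field line that appears before any table line.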

The IPersistPropertyBag implementation is shown below. Since we do not expose any properties, Load() and Save() are no-operations.

          

  /// <summary>
  /// Initializes a new Property Bag.
  /// </summary>
  public void InitNew()
  {
  }

  /// <summary>
  /// Returns the ClassID of this component for usage from native code.
  /// </summary>
  /// <param name="classID">Guid: ClassID of the component.</param>
  public void GetClassID(out Guid classID)
  {
    classID = GetType().GUID;
  }

  /// <summary>
  /// Load data from a persisted object into a new object.
  /// </summary>
  /// <param name="propertyBag">Property bag to load.</param>
  /// <param name="errorLog">Error log.</param>
  public void Load(IPropertyBag propertyBag, int errorLog)
  {
  }

  /// <summary>
  /// Saves properties to the property bag.
  /// </summary>
  /// <param name="propertyBag">The property bag to manipulate.</param>
  /// <param name="clearDirty">Indicates if we should reset our "isDirty" flag.</param>
  /// <param name="saveAllProperties">Should all properties be saved?</param>
  public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties)
  {
  }