Implementing Adapter Inbound Message Exchange Handler

Scenario: Receive business events from the target system and trigger a method in the WCF Service

The Inbound Handler normalizes the notifications and target-system initiated operations, accepts incoming messages and dispatches them to the interested consumers. It can listens for any communication initiated by the target system. In case of a database adapter, just like Outbound Handler can be used to run various CRUD operations in the database, the Inbound Handler can listen to database table changes through a trigger. Let’s take a scenario in case of SAP adapter. When a sales representative enters a new order into an SAP application, other applications connected to SAP might need to know about that order. The event trigger (provided in native API used to connect to SAP from the adapter) can then be used to fetch the new order and callback into the communicating application. This post provides sample code for implementing inbound behavior with WCF LOB Adapter SDK in the custom WCF-based adapter. This post will be followed by another post that will then show the Adapter Consumer experience using WCF Channel Model and WCF Service Model.

 

Implement Inbound Communication Support

Here are the steps for the Adapter Developer to provide inbound communication support in their adapter.

1) Plan and design

2) If the operation is browsable through Add Adapter Service Reference Visual Studio Plug-In/Consume Adapter Service BizTalk Project Add-In, ensure the inbound operation is defined in the AdapterMetadataBrowseHandler

3) Implement ResolveOperationMetadata (and ResolveTypeMetadata) in AdapterMetadataResolverHandler in order to generate the Service Interface to be used by the Adapter Consumer to provide service implementation

4) Implement the class AdapterInboundHandler

5) Implement the class AdapterInboundReply

Plan and Design

 Adapter Metadata Browse Handler

 

    public class HelloWorldAdapterMetadataBrowseHandler : HelloWorldAdapterHandlerBase, IMetadataBrowseHandler

    {

        #region IMetadataBrowseHandler Members

        public MetadataRetrievalNode[] Browse(string nodeId, int childStartIndex, int maxChildNodes, TimeSpan timeout)

        {

            string context = "Hello";

            // browsing from the root node

            // in this sample we have two operations, one inbound and another outbound as children of the root node

            if (MetadataRetrievalNode.Root.NodeId.CompareTo(nodeId) == 0)

            {

                // Create outbound operation with action Hello/SayHelloWorld

                string opName1 = "SayHelloWorld";

                string opName1UID = context + "/" + opName1;

                MetadataRetrievalNode outboundNode = new MetadataRetrievalNode(opName1UID);

                outboundNode.NodeId = opName1UID;

                outboundNode.DisplayName = opName1;

                outboundNode.Direction = MetadataRetrievalNodeDirections.Outbound;

                outboundNode.Description = "This operation echos returns text Hello World!";

                outboundNode.IsOperation = true;

                // Create inbound operation with action Hello/OnReceiveGreeting

                // On the Adapter Metadata Utility Tool, this will be filtered through

                // "Select Contract Type" combo box.

                string opName2 = "OnReceiveGreeting";

                string opName2UID = context + "/" + opName2;

                MetadataRetrievalNode inboundNode = new MetadataRetrievalNode(opName2UID);

                inboundNode.NodeId = opName2UID;

                inboundNode.DisplayName = opName2;

                inboundNode.Direction = MetadataRetrievalNodeDirections.Inbound;

                inboundNode.Description = "This operation receives a greeting of an event going to the service.";

                inboundNode.IsOperation = true;

                // Add to metadata retrieval nodes

                List<MetadataRetrievalNode> nodes = new List<MetadataRetrievalNode>(2);

                nodes.Add(inboundNode);

                nodes.Add(outboundNode);

                return nodes.ToArray();

            }

            return null;

        }

        #endregion IMetadataBrowseHandler Members

    }

 

Adapter Metadata Resolver Handler

 

This method will be used at design-time by the Add Adapter Service Reference Visual Studio Plug-In/Consume Adapter Service BizTalk Project Add-In to generate the Service Contract for selected inbound operation.

  public class HelloWorldAdapterMetadataResolverHandler : HelloWorldAdapterHandlerBase, IMetadataResolverHandler

    {

        #region IMetadataResolver Members

              …

        public OperationMetadata ResolveOperationMetadata(string operationId, TimeSpan timeout, out TypeMetadataCollection extraTypeMetadataResolved)

        {

            bool useCustomSchema = false;

            extraTypeMetadataResolved = null;

            ParameterizedOperationMetadata om = null;

            switch(operationId)

        {

                case "Hello/OnReceiveGreeting":

                    om = new ParameterizedOperationMetadata(operationId, "OnReceiveGreeting");

                    om.OperationNamespace = HelloWorldAdapter.SERVICENAMESPACE;

                    om.OperationGroup = "HelloWorld";

                    // syntax: void OnReceiveGreeting( string name, string data );

                    OperationParameter nameParm = new OperationParameter("name", OperationParameterDirection.In, QualifiedType.StringType, false);

                    nameParm.Description = "This is the path of the file.";

                    OperationParameter dataParm = new OperationParameter("data", OperationParameterDirection.In, QualifiedType.StringType, false);

                    dataParm.Description = "This is greeting text from the file.";

                    om.Parameters.Add(nameParm);

                    om.Parameters.Add(dataParm);

            // If om.OperationResult = null, the contract generated will be one-way, such as below:

                    // [OperationContractAttribute(IsOneWay=true, Action="Hello/OnReceiveGreeting")]

                    // void OnReceiveGreeting(string name, object data)

                    // If om.OperationResult = OperationResult.Empty, the generated contract will be two-way

                    // [OperationContractAttribute(Action="Hello/OnReceiveGreeting")]

                    // void OnReceiveGreeting(string name, object data)

                    // All WCF clients and services that communicate with the BizTalk WCF Adapter are expected

                    // to be two-way, with the exception of queues (such as NetMsmq). This is so that BizTalk

                    // knows when to delete the message from the Message Box and thereby ensuring no message loss.

                    om.OperationResult = OperationResult.Empty;

                    return om;

            }

            return null;

        }

             

              …

        #endregion IMetadataResolver Members

    }

 

Adapter Inbound Handler

This class provides methods for Adapter Developer to provide inbound message exchange pattern. The following doesn’t show the asynchronous methods. The asynchronous methods can be implemented by deriving from IAsyncInboundHandler instead of IInboundHandler.

Once the class is added, ensure it is hooked up with the adapter.  

HelloWorldAdapter.cs

        protected override bool IsHandlerSupported<TConnectionHandler>()

        {

            return (

                  typeof(IOutboundHandler) == typeof(TConnectionHandler)

                || typeof(IInboundHandler) == typeof(TConnectionHandler)

                || typeof(IMetadataResolverHandler) == typeof(TConnectionHandler)

                || typeof(IMetadataBrowseHandler) == typeof(TConnectionHandler));

        }

HelloWorldAdapterInboundHandler.cs

    public class HelloWorldAdapterInboundHandler : HelloWorldAdapterHandlerBase, IInboundHandler

    {

        // internal queue for dispatching the messages

        private Queue<Message> inboundQueue;

        // use the file system watcher as a trigger

        private FileSystemWatcher inboundWatcher;

        // use the semaphore in the file system watcher callback

        private Object lockThis;

        // TODO define this path as the binding property instead of hard coding it

        private string path = @"C:\temp\HelloWorld";

        private Message message;

        /// <summary>

        /// Initializes a new instance of the HelloAdapterInboundHandler class

        /// </summary>

        public HelloWorldAdapterInboundHandler(HelloWorldAdapterConnection connection

            , MetadataLookup metadataLookup)

            : base(connection, metadataLookup)

        {

            inboundWatcher = null;

            lockThis = new Object();

        }

        #region IInboundHandler Members

        /// <summary>

        /// Start the listener

        /// </summary>

        /// <param name="actions"></param>

        /// <param name="timeout"></param>

        public void StartListener(string[] actions, TimeSpan timeout)

        {

     // Write a trace entry

            HelloWorldAdapterUtilities.Trace.Trace(TraceEventType.Verbose, "HelloWorldAdapterInboundHandler::StartListener", "Entering StartListener method.");

            try

            {

                inboundQueue = new Queue<Message>();

                 foreach (string action in actions)

                 {

                     if ("Hello/OnReceiveGreeting".Equals(action))

                     {

                        HelloWorldAdapterUtilities.Trace.Trace(TraceEventType.Information, "HelloWorldAdapterInboundHandler::StartListener", "Attempting to start the listner..." );

                        if (inboundWatcher == null)

                        {

                            inboundWatcher = new FileSystemWatcher(path);

                            // Look for files with extension *.txt only

                            inboundWatcher.Filter = "*.txt";

                            // Set the callback for the event - when a *.txt is copied into the folder being monitored

    inboundWatcher.Created += new FileSystemEventHandler(FileMonitor_Event_Created);

                            // Begin monitoring

                            inboundWatcher.EnableRaisingEvents = true;

                        }

                    }

                }

            }

            catch (System.ArgumentException e)

            {

                HelloWorldAdapterUtilities.Trace.Trace(TraceEventType.Error, "HelloWorldAdapterInboundHandler::StartListener", "Error starting the listner." );

                throw new AdapterException(e.Message);

            }

        }

        /// <summary>

        /// Stop the listner

        /// </summary>

        /// <param name="timeout"></param>

        public void StopListener(TimeSpan timeout)

        {

            if (inboundWatcher != null)

            {

                // End monitoring the file system

                inboundWatcher.EnableRaisingEvents = false;

                inboundWatcher = null;

            }

            inboundQueue.Clear();

            inboundQueue = null;

        }

        /// <summary>

        /// Instead of indicating the failure to receive through

        /// an exception, TryReceive has a boolean return value that indicates whether the

        /// operation succeeded. A successful attempt to receive provides you with the message

        /// as normal.

        ///

        /// If there is some way for you to recover from lack of

        /// messages, then TryReceive allows you to avoid the expense of throwing and

        /// catching the TimeoutException. Other exceptions can still be thrown such as

        /// errors occurance in receiving/processing of messages.

        /// </summary>

        /// <param name="timeout"></param>

  /// <param name="message"></param>

        /// <param name="reply"></param>

        /// <returns></returns>

        public bool TryReceive(TimeSpan timeout, out Message aMessage, out IInboundReply reply)

        {

            reply = new HelloAdapterInboundReply();

            aMessage = null;

            if (inboundQueue.Count != 0)

            {

                aMessage = inboundQueue.Dequeue();

                if (aMessage != null)

                {

                    return true;

    }

            }

            return false;

        }

        /// <summary>

        /// This method is type of receive and is essentially a peek operation.

        /// Like TryReceive, WaitForMessage uses a boolean return value to indicate whether the

        /// operation succeeded. However, WaitForMessage never returns a message to you.

        /// It simply says that if you had actually performed a Receive operation,

        /// then you would have seen a message arrive before the timeout expired.

        /// Waiting for a message can be used to defer work until a message is ready for

        /// processing. One example of this type of work deferment is the

        /// receipt of a message inside of a transaction. Suppose that you had a

        /// loop checking for messages to arrive. If you just called Receive or

        /// TryReceive, then you would need to start a transaction every time through

        /// the loop in case a message showed up. Each time a receive attempt failed,

        /// the transaction would have to be canceled. With WaitForMessage, you could

        /// instead peek for messages and then only start a transaction when you knew that

        /// a message was likely to be there. (From Nicholas Allen's Blog)

  /// </summary>

        /// <param name="timeout"></param>

        /// <returns></returns>

        public bool WaitForMessage(TimeSpan timeout)

        {

            // just true for now

            return true;

        }

        /// <summary>

        /// TThis is the callback for the FileSystemWatcher.

        /// </summary>

        /// <param name="sender"></param>

        /// <param name="e"></param>

        void FileMonitor_Event_Created(object sender, FileSystemEventArgs e)

   {

            lock (lockThis)

            {

                if (e.ChangeType == WatcherChangeTypes.Created || e.ChangeType == WatcherChangeTypes.Changed)

                {

                    HelloWorldAdapterUtilities.Trace.Trace(TraceEventType.Information, "HelloWorldAdapterInboundHandler::FileMonitor_Event_Created", "File: " + e.FullPath + " " + e.ChangeType, e.Name + " Created");

                    // wait for file to close, when a new file is being created - should do this in a better manner

                    // that's why copy a new file into the folder, instead of creating it right there

                    System.Threading.Thread.Sleep(20000);

                    // create file info object

                    FileInfo fileInfo = new FileInfo(e.FullPath);

                    try

                    {

                        // read file contents

                        TextReader textReader = new StreamReader(e.FullPath);

                        string content = textReader.ReadToEnd();

  textReader.Close();

                        // get the operation metadata from cache

                        OperationMetadata op = this.MetadataLookup.GetOperationDefinition("Hello/OnReceiveGreeting", TimeSpan.MaxValue);

           // create WCF request message

                        message = CreateOnReceiveGreetingRequestMessage(e.FullPath, content, op);

                        message.Headers.To = new Uri(path);

                        inboundQueue.Enqueue(message);

                    }

                    catch (IOException ex)

                    {

                        String error = String.Format("An exception was thrown while trying to open file {0}", e.FullPath);

                        HelloWorldAdapterUtilities.Trace.Trace(TraceEventType.Information, "HelloWorldAdapterInboundHandler::FileMonitor_Event_Created", ex.Message);

                        throw new AdapterException(error);

                    }

                }

            }

        }

        /// <summary>

        /// Message created by the adapter to dispatch to the service.

        /// </summary>

        /// <param name="fileName"></param>

        /// <param name="fileContent"></param>

        /// <returns></returns>

        private Message CreateOnReceiveGreetingRequestMessage(string fileName, object fileContent, OperationMetadata op)

        {

            // create XML input string for constructing WCF Message

            String xmlData = String.Format(@"<OnReceiveGreeting xmlns=""{2}""><name>{0}</name><data>{1}</data></OnReceiveGreeting>", fileName, fileContent, op.OperationNamespace);

            // set action string

            XmlReader reader = XmlReader.Create(new StringReader(xmlData));

            // create WCF message

            Message requestMessage = Message.CreateMessage(MessageVersion.Default, op.InputMessageAction, reader);

            return requestMessage;

        }

        #endregion IInboundHandler Members

    }

 

Adapter Inbound Reply

We don't have to implement this for now, as our contract is void return type.

    internal class HelloAdapterInboundReply : InboundReply

    {

        public override void Abort()

        {

            // throw new Exception("The method or operation is not implemented.");

        }

        public override void Reply(Message message, TimeSpan timeout)

        {

            // throw new Exception("The method or operation is not implemented.");

        }

    }