Increasing the number of emails a Routing Agent can process asynchronously at the same time

I recently ran into an issue with a customer who was developing context-sensitive email analysis software for MS Exchange 2007. The result of the analysis was used to decide how each email would be handled.

Because the analysis was complex, the estimated processing time per email was about 15 to 30 seconds. To achieve an acceptable throughput of emails per minute, the processing had to be heavily parallelized, so they moved the analysis itself into a separate web service that was called from a Routing Agent.

They used the asynchronous programming pattern (https://msdn.microsoft.com/en-us/library/cc720860.aspx) that Microsoft recommends for long I/O wait times, but they still ran into a limit on parallel processing: only 6 emails could be processed at a time. Processing of the seventh email began only when one of the previously submitted emails had finished.

On the Hub Transport role this doubled the throughput compared with the synchronous programming pattern, but it was still nowhere near the required degree of parallelism. At that rate they could process at most 24 emails per minute (6 emails at a time, 15 seconds each). The requirement was a minimum of 100 emails per minute.
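As a rough sanity check, these numbers can be related with Little's law, a standard queueing result that the original analysis does not mention: the number of emails in flight L equals the throughput λ multiplied by the processing time per email W, so L = λW. This is a sketch under two assumptions: every email holds one processing slot for its whole analysis time, and there is always mail waiting to be processed.

$$
\begin{aligned}
\lambda_{\text{current}}(W = 15\,\text{s}) &= \frac{L}{W} = \frac{6}{0.25\,\text{min}} = 24\,\text{emails/min} \\
\lambda_{\text{current}}(W = 30\,\text{s}) &= \frac{6}{0.5\,\text{min}} = 12\,\text{emails/min} \\
L_{\text{required}}(W = 15\,\text{s}) &= \lambda W = 100\,\text{emails/min} \times 0.25\,\text{min} = 25 \\
L_{\text{required}}(W = 30\,\text{s}) &= 100\,\text{emails/min} \times 0.5\,\text{min} = 50
\end{aligned}
$$

In other words, to reach 100 emails per minute the agent needs roughly 25 to 50 emails in flight at once instead of 6.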

Fortunately I was able to reproduce the issue in my lab. Below is the code I used to reproduce the behaviour:

using System;
using System.Diagnostics;
using System.IO;
using System.Reflection;
using Microsoft.Exchange.Data.Transport;
using Microsoft.Exchange.Data.Transport.Routing;

namespace AsyncRoutingAgent
{
    /// <summary>
    /// Agent factory
    /// </summary>
    public class MyRoutingAgentFactory : RoutingAgentFactory
    {
        /// <summary>
        /// Create a new RoutingAgent.
        /// </summary>
        /// <param name="server">Exchange Hub server</param>
        /// <returns>a new agent</returns>
        public override RoutingAgent CreateAgent(SmtpServer server)
        {
            return new MyRoutingAgent();
        }
    }

    public class MyRoutingAgent : RoutingAgent 
    {
        private object fileLock = new object();  

        public MyRoutingAgent()
        {
            base.OnSubmittedMessage += new SubmittedMessageEventHandler(MyRoutingAgent_OnSubmittedMessage);
        }

        void MyRoutingAgent_OnSubmittedMessage(SubmittedMessageEventSource source, QueuedMessageEventArgs e)
        {
            WriteLog("Entering OnSubmittedMessage");

            // Create an instance of the Web Service via the proxy class

            WriteLog("Creating Instance of Web Service");
            AsyncRoutingAgent.localhost.StockService objWebService = new AsyncRoutingAgent.localhost.StockService();

            // Get the async context so that the server knows that this event should be handled asynchronously.
            // Also save the event source and event args so that they can be accessed from the callback.
            WriteLog("Calling GetAgentAsyncContext");

            AsyncState<SubmittedMessageEventSource, QueuedMessageEventArgs> asyncState =
                new AsyncState<SubmittedMessageEventSource, QueuedMessageEventArgs>(
                    source,
                    e,
                    this.GetAgentAsyncContext());

            WriteLog("Hooking the Event Handler");

            objWebService.GetStockQuoteCompleted += new AsyncRoutingAgent.localhost.GetStockQuoteCompletedEventHandler(objWebService_GetStockQuoteCompleted);

            WriteLog("Calling the Service");

            objWebService.GetStockQuoteAsync("MSFT", 15, asyncState);
            WriteLog("Just before Returning");
            return;
        }

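        void objWebService_GetStockQuoteCompleted(object sender, AsyncRoutingAgent.localhost.GetStockQuoteCompletedEventArgs e)
        {
            AsyncState<SubmittedMessageEventSource, QueuedMessageEventArgs> asyncState =
                (AsyncState<SubmittedMessageEventSource, QueuedMessageEventArgs>)e.UserState;

            try
            {
                // e.Result throws if the call failed or was cancelled, so check first.
                if (e.Error != null)
                {
                    WriteLog("Web service call failed: " + e.Error.Message);
                }
                else if (!e.Cancelled)
                {
                    WriteLog(e.Result);
                    // Handle the results of the asynchronous I/O here.
                    // The event source and event args are available from the asyncState.
                }
            }
            finally
            {
                // Always tell the server that this event is complete, even when the
                // call failed; otherwise Transport keeps waiting for this event.
                asyncState.Complete();
            }

            WriteLog("Call Completed");
        }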

        public void WriteLog(string comment)
        {
            lock (fileLock)
            {
                try
                {
                    string logDir = Path.Combine(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "Log");
                    string logFile = Path.Combine(logDir, "log.txt");

                    if (!Directory.Exists(logDir))
                    {
                        Directory.CreateDirectory(logDir);
                    }

                    // File.AppendText creates the file if it does not exist yet.
                    using (StreamWriter logWriter = File.AppendText(logFile))
                    {
                        logWriter.WriteLine(DateTime.Now.ToString() + ":" + comment);
                    }
                }
                catch (IOException ex)
                {
                    Debug.WriteLine(ex.ToString());
                }
            }
        }
        
        /// <summary>
        /// This class wraps all the state that has to be saved
        /// for an asynchronous event.
        /// </summary>
        private class AsyncState<SourceType, ArgsType>
        {
            private SourceType source;
            private ArgsType args;
            private AgentAsyncContext asyncContext;

            public AsyncState(
                SourceType source,
                ArgsType args,
                AgentAsyncContext asyncContext)
            {
                this.source = source;
                this.args = args;
                this.asyncContext = asyncContext;
            }

            public SourceType Source
            {
                get { return this.source; }
            }

            public ArgsType Args
            {
                get { return this.args; }
            }

            public void Complete()
            {
                this.asyncContext.Complete();

                this.source = default(SourceType);
                this.args = default(ArgsType);
                this.asyncContext = null;
            }
        }
    }
}

Running the code above produces a log that clearly shows the problem:

09/04/2009 1:41:25 AM:Entering OnSubmittedMessage
09/04/2009 1:41:26 AM:Entering OnSubmittedMessage
09/04/2009 1:41:27 AM:Entering OnSubmittedMessage
09/04/2009 1:41:28 AM:Entering OnSubmittedMessage
09/04/2009 1:41:28 AM:Entering OnSubmittedMessage
09/04/2009 1:41:30 AM:Entering OnSubmittedMessage
09/04/2009 1:41:40 AM:Call Completed
09/04/2009 1:41:40 AM:Entering OnSubmittedMessage
09/04/2009 1:41:42 AM:Call Completed
09/04/2009 1:41:42 AM:Entering OnSubmittedMessage
09/04/2009 1:41:42 AM:Call Completed

In the log above you can see that "Entering OnSubmittedMessage" is logged 6 times between 1:41:25 AM and 1:41:30 AM, and the next "Entering OnSubmittedMessage" appears at 1:41:40 AM, only after one of the web service calls has returned. The web service waits 15 seconds before responding, to simulate a time-consuming service.

Now for the solution. Exchange Transport creates multiple instances of the agent when it has multiple messages to process, and the number of instances is limited by the following settings:

MaxExecutingJobs: range 1 to 50

MaxJobThreads: range 1 to 25 (default value 3)

These settings are defined in edgetransport.exe.config, in the appSettings section. When the agent runs asynchronously, the thread is not held while the agent waits for the web service, so only MaxExecutingJobs needs to be increased.

Add the following lines to the appSettings section of edgetransport.exe.config. You will then need to restart the Microsoft Exchange Transport service (and with it EdgeTransport.exe) for the new values to take effect.

    <add key="MaxExecutingJobs" value="50"/>
    <add key="MaxJobThreads" value="3"/>

If you need more than 50 jobs to execute at the same time, you will need to run the code on a machine with more than one processor, because the effective MaxExecutingJobs limit is the configured value multiplied by the number of processors in the machine.
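To make the multiplication concrete, here is a worked example with MaxExecutingJobs = 50 for a machine with P = 1 and a hypothetical machine with P = 2 processors. The throughput line is only an upper bound from concurrency alone (throughput = concurrent jobs / processing time per email), assuming the 30-second worst-case analysis time and that nothing else, such as the web service itself, becomes the bottleneck.

$$
\begin{aligned}
\text{jobs}(P) &= \text{MaxExecutingJobs} \times P \\
\text{jobs}(1) &= 50 \times 1 = 50 \;\Rightarrow\; \frac{50}{0.5\,\text{min}} = 100\,\text{emails/min} \\
\text{jobs}(2) &= 50 \times 2 = 100 \;\Rightarrow\; \frac{100}{0.5\,\text{min}} = 200\,\text{emails/min}
\end{aligned}
$$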

Enjoy!

Note: A Routing Agent should not perform operations that take a long time to execute. We did advise the customer to take an alternative approach to the entire solution.