Controlling Message Flow to a Service Host

This post presents a custom service behavior implementation for controlling the message flow to a WCF service host that reads messages from MSMQ.

Suppose you want to limit the rate at which the service host reads messages from a queue. Such a requirement usually stems from scalability or throttling needs. In addition, when the system is “under stress”, you want to stop message consumption altogether.

The WCF ServiceHostBase class provides a property called “ManualFlowControlLimit” (abbreviated as MFCL throughout the article) that limits the number of messages received by the host. The property behaves as a counter that is decremented each time a message is received. Its default value is Int32.MaxValue, which means “no limit”. When MFCL reaches 0, the host does not receive any more messages until the property is set to a non-zero value again.

Through MFCL, we will limit the rate at which the service host reads messages. We will also monitor system resource usage to stop the message-flow to the host when the system is under stress.

First, to maintain a max message flow rate to a host, we will set the MFCL value to N every t seconds. This gives a flow of approximately N/t messages/sec at most to the service host. Note that this is only an upper limit on the throughput of messages to the host. If the limit is set higher than what the service can handle, the service will only process as many messages as it can. For example, with N = 2000 and t = 2 the ceiling is 1000 msgs/sec, yet the host may read only 150 msgs/sec because that is all it can process, without ever being throttled.
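The arithmetic can be sketched as follows. This is a minimal illustration and not part of the sample: the FlowBudget class and its method names are hypothetical, the figures are example values, and inputs are assumed to be positive.

```csharp
using System;

// Hypothetical helper illustrating the N/t arithmetic; not part of the article's sample.
static class FlowBudget
{
    // N: messages the host may read during one timer interval.
    // Computed in 64-bit and clamped so a huge rate saturates instead of overflowing.
    public static int PerInterval(int maxRatePerSecond, int intervalSeconds)
    {
        long budget = (long)maxRatePerSecond * intervalSeconds;
        return budget > int.MaxValue ? int.MaxValue : (int)budget;
    }

    // Observed throughput: the lower of the configured ceiling (N/t)
    // and what the service can process on its own.
    public static double EffectiveRate(int budgetPerInterval, int intervalSeconds, double serviceCapacityPerSecond)
    {
        double ceiling = (double)budgetPerInterval / intervalSeconds;
        return Math.Min(ceiling, serviceCapacityPerSecond);
    }

    static void Main()
    {
        Console.WriteLine(PerInterval(1000, 2));          // 2000
        Console.WriteLine(EffectiveRate(2000, 2, 150));   // 150
        Console.WriteLine(PerInterval(int.MaxValue, 2));  // 2147483647
    }
}
```

The last call shows why the budget is clamped: with the default rate of Int32.MaxValue, multiplying by t would otherwise overflow.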

In our solution, we will simply use a System.Timers.Timer object that fires every t=2 seconds to set the value of MFCL. The max flow rate value that is set on the throttling behavior in the configuration file of the service host is used to calculate the value of MFCL.

Second, to stop message flow when the system is “under stress”, we will set MFCL to 0. In the solution provided in this article, we define the system busy state as the case when the current CPU usage exceeds 99%. We get the current CPU usage using the PerformanceCounter API. Note that this example uses a simple diagnostic, CPU usage read from a performance counter. You can use a more complex algorithm that utilizes a voting pattern, quota limits, calculations or other performance counters.
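As one possible shape for the voting pattern mentioned above, the sketch below treats the system as busy only when enough independent probes agree. The BusyVote class, its members and the sample readings are all hypothetical and not part of the article's sample; in a real host each probe would read a performance counter or a quota.

```csharp
using System;
using System.Linq;

// Hypothetical helper, not part of the article's sample.
sealed class BusyVote
{
    private readonly Func<bool>[] probes;
    private readonly int votesNeeded;

    public BusyVote(int votesNeeded, params Func<bool>[] probes)
    {
        if (probes == null || probes.Length == 0)
            throw new ArgumentException("At least one probe is required.", "probes");
        if (votesNeeded < 1 || votesNeeded > probes.Length)
            throw new ArgumentOutOfRangeException("votesNeeded");
        this.votesNeeded = votesNeeded;
        this.probes = probes;
    }

    // True when at least votesNeeded probes report a busy system.
    public bool IsBusy
    {
        get { return probes.Count(p => p()) >= votesNeeded; }
    }
}

static class BusyVoteDemo
{
    static void Main()
    {
        // Stand-in readings; assumed values for illustration only.
        double cpuPercent = 99.5;
        int queuedWorkItems = 10;

        var vote = new BusyVote(2,
            () => cpuPercent > 99,         // CPU probe votes "busy"
            () => queuedWorkItems > 1000,  // backlog probe votes "not busy"
            () => false);                  // third probe votes "not busy"

        Console.WriteLine(vote.IsBusy);    // False: only one of the two required votes
    }
}
```

A controller could consult such an object in place of the single CPU check, which makes the stop condition less sensitive to one noisy counter.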

The algorithm is summarized below:

Every t seconds:
     Get current CPU usage
     IF current CPU usage is greater than 99%
          Set MFCL to 0
     ELSE
          Get Max Flow Rate (MFR)
          Set MFCL to MFR * t

A sample implementation for the controller is as follows:

Code Snippet

    using System;
    using System.Diagnostics;
    using System.ServiceModel;
    using System.Timers;

    // Resets ManualFlowControlLimit on a service host every TimerIntervalSecond seconds
    // to cap the message flow rate, and closes the flow while the CPU is saturated.
    internal class ServiceHostFlowController : IDisposable
    {
        const int TimerIntervalSecond = 2;
        Timer updateFlowTimer;
        int maxFlowRate = Int32.MaxValue;
        bool disposed;
        PerformanceCounter cpuPerformanceCounter = new PerformanceCounter("Processor", "% Processor Time", "_Total");
        ServiceHostBase serviceHost;

        internal ServiceHostFlowController(ServiceHostBase serviceHostBase, int maxMessageFlowRate)
        {
            serviceHost = serviceHostBase;
            maxFlowRate = maxMessageFlowRate;
            updateFlowTimer = new Timer(ServiceHostFlowController.TimerIntervalSecond * 1000); // msec
            // Set initial flow to 0; the first timer tick opens the flow.
            MessageFlowLimit = 0;
            updateFlowTimer.Elapsed += new ElapsedEventHandler(UpdateFlow);
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            // Check to see if Dispose has already been called.
            if (!disposed)
            {
                if (disposing)
                {
                    // Dispose managed resources.
                    updateFlowTimer.Close();
                    cpuPerformanceCounter.Close();
                }
                // Note disposing has been done.
                disposed = true;
            }
        }

        // The system is considered busy when total CPU usage exceeds 99%.
        // Note: the first NextValue() call on this counter returns 0,
        // so the first tick never reports a busy system.
        private bool IsServerBusy
        {
            get { return cpuPerformanceCounter.NextValue() > 99; }
        }

        private int MessageFlowLimit
        {
            set { serviceHost.ManualFlowControlLimit = value; }
        }

        // Timer callback: stop the flow when busy, otherwise grant MFR * t messages.
        internal void UpdateFlow(object source, ElapsedEventArgs e)
        {
            if (IsServerBusy)
            {
                MessageFlowLimit = 0;
                return;
            }

            int flowLimit;
            try
            {
                flowLimit = checked(maxFlowRate * ServiceHostFlowController.TimerIntervalSecond);
            }
            catch (OverflowException)
            {
                // Saturate instead of wrapping around.
                flowLimit = Int32.MaxValue;
            }
            MessageFlowLimit = flowLimit;
        }

        internal void Start()
        {
            updateFlowTimer.Start();
        }

        internal void Stop()
        {
            updateFlowTimer.Stop();
        }
    }

To add the flow controller to the service host, we define a custom service behavior as follows:

Code Snippet

    using System;
    using System.Collections.ObjectModel;
    using System.Configuration;
    using System.ServiceModel;
    using System.ServiceModel.Channels;
    using System.ServiceModel.Configuration;
    using System.ServiceModel.Description;

    public class CustomThrottlingBehaviorExtensionElement : BehaviorExtensionElement
    {
        ///<summary>
        /// Max message flow rate in messages per second. The default, Int32.MaxValue,
        /// leaves flow control disabled. Values less than 1 are rejected by the validator.
        ///</summary>
        [ConfigurationProperty("maxMessageFlowRate", IsRequired = false, DefaultValue = Int32.MaxValue), IntegerValidator(MinValue = 1)]
        public int MaxMessageFlowRate
        {
            get { return (int)base["maxMessageFlowRate"]; }
            set { base["maxMessageFlowRate"] = value; }
        }

        protected override object CreateBehavior()
        {
            return new CustomThrottlingBehavior(MaxMessageFlowRate);
        }

        public override Type BehaviorType
        {
            get { return typeof(CustomThrottlingBehavior); }
        }
    }

    public class CustomThrottlingBehavior : IServiceBehavior, IDisposable
    {
        ServiceHostFlowController flowController;
        private int maxFlowRate;

        public CustomThrottlingBehavior(int maxMessageFlowRate)
        {
            maxFlowRate = maxMessageFlowRate;
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
            {
                if (flowController != null)
                {
                    flowController.Dispose();
                }
            }
        }

        public void AddBindingParameters(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase, Collection<ServiceEndpoint> endpoints, BindingParameterCollection bindingParameters)
        { }

        public void ApplyDispatchBehavior(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        {
            if (maxFlowRate != Int32.MaxValue)
            {
                // Flow rate throttling is enabled, initialize flow controller
                flowController = new ServiceHostFlowController(serviceHostBase, maxFlowRate);
                // Run the controller only while the host is open.
                serviceHostBase.Opened += new EventHandler(serviceHostBase_Opened);
                serviceHostBase.Closed += new EventHandler(serviceHostBase_Closed);
            }
        }

        void serviceHostBase_Closed(object sender, EventArgs e)
        {
            flowController.Stop();
        }

        void serviceHostBase_Opened(object sender, EventArgs e)
        {
            flowController.Start();
        }

        public void Validate(ServiceDescription serviceDescription, ServiceHostBase serviceHostBase)
        { }
    }

Now, the behavior can be added to the service in the web.config file as follows:

Code Snippet

<system.serviceModel>
  <bindings>
    <netMsmqBinding>
      <binding name="netMsmqConfig">
        <security mode="None" />
      </binding>
    </netMsmqBinding>
  </bindings>
  <extensions>
    <behaviorExtensions>
      <add name="customThrottlingBehavior" type="ServiceFlowControllerSample.CustomThrottlingBehaviorExtensionElement, ServiceFlowControllerSample, Version=1.0.0.0, Culture=neutral, PublicKeyToken=3ce62a76d7e8bebe"/>
    </behaviorExtensions>
  </extensions>
  <services>
    <service name="ServiceFlowControllerSample.Service1" behaviorConfiguration="ServiceFlowControllerSample.Service1Behavior">
      <!-- Service Endpoints -->
      <endpoint address="net.msmq://localhost/private/ServiceFlowControllerSample/Service1.svc" binding="netMsmqBinding" bindingConfiguration="netMsmqConfig" contract="ServiceFlowControllerSample.IService1">
        <!--
            Upon deployment, the following identity element should be removed or replaced to reflect the
            identity under which the deployed service runs. If removed, WCF will infer an appropriate identity
            automatically.
        -->
        <identity>
          <dns value="localhost"/>
        </identity>
      </endpoint>
      <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange"/>
    </service>
  </services>
  <behaviors>
    <serviceBehaviors>
      <behavior name="ServiceFlowControllerSample.Service1Behavior">
        <!-- To avoid disclosing metadata information, set the value below to false and remove the metadata endpoint above before deployment -->
        <serviceMetadata httpGetEnabled="true"/>
        <!-- To receive exception details in faults for debugging purposes, set the value below to true. Set to false before deployment to avoid disclosing exception information -->
        <serviceDebug includeExceptionDetailInFaults="false"/>
        <customThrottlingBehavior maxMessageFlowRate="2"/>
      </behavior>
    </serviceBehaviors>
  </behaviors>
</system.serviceModel>

Execution and Results

To verify the implementation, we will use perfmon by adding the following counters to its view.

(Processor, % Processor Time, _Total)

(MSMQ Queue, [machineName]\private$\serviceflowcontrollersample\service1.svc)

 

We start our test by sending 100 messages to the queue. The host starts reading messages at a max flow rate of 2 msgs/sec, as set in the configuration. After about half the messages are consumed, we start another app that performs a spin-wait, which lets us simulate a busy system by pushing the CPU usage above 99%, the stop threshold in our case. As seen in the following picture, no further messages are read from the queue during this time. Next, we stop the spin-wait app to release CPU resources. Observe that the message flow resumes and the number of messages in the queue continues to decline.

[Figure: ManualFlowControlLimit_Results]
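The spin-wait app used in the test is not shown in the article. A load generator along the following lines would serve the same purpose. This is only a sketch under assumptions: the CpuBurner class, its Burn method and the 30-second default are hypothetical, and it stops by itself after a fixed duration rather than being stopped by hand.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

// Hypothetical load generator; not the app used for the results in the article.
static class CpuBurner
{
    // Keeps `workers` threads spinning for `duration`, then returns
    // the number of threads that were run.
    public static int Burn(int workers, TimeSpan duration)
    {
        var threads = new Thread[workers];
        for (int i = 0; i < workers; i++)
        {
            threads[i] = new Thread(() =>
            {
                var clock = Stopwatch.StartNew();
                while (clock.Elapsed < duration)
                {
                    Thread.SpinWait(1000); // busy-wait without yielding the core
                }
            });
            threads[i].IsBackground = true;
            threads[i].Start();
        }

        foreach (var thread in threads)
        {
            thread.Join();
        }
        return threads.Length;
    }

    static void Main(string[] args)
    {
        // Optional first argument: how many seconds to keep the CPU busy.
        int seconds = args.Length > 0 ? int.Parse(args[0]) : 30;
        Burn(Environment.ProcessorCount, TimeSpan.FromSeconds(seconds));
    }
}
```

One spinning thread per logical processor is used so that the _Total processor counter, rather than a single core, approaches 100%.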

 

by Hülya Pamukçu, May 09