Write your services leveraging existing thread pool technologies

Here is an idea for writing your services so that they leverage the power of existing thread pool technologies.

Thread pooling is important for scalability.
Pooling algorithms generally include complex logic for self-tuning, queuing incoming requests, and so on.
These are some of the reasons why I'd rather not bother writing thread pooling code in my services and leverage existing technologies instead.

In this article I provide sample code for a service that receives messages from an MSMQ queue and processes them by means of the .NET thread pool (ThreadPool.QueueUserWorkItem()).
Similar code can be written to receive commands from a socket or some other source.

The code is divided into 3 sections:

  • WAITING SECTION (PeekWorkItem() method): this section enters wait mode ('peek mode') on the resource (the queue). Only one thread is allowed to enter 'peek mode', to avoid too many callbacks when a new message arrives. 'Peek mode' is represented by the flag _nIsPeekActive, which is protected against concurrent access.
    In case of exceptions (e.g. because the MSMQ service is down), a new PeekWorkItem() is submitted with a delay to avoid 100% CPU loops on error conditions.
  • WAKE UP SECTION (PeekCompleted() method): this section runs when a new message arrives on the queue. It submits a number of requests to the thread pool to read messages from the queue and process them.
    The code keeps count of the requests submitted to the thread pool (in the variable _nReceiveCount). It is up to the thread pool whether to process those requests concurrently or to serve them spread out over time. In case of exceptions (e.g. because the MSMQ service is down), a new PeekWorkItem() is submitted with a delay to avoid 100% CPU loops on error conditions.
  • RECEIVE SECTION (ReceiveWorkItem() method): this section is the work item run by the thread pool; it reads messages from the queue and processes them.
    Code sections run on a thread pool should never be 'too short' or 'too long', as that may waste resources. So ReceiveWorkItem() is designed to read and process as many messages as possible until a configurable timeout expires or the queue becomes empty.

 

Figure 1: MSMQ Custom Listener engine

WAITING SECTION (PeekWorkItem() method)

I/O mechanisms generally provide asynchronous ways of receiving data.
This removes the need to create threads just to wait on them.
In fact, there are cheaper ways to wait on empty resources, and they are usually exposed as asynchronous APIs.
The .NET MSMQ asynchronous API used here to wait for messages on a queue is the MessageQueue.BeginPeek() method.

PeekWorkItem() is the code section where my service calls the _queue.BeginPeek() method to look for messages in the MSMQ queue.
The _queue variable refers to the queue the service reads messages from.

Before calling _queue.BeginPeek(), PeekWorkItem() sets the flag _nIsPeekActive to 1 (i.e. it enters 'peek mode').

The _nIsPeekActive flag records that somebody is already waiting on the queue and prevents other threads from calling BeginPeek() on it.
This reduces the number of callbacks that are processed when new messages arrive on the queue.
 

If _queue is empty, _queue.BeginPeek() doesn't block the calling thread.
So no thread objects are wasted waiting on empty queues, and no threads are kept blocked for a long time.
That is why PeekWorkItem() may safely be run on thread pool threads.
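The single-entry guard described above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical PeekGate class (it is not part of the service code in this article) that wraps the same Interlocked.CompareExchange() pattern around an int flag.

```csharp
using System.Threading;

// Hypothetical helper, not part of the article's service: it isolates the
// "only one thread may be in peek mode" guard built on an int flag.
public sealed class PeekGate
{
    private int _isActive; // 0 = nobody is in peek mode, 1 = somebody is

    // Returns true only for the single caller that flips the flag from 0 to 1.
    public bool TryEnter()
    {
        return Interlocked.CompareExchange(ref _isActive, 1, 0) == 0;
    }

    // Releases the flag so that the next caller may enter peek mode.
    public void Exit()
    {
        Interlocked.Exchange(ref _isActive, 0);
    }
}
```

A caller that gets false from TryEnter() simply returns, which is what PeekWorkItem() does when somebody else is already waiting on the queue.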
 

#region PeekWorkItem
/// <summary>Submits a BeginPeek() to the queue.</summary>
/// <remarks>Only one BeginPeek() may be active on the queue at any moment.
/// The BeginPeek() invocation is protected by the synchronized flag 'IsPeekActive', which is set just before calling it and reset in the PeekCompleted() callback.
/// In case BeginPeek() fails (e.g. because of invalid access or any other reason), 'IsPeekActive' is also reset and a delayed peek is submitted to check whether it may succeed at a later time.</remarks>
public void PeekWorkItem() {
    try {
        // Try a BeginPeek() on the queue; if _nIsPeekActive == 1 somebody else is in peek mode, so PeekWorkItem() does nothing.
        if (Interlocked.CompareExchange(ref _nIsPeekActive, 1, 0) == 0) {
            // Entering peek mode: only one thread is allowed to call BeginPeek(), to prevent too many callbacks when a new message arrives.
            _queue.BeginPeek(MessageQueue.InfiniteTimeout, null, new AsyncCallback(PeekCompleted));
        }
    } catch (Exception ex) {
        // Submit a delayed PeekWorkItem() and ignore the exception.
        // BeginPeek() may fail when there are problems with the MSMQ service.
        // The new PeekWorkItem() is delayed to avoid a loop with 100% CPU in such a case.
        ICall call = new DelayedCall(new DelegateCall(new VoidEmptyArgsDelegate(PeekWorkItem)), _nPeekDelayOnException);
        // Release the _nIsPeekActive flag: the next PeekWorkItem() will be allowed to enter peek mode.
        Interlocked.Exchange(ref _nIsPeekActive, 0);
        // Start the delayed PeekWorkItem() and save the call object into the '_call' member.
        // If the '_call' member already contains a delayed call object, dispose it so that only one call object is active at a time.
        call.Invoke();
        IDisposable oldCall = Interlocked.Exchange<ICall>(ref _call, call) as IDisposable;
        if (oldCall != null) { oldCall.Dispose(); }
    }
}
#endregion
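The listing above relies on ICall, DelegateCall, DelayedCall and VoidEmptyArgsDelegate, which this article does not show. The following is a minimal sketch of what such helpers might look like, assuming a one-shot System.Threading.Timer does the delaying. The member shapes are guesses inferred from how the listing uses these types; they are not the original implementation.

```csharp
using System;
using System.Threading;

// Assumed shapes: inferred from usage in the article, not the original code.
public delegate void VoidEmptyArgsDelegate();

public interface ICall
{
    void Invoke();
}

// Runs the wrapped delegate immediately, on the calling thread.
public sealed class DelegateCall : ICall
{
    private readonly VoidEmptyArgsDelegate _target;

    public DelegateCall(VoidEmptyArgsDelegate target) { _target = target; }

    public void Invoke() { _target(); }
}

// Runs the wrapped call once, on a thread pool thread, after a delay.
public sealed class DelayedCall : ICall, IDisposable
{
    private readonly ICall _inner;
    private readonly int _delayMilliseconds;
    private Timer _timer;

    public DelayedCall(ICall inner, int delayMilliseconds)
    {
        _inner = inner;
        _delayMilliseconds = delayMilliseconds;
    }

    public void Invoke()
    {
        // Timeout.Infinite as the period makes this a one-shot timer.
        Timer timer = new Timer(_ => _inner.Invoke(), null, _delayMilliseconds, Timeout.Infinite);
        Timer old = Interlocked.Exchange(ref _timer, timer);
        if (old != null) { old.Dispose(); }
    }

    // Disposing before the delay elapses cancels the pending call.
    public void Dispose()
    {
        Timer timer = Interlocked.Exchange(ref _timer, null);
        if (timer != null) { timer.Dispose(); }
    }
}
```

With helpers of this shape, disposing the previous call object (as the listing does with oldCall) would cancel a delayed PeekWorkItem() that has not fired yet, so at most one delayed peek stays pending.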

WAKE UP SECTION (PeekCompleted() method)

The wake up section runs when new messages arrive on the queue (or when an exceptional condition occurs while waiting on it).
In fact, the PeekCompleted() method was registered as a callback on the queue in the 'waiting section', when BeginPeek() was called.
Remember that only one thread is allowed to set the _nIsPeekActive flag and call _queue.BeginPeek().
So only one callback is run when messages arrive on the queue.

Just after wake up, PeekCompleted() calls _queue.EndPeek() to check whether new messages are available on the queue.

If _queue.EndPeek() succeeds, there are messages available, so PeekCompleted() submits some requests for processing them (i.e. delegates to ReceiveWorkItem()) to the thread pool.
PeekCompleted() doesn't use the message returned by _queue.EndPeek(); in fact, it is the receive section that decides how to receive messages, whether to share the receive transaction with the message processing, and so on.

PeekCompleted() keeps count of the requests currently submitted to the thread pool by means of the variable _nReceiveCount.
In fact, PeekCompleted() increments _nReceiveCount every time it submits a request for processing to the thread pool and, in turn, _nReceiveCount is decremented every time a thread pool request completes.

PeekCompleted() keeps submitting requests for message processing to the thread pool only while _nReceiveCount is below a configurable limit (held in the variable _nMaxConcurrency).
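The counting scheme above can be sketched on its own. This is a minimal illustration, assuming a hypothetical BoundedSubmitter class that is not part of the service. Unlike PeekCompleted(), it reserves a slot with Interlocked.Increment() and undoes the reservation when the limit is exceeded, and it decrements the counter itself when a work item ends.

```csharp
using System;
using System.Threading;

// Hypothetical helper: caps the number of work items queued to the
// thread pool at any one time, using an interlocked counter.
public sealed class BoundedSubmitter
{
    private readonly int _maxConcurrency;
    private int _inFlight; // plays the role of _nReceiveCount

    public BoundedSubmitter(int maxConcurrency) { _maxConcurrency = maxConcurrency; }

    public int InFlight { get { return Volatile.Read(ref _inFlight); } }

    // Queues copies of 'work' until the in-flight count reaches the limit.
    // Returns the number of work items queued by this call.
    public int SubmitUpToLimit(Action work)
    {
        int submitted = 0;
        while (true)
        {
            int count = Interlocked.Increment(ref _inFlight);
            if (count > _maxConcurrency)
            {
                // Over the limit: undo the reservation and stop.
                Interlocked.Decrement(ref _inFlight);
                return submitted;
            }
            ThreadPool.QueueUserWorkItem(delegate
            {
                try { work(); }
                finally { Interlocked.Decrement(ref _inFlight); }
            });
            submitted++;
        }
    }
}
```

The counter tracks queued plus running items, so the limit bounds how much work is handed to the pool; how many of those items actually run at the same time remains the pool's decision.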

PeekCompleted() always releases the _nIsPeekActive flag: it does so either before sending the last request for processing or in the finally block.
This allows a new thread to enter 'peek mode' when the queue becomes empty or when some receive work item completes for other reasons (e.g. by timeout).

In case of exceptions there might be problems on the queue, so (after releasing the _nIsPeekActive flag) PeekCompleted() submits a delayed request to PeekWorkItem() (the waiting section).
The delay keeps the PeekWorkItem()/PeekCompleted() loop from consuming excessive CPU while the exceptional condition lasts.

     

#region PeekCompleted
/// <summary>PeekCompleted is triggered when a BeginPeek() is active and a message is received by the queue.</summary>
/// <remarks>PeekCompleted resets 'IsPeekActive' and submits an appropriate number of ReceiveWorkItems to the thread pool for receiving messages.
/// ReceiveWorkItems are submitted according to the concurrency configuration setting for the queue.
/// Currently active ReceiveWorkItems can be monitored by means of the 'Requests Current' performance counter.</remarks>
public void PeekCompleted(IAsyncResult asyncResult) {
    bool bPeekActiveReset = false;
    try {
        // Any BeginPeek() must be matched with an EndPeek().
        Message msg = this._queue.EndPeek(asyncResult);
        // Submit receive requests up to the maximum concurrency allowed (current count '_nReceiveCount', limit '_nMaxConcurrency').
        int nReceiveCount = Interlocked.CompareExchange(ref _nReceiveCount, -1, -1); // synchronized read of _nReceiveCount
        for (; nReceiveCount < _nMaxConcurrency; ) {
            nReceiveCount = Interlocked.Increment(ref _nReceiveCount);
            if (nReceiveCount <= _nMaxConcurrency) {
                // Release peek mode before sending the last ReceiveWorkItem().
                if (nReceiveCount == _nMaxConcurrency) { Interlocked.Exchange(ref _nIsPeekActive, 0); bPeekActiveReset = true; }
                ThreadPool.QueueUserWorkItem(new WaitCallback(ReceiveWorkItem), this);
            }
        }
    } catch (Exception ex) {
        // Trace a warning; a delayed PeekWorkItem() is submitted in the finally block.
    } finally {
        if (bPeekActiveReset == false) { // Submit a delayed PeekWorkItem()
            ICall call = new DelayedCall(new DelegateCall(new VoidEmptyArgsDelegate(PeekWorkItem)), _nPeekDelayOnException);
            Interlocked.Exchange(ref _nIsPeekActive, 0);
            call.Invoke();
            IDisposable oldCall = Interlocked.Exchange<ICall>(ref _call, call) as IDisposable;
            if (oldCall != null) { oldCall.Dispose(); }
        }
    }
}
#endregion

     

RECEIVE SECTION (ReceiveWorkItem() method)

ReceiveWorkItem() is run on the thread pool to receive messages from the queue and process them.
Code sections run on a thread pool should never be 'too short' or 'too long', as that may waste resources.
For this reason ReceiveWorkItem() is designed to process multiple messages until a configurable time interval (the _nWorkItemTimeLimit variable) expires or the queue becomes empty.
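The time-boxed loop can be illustrated without MSMQ. This is a minimal sketch, assuming an in-memory ConcurrentQueue<T> stands in for the message queue and a hypothetical TimeBoxedDrain helper stands in for the loop; as in ReceiveWorkItem(), the time limit is checked after each item, and a negative limit means no limit.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Hypothetical helper: an in-memory stand-in for the receive loop.
public static class TimeBoxedDrain
{
    // Processes items until the queue is empty or the time limit elapses.
    // A negative limit disables the time check.
    // Returns true when the loop stopped because the queue was empty.
    public static bool Drain<T>(ConcurrentQueue<T> queue, Action<T> process,
                                int timeLimitMilliseconds, out int processed)
    {
        processed = 0;
        Stopwatch elapsed = Stopwatch.StartNew();
        while (true)
        {
            T item;
            if (!queue.TryDequeue(out item)) { return true; } // queue is empty
            process(item);
            processed++;
            // The limit is checked after each item, so at least one item
            // is processed whenever the queue is not empty.
            if (timeLimitMilliseconds >= 0 &&
                elapsed.ElapsedMilliseconds >= timeLimitMilliseconds)
            {
                return false; // time limit reached, items may remain
            }
        }
    }
}
```

The return value corresponds to the distinction the service needs at the end of a work item: whether it stopped because the queue was empty or because its time slice ran out.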

If the queue becomes empty, every running ReceiveWorkItem() completes and only the last one calls PeekWorkItem() to enter 'peek mode' on the queue.
PeekWorkItem() is also called when a ReceiveWorkItem() completes because _nWorkItemTimeLimit has expired.
If a ReceiveWorkItem() completes because of an exception while receiving or processing a message, the call to PeekWorkItem() is delayed to limit CPU overhead in exceptional conditions.

In case of failures processing a message, ReceiveWorkItem() tries to save the failed message to a '_failedmessages' queue by means of hlpSaveDeadMessage() method.

#region ReceiveWorkItem
/// <summary>ReceiveWorkItem is processed on the thread pool to receive messages from the queue and process them.</summary>
/// <remarks>ReceiveWorkItem retrieves messages according to the configuration and processes them.
/// In case of a failure processing a message, ReceiveWorkItem() tries to save the failed message to the '_failedmessages' queue.
/// The move of the message to the failedmessages queue is performed with the same transactional settings used for retrieval from the original queue. In case of failure saving the message to the failedmessages queue, the original message is restored to the original queue ONLY if the queue is configured for use of external transactions.</remarks>
public void ReceiveWorkItem(object oThis) {
    int nReceiveCount = 0;
    bool bIsEmpty = false;
    bool bDelayPeekWorkItem = false;
    string sMessageId = null;
    try {
        // Receive messages from the queue '_queue' with transaction mode '_transactionType' (ReceiveCount: '_nReceiveCount').
        DateTime dtExpiration = DateTime.MinValue;
        if (_nWorkItemTimeLimit >= 0) { dtExpiration = DateTime.Now.AddMilliseconds(_nWorkItemTimeLimit); }

        for (bool bTimeLimitExceeded = false; bIsEmpty == false && bTimeLimitExceeded == false; sMessageId = null) {
            Message oMessage = null;
            bool bIsReceiveCommitted = false;
            try {
                // Create a transaction scope for use of external transactions; a null transaction scope is used otherwise.
                TransactionScope oTransactionScope = (_transactionType == MessageQueueTransactionType.Automatic) ? new TransactionScope(TransactionScopeOption.RequiresNew) : null;
                using (oTransactionScope) {
                    try { oMessage = _queue.Receive(TimeSpan.Zero, _transactionType); }
                    catch (MessageQueueException tex) {
                        // A zero-timeout Receive() that times out means the queue is empty.
                        if (tex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) { bIsEmpty = true; }
                        else { throw; }
                    }
                    if (_transactionType != MessageQueueTransactionType.Automatic) { bIsReceiveCommitted = true; }
                    if (oMessage != null) {
                        // Process message 'oMessage.Id'.
                        sMessageId = oMessage.Id;
                        if (_formatter != null) { oMessage.Formatter = _formatter; }
                        ServiceMessage oServiceMessage = (ServiceMessage)oMessage.Body;
                        ServiceMessage oRetMessage = hlpProcessMessage(oServiceMessage);
                    }
                    if (_transactionType == MessageQueueTransactionType.Automatic) { oTransactionScope.Complete(); }
                }
            } catch (Exception ex) {
                if (sMessageId == null) { throw new CommunicationEngineTargetException(string.Format("Exception '{0}' - '{1}' occurred receiving a message from queue '{2}' and the message ID could not be retrieved; the message cannot be saved to the deadqueue. If the queue is not configured with automatic transactions the message may be lost.", ex.GetType().Name, ex.Message, _queueConf.FormatName)); }
                bIsEmpty = hlpSaveDeadMessage(sMessageId, oMessage, ref bIsReceiveCommitted);
                if (bIsReceiveCommitted == false) { bDelayPeekWorkItem = true; return; }
            }
            if (_nWorkItemTimeLimit >= 0 && dtExpiration <= DateTime.Now) { bTimeLimitExceeded = true; }
        }
    } catch (Exception ex) {
        // An exception occurred receiving a message from queue '_queue'; the message id is 'sMessageId'.
        bDelayPeekWorkItem = true; // Ignore the exception and run the finally block.
    } finally {
        nReceiveCount = Interlocked.Decrement(ref _nReceiveCount);
        PublishEndRequestCounters(_queueConf);
        if (!bIsEmpty || nReceiveCount == 0) {
            if (!bDelayPeekWorkItem) {
                // Submit a PeekWorkItem().
                PeekWorkItem();
            } else if (nReceiveCount == 0) {
                // Submit a delayed PeekWorkItem().
                ICall call = new DelayedCall(new DelegateCall(new VoidEmptyArgsDelegate(PeekWorkItem)), _nPeekDelayOnException);
                call.Invoke();
                IDisposable oldCall = Interlocked.Exchange<ICall>(ref _call, call) as IDisposable;
                if (oldCall != null) { oldCall.Dispose(); }
            }
        }
    }
}
#endregion

   
