The Magical Input Queue Of T (aka InputQueue<T>)


This post explains an essential class for asynchronous programming that lurks in the depths of the WCF samples: InputQueue<T>. If you need to write efficient server-side apps, you should consider reading through this and adding InputQueue<T> to your arsenal.

Let me start with: This blog post is 4 years late. Sorry! – and with that out of the way:

The WCF samples ship with several copies of a class that’s marked as internal in the System.ServiceModel.dll assembly: InputQueue<T>. Why are these samples – mostly those implementing channel-model extensions – bringing local copies of this class with them? It’s an essential tool for implementing the asynchronous call paths of many aspects of channels correctly and efficiently.

If you look closely enough, the WCF channel infrastructure resembles the Berkeley Socket model quite a bit – especially on the server side. There’s a channel listener that’s constructed on the server side, and opening it (usually under the covers of the WCF ServiceHost) is largely equivalent to calling ‘listen’ on a socket – the network endpoint is ready for business. On sockets you’ll then call ‘accept’ to accept the next available socket connection from a client; in WCF you call ‘AcceptChannel’ to accept the next available (session-) channel. On sockets you then call ‘receive’ to obtain bytes; on a channel you call ‘Receive’ to obtain a message.

Before and between calls to ‘AcceptChannel’ made by the server-side logic, client-initiated connections – and thus channels – may be coming in and queue up for a bit before they are handed out to the next caller of ‘AcceptChannel’, or the asynchronous equivalent ‘Begin/EndAcceptChannel’ method pair. The number of channels that may be pending is configured in WCF with the ‘ListenBacklog’ property that’s available on most bindings.

I wrote ‘queue up’ there since that’s precisely what happens – those newly created channels on top of freshly accepted sockets or HTTP request channels are enqueued into an InputQueue<T> instance and (Begin-)Accept is implemented as a dequeue operation on that queue. There are two particular challenges here that make the regular Queue<T> class from the System.Collections.Generic namespace unsuitable for use in the implementation of that mechanism: Firstly, the Dequeue method there is only available as a synchronous variant and also doesn’t allow for specifying a timeout. Secondly, the queue implementation doesn’t really help much with implementing the ListenBacklog quota where not only the length of the queue is limited to some configured number of entries, but accepting further connections/channels from the underlying network is also suspended for as long as the queue is at capacity and needs to resume as soon as the pressure is relieved, i.e. a caller takes a channel out of the queue.

To show that InputQueue<T> is a very useful general purpose class even outside of the context of the WCF channel infrastructure, I’ve lifted a version of it from one of the most recent WCF channel samples, made a small number of modifications that I’ll write about later, and created a little sample around it that I’ve attached to this post.

The sample I’ll discuss here simulates parsing/reading IP addresses from a log file and then performing a reverse DNS name resolution on those addresses – something that you’d do in a web-server log analyzer or as the background task in a blog engine while preparing statistics.

Reverse DNS name resolution is quite interesting since it’s embarrassingly easy to parallelize and each resolution commonly takes a really long time (4-5 seconds) – whereby all the work is done elsewhere. The process issuing the queries is mostly sitting around idle waiting for the response. Therefore, it’s a good idea to run a number of DNS requests in parallel, but it’s a terrible idea to have any of these requests execute as a blocking call and burn a thread. Since we’re assuming that we’re reading from a log file that requires some parsing, it would also be a spectacularly bad idea to have multiple concurrent threads compete for access to that file and get in each other’s way. And since it is a file and we need to lift things up from disk, we probably shouldn’t do that ‘just in time’ as a DNS resolution step is done; there should rather be some data readily waiting for processing. InputQueue<T> is enormously helpful in such a scenario.

The key file of the sample code – the implementation of the queue itself aside – is obviously Program.cs. Here’s Main() :

static void Main(string[] args)
{
    int maxItemsInQueue = 10;
    InputQueue<IPAddress> logDataQueue = new InputQueue<IPAddress>();
    int numResolverLoops = 20;
    ManualResetEvent shutdownCompleteEvent = new ManualResetEvent(false);
    List<IPAddressResolverLoop> resolverLoops = new List<IPAddressResolverLoop>();
 
    Console.WriteLine("You can stop the program by pressing ENTER.");

We’re setting up a new InputQueue<IPAddress> here into which we’ll throw the parsed addresses from our acquisition loop that simulates reading from the log. The queue’s capacity will be limited to just 10 entries (maxItemsInQueue is the input value) and we will run 20 ‘resolver loops’, which are logical threads that process IP-to-hostname resolution steps.

    // set up the loop termination callback
    WaitCallback loopTerminationCallback = o =>
    {
        if (Interlocked.Decrement(ref numResolverLoops) == 0)
        {
            shutdownCompleteEvent.Set();
        }
    };
 
    // set up the resolver loops
    for (int loop = 0; loop < numResolverLoops; loop++)
    {
        // add resolver loop number 'loop' and set the done flag when the
        // last of them terminates
        resolverLoops.Add(
            new IPAddressResolverLoop(
                logDataQueue, loop, 
                loopTerminationCallback, null));
    }

Next we’re kicking off the resolver loops – we’ll look at these in detail a bit later. We’ve got a ManualResetEvent lock object that guards the program’s exit until all these loops have completed and we’re going to set that to signaled once the last loop completes – that’s what the loopTerminationCallback anonymous method is for.  We’re registering the method with each of the loops and as they complete the method gets called and the last call sets the event. Each loop gets a reference to the logDataQueue from where it gets its work.
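As a standalone aside, here is a minimal sketch of the count-down-and-signal pattern that the termination callback uses. The class and method names (LoopCountdownSketch, RunAndWait) are made up for illustration and are not part of the sample; the thread-pool work items merely stand in for resolver loops that terminate right away.

```csharp
using System;
using System.Threading;

static class LoopCountdownSketch
{
    // Starts 'loops' simulated workers and blocks until the last one reports
    // completion. Returns the counter value seen after the wait.
    // Assumes loops > 0; with zero workers nobody would ever set the event.
    public static int RunAndWait(int loops)
    {
        int remaining = loops;
        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            // Same shape as loopTerminationCallback in Main(): every finishing
            // worker decrements, and only the one that reaches zero signals.
            WaitCallback onLoopDone = state =>
            {
                if (Interlocked.Decrement(ref remaining) == 0)
                {
                    allDone.Set();
                }
            };

            for (int i = 0; i < loops; i++)
            {
                // stand-in for a resolver loop that terminates immediately
                ThreadPool.QueueUserWorkItem(onLoopDone);
            }

            // released by whichever worker performed the final decrement
            allDone.WaitOne();
            return remaining;
        }
    }
}
```

Calling LoopCountdownSketch.RunAndWait(5) from any Main() returns once all five callbacks have run. Note that the sketch iterates over a separate 'loops' value rather than over the counter that is being decremented. In Main() above both are the same variable, which works there because no resolver loop can terminate before the queue is shut down, and that happens only after all loops have been created.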

    // set up the acquisition loop; the loop auto-starts
    using (LogDataAcquisitionLoop acquisitionLoop =
        new LogDataAcquisitionLoop(logDataQueue, maxItemsInQueue))
    {
        // hang main thread waiting for ENTER
        Console.ReadLine();
        Console.WriteLine("*** Shutdown initiated.");
    }

Finally we’re starting the acquisition loop that gets the data from the log file. The loop gets a reference to the logDataQueue where it places the acquired items and it’s passed the maxItemsInQueue quota that governs how many items may be read ahead into the queue. Once the user presses the ENTER key, the acquisition loop object is disposed by way of exiting the using scope, which stops the loop.

    // shut down the queue; the resolvers will auto-close
    // as the queue drains. We don't need to close them here.
    logDataQueue.Shutdown();
 
    // wait for all work to complete
    shutdownCompleteEvent.WaitOne();
}

Lastly, the queue is shut down (fittingly, by calling Shutdown). Shutdown closes the queue (all further enqueue operations are absorbed) and causes all pending readers for which no more entries are available on the queue to unblock immediately and return null. The resolver loops will complete their respective jobs and will terminate whenever they dequeue null from the queue. As they terminate, they call the registered termination callback (loopTerminationCallback from above) and that will eventually cause shutdownCompleteEvent to become signaled as discussed above.
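To make the shutdown behavior just described more tangible, here is a deliberately tiny sketch of it. MiniShutdownQueue<T> and ShutdownSketch are hypothetical names of mine; this is not the InputQueue<T> source, it uses plain Action<T> callbacks instead of the Begin/End pattern, and it leaves out timeouts entirely. It only models the three rules from the paragraph above: later enqueues are absorbed, items already queued can still be drained, and readers with nothing left to read complete with null.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature for illustration only; not the InputQueue<T> source.
class MiniShutdownQueue<T> where T : class
{
    readonly object sync = new object();
    readonly Queue<T> items = new Queue<T>();
    readonly Queue<Action<T>> readers = new Queue<Action<T>>();
    bool closed;

    public void Enqueue(T item)
    {
        Action<T> reader = null;
        lock (sync)
        {
            if (closed) return;                       // absorbed after shutdown
            if (readers.Count > 0) reader = readers.Dequeue();
            else items.Enqueue(item);
        }
        if (reader != null) reader(item);             // serve a parked reader
    }

    public void BeginDequeue(Action<T> reader)
    {
        T item = null;
        lock (sync)
        {
            if (items.Count > 0) item = items.Dequeue();
            else if (!closed) { readers.Enqueue(reader); return; }
            // closed and empty: fall through and complete with null
        }
        reader(item);
    }

    public void Shutdown()
    {
        Action<T>[] parked;
        lock (sync)
        {
            closed = true;
            parked = readers.ToArray();
            readers.Clear();
        }
        foreach (Action<T> reader in parked) reader(null);   // unblock with null
    }
}

static class ShutdownSketch
{
    // Records what each reader observes, in order.
    public static List<string> Run()
    {
        var seen = new List<string>();
        Action<string> record = s => seen.Add(s ?? "null");

        var busy = new MiniShutdownQueue<string>();
        busy.Enqueue("a");
        busy.Shutdown();
        busy.Enqueue("late");          // dropped, the queue is closed
        busy.BeginDequeue(record);     // still drains "a"
        busy.BeginDequeue(record);     // nothing left: null

        var idle = new MiniShutdownQueue<string>();
        idle.BeginDequeue(record);     // parks, no item available
        idle.Shutdown();               // parked reader is completed with null
        return seen;
    }
}
```

ShutdownSketch.Run() yields the sequence "a", "null", "null": the item that was queued before the shutdown is still handed out, and every reader after that sees null, which is the signal the resolver loops use to terminate.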

The log-reader simulator isn’t particularly interesting for this sample, even though one of the goodies is that the simulation executes on an I/O completion port instead of a managed thread-pool thread – that’s another blog post. The two methods of interest are Begin/EndGetLogData – all that’s of interest here is that EndGetLogData returns an IPAddress that’s assumed to be parsed out of a log.

class IPAddressLogReaderSimulator
{
    public IAsyncResult BeginGetLogData(AsyncCallback callback, object data);
    public IPAddress EndGetLogData(IAsyncResult result);
}

The simulator is used internally  by the LogDataAcquisitionLoop class – which we’ll drill into because it implements the throttling mechanism on the queue.

class LogDataAcquisitionLoop : IDisposable
{
    readonly IPAddressLogReaderSimulator ipAddressLogReaderSimulator;
    readonly InputQueue<IPAddress> logDataQueue;
    int maxItemsInQueue;
    int readingSuspended;
    bool shuttingDown;
 
    public LogDataAcquisitionLoop(InputQueue<IPAddress> logDataQueue, int maxItemsInQueue)
    {
        this.logDataQueue = logDataQueue;
        this.maxItemsInQueue = maxItemsInQueue;
        this.shuttingDown = false;
        this.ipAddressLogReaderSimulator = new IPAddressLogReaderSimulator();
        this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
    }

The constructor sets up the shared state of the loop and kicks off the first read operation on the simulator. Once BeginGetLogData has acquired the first IPAddress (which will happen very quickly), the LogDataAcquired callback method will be invoked.

    void LogDataAcquired(IAsyncResult result)
    {
        IPAddress address = this.ipAddressLogReaderSimulator.EndGetLogData(result);
 
        Console.WriteLine("-- added {0}", address);
        this.logDataQueue.EnqueueAndDispatch(address, this.LogDataItemDequeued);
        if (!this.shuttingDown && this.logDataQueue.PendingCount < this.maxItemsInQueue)
        {
            this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
        }
        else
        {
            // the queue will be at the defined capacity, thus abandon 
            // the read loop - it'll be picked up by LogDataItemDequeued
            // as the queue pressure eases
            Interlocked.Exchange(ref this.readingSuspended, 1);
            Console.WriteLine("-- suspended reads");
        }
    }

The callback method gets the IPAddress and puts it into the queue – using the InputQueue<T>.EnqueueAndDispatch(T, Action) method. There are two aspects that are quite special about that method when compared to the regular Queue<T>.Enqueue(T) method. First, it takes a callback as the second argument alongside the item to be enqueued; second, the method name isn’t just Enqueue, it also says Dispatch.

When EnqueueAndDispatch() is called, the item and the callback get put into an internal item queue – that’s the ‘enqueue’ part. As we will see in context a bit later in this post, the ‘dequeue’ operation on the queue is the BeginDequeue/EndDequeue asynchronous method call pair. There can be any number of concurrent BeginDequeue requests pending on the queue. ‘Pending’ means that the calls – rather their async callbacks and async state – are registered in another queue internal to InputQueue<T> that preserves the call order. Thus, BeginDequeue always only puts the async callback and async state into that queue and returns afterwards. There is no thread spun or hung. That’s all it does. 

As things go, the best opportunity to service a pending dequeue operation on a queue is when an item is being enqueued. Consequently, EnqueueAndDispatch() will first put the item into the internal queue and will then look whether there are registered waiters and/or readers – waiters are registered by ‘(Begin-)WaitForItem’, readers are registered by ‘(Begin-)Dequeue’. Since it’s known that there is a new item in the queue now, the operation will iterate over all waiters and complete them – and does so by invoking their async callbacks, effectively lending the enqueue operation’s thread to the waiters. If there’s at least one pending reader, it’ll then pop an item from the head of the internal item queue and call the reader’s async callback, lending the enqueue operation’s thread to the processing of the dequeue operation. If that just made your head spin – yes, the item may have been dequeued and processed by the time EnqueueAndDispatch returns.
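The thread-lending idea is easier to see in a stripped-down form. The following sketch is my own simplification under loud assumptions: MiniDispatchQueue<T> and DispatchSketch are hypothetical names, readers are plain Action<T> delegates rather than IAsyncResult-based calls, and waiters, timeouts, the dequeued callback and shutdown are all omitted. It is not how InputQueue<T> is actually written; it only demonstrates that a parked reader gets completed on the thread that enqueues.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical, heavily simplified stand-in for the dispatch idea; it is not
// the InputQueue<T> source.
class MiniDispatchQueue<T>
{
    readonly object sync = new object();
    readonly Queue<T> items = new Queue<T>();
    readonly Queue<Action<T>> readers = new Queue<Action<T>>();

    // Parks the reader callback, or serves it right away if an item is waiting.
    public void BeginDequeue(Action<T> reader)
    {
        T item;
        lock (sync)
        {
            if (items.Count == 0)
            {
                readers.Enqueue(reader);   // nothing to hand out: just park
                return;
            }
            item = items.Dequeue();
        }
        reader(item);
    }

    // Stores the item, then hands the oldest item to the oldest parked reader
    // on the calling thread.
    public void EnqueueAndDispatch(T item)
    {
        Action<T> reader;
        T next;
        lock (sync)
        {
            items.Enqueue(item);
            if (readers.Count == 0)
            {
                return;                    // no reader waiting: item stays queued
            }
            reader = readers.Dequeue();
            next = items.Dequeue();
        }
        reader(next);                      // callback runs outside the lock
    }
}

static class DispatchSketch
{
    // Returns true if the parked reader ran on the thread that enqueued.
    public static bool ReaderRunsOnEnqueuerThread()
    {
        var queue = new MiniDispatchQueue<string>();
        int readerThreadId = -1;
        queue.BeginDequeue(item => readerThreadId = Thread.CurrentThread.ManagedThreadId);
        queue.EnqueueAndDispatch("10.0.0.1");
        return readerThreadId == Thread.CurrentThread.ManagedThreadId;
    }
}
```

DispatchSketch.ReaderRunsOnEnqueuerThread() returns true: BeginDequeue only parked the callback, and the later EnqueueAndDispatch call ran it inline. Invoking the callback outside the lock is a design choice in this sketch so that a reader which immediately calls BeginDequeue again does not re-enter while the lock is held.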

There is an overload for EnqueueAndDispatch() that takes an extra boolean parameter that lets you cause the dispatch operation to happen on a different thread. There is also an EnqueueWithoutDispatch() method that only enqueues and doesn’t dispatch, and a standalone Dispatch() method.

The callback supplied to EnqueueAndDispatch(), here the LogDataItemDequeued method, is an Action delegate. The queue will call this callback as the item is being dequeued and, more precisely, when the item has been removed from the internal item queue, but just before it is returned to the caller. That turns out to be quite handy. If you take another look at the LogDataAcquired method you’ll notice that we’ve got two alternate code paths after EnqueueAndDispatch(). The first branch is taken when the queue has not reached capacity and the loop is not shutting down. When that’s so, we’re scheduling getting the next log item – otherwise we don’t. Instead, we set the readingSuspended flag and quit – effectively terminating and abandoning the loop. So how does that get restarted when the queue is no longer at capacity? The LogDataItemDequeued callback!

    void LogDataItemDequeued()
    {
        // called whenever an item is dequeued. First we check 
        // whether the queue is no longer full after this 
        // operation and then we check whether we need to resume
        // the read loop.
        if (!this.shuttingDown &&
            this.logDataQueue.PendingCount < this.maxItemsInQueue &&
            Interlocked.CompareExchange(ref this.readingSuspended, 0, 1) == 1)
        {
            Console.WriteLine("-- resuming reads");
            this.ipAddressLogReaderSimulator.BeginGetLogData(this.LogDataAcquired, null);
        }
    }

The callback gets called for each item that gets dequeued, which means that we’ll get an opportunity to restart the loop when it’s been stalled because the queue reached capacity. So we’re checking here whether the loop isn’t shutting down and whether the queue is below capacity, and if that’s so and the readingSuspended flag is set, we’re restarting the read loop. And that’s how the throttle works.
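The part of that check that matters under concurrency is the Interlocked.CompareExchange call: many dequeue callbacks may run at the same time, but only one of them must restart the read loop. Here is a small sketch of just that gate; ResumeGateSketch and CountResumes are names I made up, and Parallel.For merely stands in for concurrently arriving dequeue callbacks.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class ResumeGateSketch
{
    // Sets the suspended flag once, then lets 'callers' concurrent callbacks
    // race to clear it. Returns how many of them won the right to resume.
    public static int CountResumes(int callers)
    {
        int readingSuspended = 1;   // 1 = read loop parked, 0 = running
        int resumes = 0;

        Parallel.For(0, callers, i =>
        {
            // Atomically: if the flag is 1, set it to 0; the old value is returned.
            // Only one caller can observe the 1 -> 0 transition.
            if (Interlocked.CompareExchange(ref readingSuspended, 0, 1) == 1)
            {
                Interlocked.Increment(ref resumes);
            }
        });

        return resumes;
    }
}
```

ResumeGateSketch.CountResumes(8) returns 1 no matter how the callers interleave, which is why LogDataItemDequeued can safely issue BeginGetLogData inside that branch without starting two read loops.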

So now we’ve got the data from the log in the queue and we’re throttling nicely so that we don’t pull too much data into memory. How about taking a look at the DNS resolver loops that process the data?

class IPAddressResolverLoop : IDisposable
{
    readonly InputQueue<IPAddress> logDataQueue;
    readonly int loop;
    readonly WaitCallback loopCompleted;
    readonly object state;
    bool shutdown;
 
    public IPAddressResolverLoop(InputQueue<IPAddress> logDataQueue, int loop, WaitCallback loopCompleted, object state)
    {
        this.logDataQueue = logDataQueue;
        this.loop = loop;
        this.loopCompleted = loopCompleted;
        this.state = state;
        this.logDataQueue.BeginDequeue(TimeSpan.MaxValue, this.IPAddressDequeued, null);
    }

This loop is also implemented as a class and the fields hold shared state that’s initialized in the constructor. This loop also auto-starts and does so by calling BeginDequeue on the input queue. As stated above, BeginDequeue commonly just parks the callback and returns.

    void IPAddressDequeued(IAsyncResult ar)
    {
        IPAddress address = this.logDataQueue.EndDequeue(ar);
        if (!this.shutdown && address != null)
        {
            Console.WriteLine("-- took {0}", address);
            Dns.BeginGetHostEntry(address, this.IPAddressResolved, new object[] { Stopwatch.StartNew(), address });
        }
        else
        {
            this.loopCompleted(this.state);
        }
    }

As an IPAddress becomes available on the queue, the callback is invoked, quite likely on a thread lent by EnqueueAndDispatch() – which, if you trace things back, is the thread the log file generator uses to call back for completion of the BeginGetLogData method. If we get an address and the value isn’t null, we’ll then proceed to schedule the DNS lookup via Dns.BeginGetHostEntry. Otherwise we’ll terminate the loop and call the loopCompleted callback. In Main() that’s the anonymous method that counts down the loop counter and signals the event when it falls to zero.

    void IPAddressResolved(IAsyncResult ar)
    {
        var args = ((object[])ar.AsyncState);
        var stopwatch = (Stopwatch)args[0];
        var address = (IPAddress)args[1];
 
        stopwatch.Stop();
        double msecs = stopwatch.ElapsedMilliseconds;
 
        try
        {
            IPHostEntry entry = Dns.EndGetHostEntry(ar);
            Console.WriteLine("{0}: {1} {2}ms", this.loop, entry.HostName, msecs);
        }
        catch (SocketException)
        {
            // couldn't resolve. print the literal address
            Console.WriteLine("{0}: {1} {2}ms", this.loop, address, msecs);
        }
        // done with this entry, get the next
        this.logDataQueue.BeginDequeue(TimeSpan.MaxValue, this.IPAddressDequeued, null);
    }

The IPAddressResolved method just deals with the mechanics of printing out the result of the lookup and then schedules another BeginDequeue call to start the next iteration.

Summary: The enabler for and the core piece of the implementation of this scenario is InputQueue<T> – the dequeue callback makes the throttling straightforward to implement, and the dispatch logic provides an efficient way to use threads in applications that rely on asynchronous programming patterns, especially in I/O-driven situations as illustrated here.

And last but not least – here’s teh codez; project file is for VS2010, throw the files into a new console app for VS2008 and mark the project to allow unsafe code (for the I/O completion thread pool code).

UsingInputQueue.zip (13.85 KB) 

or if you’d rather have a version of InputQueue that is using the regular thread pool, download the WCF samples and look for InputQueue.cs.

[The sample code posted here is subject to the Windows SDK sample code license]
