Blocking queues

In many concurrent systems, one thread performs some work, the result of which another thread consumes. This producer/consumer pattern is frequently implemented on top of blocking queues.

If you examine the behavior of System.Collections.Queue and System.Collections.Generic.Queue<T>, you'll find that they both throw InvalidOperationException from the Dequeue method (which attempts to remove and return an item from the queue) if there are no items in the queue (and, thus, there's nothing to be removed). For a producer/consumer scenario, that behavior is typically suboptimal. A producer creating results can store those results into a queue, and a consumer can retrieve those results from the queue in order to process them. But in a concurrent application, where the producer and consumer execute on separate threads (and where there might be multiple producers and consumers executing simultaneously), synchronization mechanisms are necessary on top of the queue. More specifically, a typical design calls for a consumer to block until an item is available for it to consume.
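For example, a quick illustration of the non-blocking behavior (the values here are arbitrary):

 Queue<int> queue = new Queue<int>();
queue.Enqueue(42);
Console.WriteLine(queue.Dequeue()); // prints 42
Console.WriteLine(queue.Dequeue()); // throws InvalidOperationException: the queue is empty

Rather than attempting to reimplement such blocking functionality every time we need it, we can wrap it up into a new type: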

 class BlockingQueue<T> : IDisposable
{
    private Queue<T> _queue = new Queue<T>();
    private Semaphore _semaphore = new Semaphore(0, int.MaxValue);

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue) _queue.Enqueue(data);
        _semaphore.Release();
    }

    public T Dequeue()
    {
        _semaphore.WaitOne();
        lock (_queue) return _queue.Dequeue();
    }

    void IDisposable.Dispose()
    {
        if (_semaphore != null)
        {
            _semaphore.Close();
            _semaphore = null;
        }
    }
}

BlockingQueue<T> contains two private fields and exposes two public methods (plus an implementation of IDisposable to allow for timely cleanup). The public methods mimic those of Queue<T>: Enqueue, accepting the data of type T to be enqueued, and Dequeue, returning an item of type T. Enqueue has the same semantics as Queue<T>.Enqueue, except that BlockingQueue<T>.Enqueue is thread-safe (Queue<T>.Enqueue is not). Dequeue, however, has very different semantics from both Queue.Dequeue and Queue<T>.Dequeue. In addition to being thread-safe (which the other Dequeue methods are not), it also blocks until it knows for sure it'll be able to dequeue an item from the queue.

The first of the two private fields is fairly self-explanatory. The _queue member of type Queue<T> is the actual queue that's storing the data. Any calls to BlockingQueue<T>.Enqueue or BlockingQueue<T>.Dequeue delegate to the corresponding methods on _queue. However, as you can see, a few additional things are taking place. First, a monitor is used to synchronize access to the queue. But more importantly, the second field, a counting semaphore, is used to keep track of the number of items available to be removed from the queue. The semaphore starts with a count of 0, which means that any calls to Dequeue will block. Every call to Enqueue increments the semaphore's count, allowing one more call to Dequeue to proceed without blocking.
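As a usage sketch, one thread can produce results while another consumes them through the queue (the item values and thread setup here are purely illustrative):

 BlockingQueue<string> queue = new BlockingQueue<string>();

Thread consumer = new Thread(delegate()
{
    while (true)
    {
        string item = queue.Dequeue(); // blocks until a producer supplies an item
        Console.WriteLine("Consumed: " + item);
    }
});
consumer.IsBackground = true; // don't let the infinite consumer loop keep the process alive
consumer.Start();

// Producer: each Enqueue releases one waiting Dequeue
for (int i = 0; i < 10; i++) queue.Enqueue("result " + i);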

This, of course, isn't the only way to build a BlockingQueue<T>, nor do we absolutely require the System.Threading.Semaphore class. For example, you can implement a semaphore in purely managed code with a simple class like the following:

 public class ManagedSemaphore
{
    private int _count;
    private object _lock;

    public ManagedSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException("initialCount", "Initial count must be >= 0.");
        _count = initialCount;
        _lock = new object();
    }

    public void WaitOne()
    {
        lock (_lock)
        {
            while (_count <= 0) Monitor.Wait(_lock);
            _count--;
        }
    }

    public void Release()
    {
        lock (_lock)
        {
            _count++;
            Monitor.Pulse(_lock);
        }
    }
}

Like System.Threading.Semaphore, this ManagedSemaphore class exposes a WaitOne and a Release method. Internally, it has two member fields: an integer count and an object used for a monitor lock. The Release method acquires the lock and, while holding it, increments the internal count and then signals a single thread that might be waiting on the same lock that it should wake up and attempt to decrement the count (if no thread is waiting, the next time one does call WaitOne, it'll see a positive count). The WaitOne method similarly takes the lock. While holding the lock, it checks the internal count. If the count is positive, it simply decrements the count and returns. However, if the count is 0, it uses the Monitor.Wait method to temporarily release the lock and wait to be signaled by another thread (which will happen when another thread calls Release). Note that WaitOne rechecks the count in a while loop rather than an if: by the time a woken thread reacquires the lock, another caller may already have claimed the count, in which case the thread needs to go back to waiting.
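A quick sketch of the class in action (the timing and messages are purely illustrative):

 ManagedSemaphore sem = new ManagedSemaphore(0);

Thread waiter = new Thread(delegate()
{
    sem.WaitOne(); // blocks: the count starts at 0
    Console.WriteLine("Signaled");
});
waiter.Start();

Thread.Sleep(500); // simulate some work before signaling
sem.Release();     // increments the count and pulses the waiter
waiter.Join();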

We could substitute this ManagedSemaphore for Semaphore in our BlockingQueue<T>. However, given that this is all managed code, we could also incorporate the semaphore's implementation directly into BlockingQueue<T>:

 class BlockingQueue<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }
}

While by no means an official benchmark, in a few basic tests this version does show a performance improvement over the original BlockingQueue<T> implemented using System.Threading.Semaphore, clocking in at around 3x the speed.
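The kind of rough harness behind such a comparison might look like the following (a sketch only; timings vary by machine, and each implementation is measured by swapping in the corresponding queue type; assumes using System.Diagnostics and System.Threading):

 const int Iterations = 1000000;
BlockingQueue<int> queue = new BlockingQueue<int>();

// Consumer: dequeues every item the producer will enqueue
Thread consumer = new Thread(delegate()
{
    for (int i = 0; i < Iterations; i++) queue.Dequeue();
});

Stopwatch sw = Stopwatch.StartNew();
consumer.Start();
for (int i = 0; i < Iterations; i++) queue.Enqueue(i);
consumer.Join();
sw.Stop();
Console.WriteLine(sw.ElapsedMilliseconds + "ms for " + Iterations + " items");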

We can further extend BlockingQueue<T> to implement IEnumerable<T>, making it easy to consume all of the items "in" the queue from a foreach loop:

 class BlockingQueue<T> : IEnumerable<T>
{
    private int _count = 0;
    private Queue<T> _queue = new Queue<T>();

    public T Dequeue()
    {
        lock (_queue)
        {
            while (_count <= 0) Monitor.Wait(_queue);
            _count--;
            return _queue.Dequeue();
        }
    }

    public void Enqueue(T data)
    {
        if (data == null) throw new ArgumentNullException("data");
        lock (_queue)
        {
            _queue.Enqueue(data);
            _count++;
            Monitor.Pulse(_queue);
        }
    }

    IEnumerator<T> IEnumerable<T>.GetEnumerator()
    {
        while (true) yield return Dequeue();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return ((IEnumerable<T>)this).GetEnumerator();
    }
}

This allows code like the following:

 BlockingQueue<int> _queue = new BlockingQueue<int>();
...
foreach (int item in _queue)
{
    ...
}

Note that, as written, any code after the foreach loop won't actually execute, since the foreach is, for all intents and purposes, an infinite loop. The enumerator (as generated by the C# compiler) will block in _queue.Dequeue until another thread enqueues an item into _queue. If you want to follow a pattern like this but don't actually want an infinite loop, you can use a break statement to exit when some condition is met. For example, if you only wanted the first five items, you could modify the loop as follows:

 int count = 0;
foreach (int item in _queue)
{
    ...
    if (++count == 5) break;
}