ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions

Stephen Toub - MSFT

(The full set of ParallelExtensionsExtras Tour posts is available here.)  

BlockingCollection<T> encapsulates the core synchronization and coordination necessary to enable classic producer/consumer patterns. 

ParallelExtensionsExtras provides the BlockingCollectionExtensions.cs file, which contains several extension methods for BlockingCollection<T> to make several common solutions easier.

GetConsumingPartitioner

It’s common to want to use BlockingCollection<T> in conjunction with either Parallel.ForEach or PLINQ in streaming scenarios: while a producer pumps events into the BlockingCollection<T>, ForEach or PLINQ are used to process those events as quickly as possible.  The connection between BlockingCollection<T> and the consuming parallelization construct can be easily achieved using BlockingCollection<T>’s GetConsumingEnumerable method, which returns an enumerable that continually takes from the collection; this enumerable can then be passed as input to ForEach or PLINQ, e.g.

Parallel.ForEach(_bc.GetConsumingEnumerable(), item =>

{

    … // process item here

});

 

However, there are some potentially important flaws with this simple approach.  First, BlockingCollection<T>’s GetConsumingEnumerable implementation is using BlockingCollection<T>’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.  As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligable performance hit.  Second, the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it’ll take the lock, grab a group of elements (a chunk), and then release the lock.  While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.

We can solve both of these problems by writing a custom partitioner for BlockingCollection<T>.  Our goal for this partitioner is to address both of the drawbacks mentioned previously, and as it turns out doing so is easy.  A Partitioner<T> represents each partition with an IEnumerator<T>, and the object it uses to generate these partitions is an IEnumerable<T>.  We already have an appropriate generator of such an IEnumerable<T> in the form of GetConsumingEnumerable<T>, so to support a dynamic number of partitions in the partitioner, all we need to do is return the result of GetConsumingEnumerable<T> from the GetDynamicPartitions override.  Supporting a static number of partitions then just requires delegating to the dynamic support to retrieve a fixed number of positions.  Here is the implementation:

public static Partitioner<T> GetConsumingPartitioner<T>(

    this BlockingCollection<T> collection)

{

    return new BlockingCollectionPartitioner<T>(collection);

}

 

private class BlockingCollectionPartitioner<T> : Partitioner<T>

{

    private BlockingCollection<T> _collection;

 

    internal BlockingCollectionPartitioner(

        BlockingCollection<T> collection)

    {

        if (collection == null)

            throw new ArgumentNullException(“collection”);

        _collection = collection;

    }

 

    public override bool SupportsDynamicPartitions {

        get { return true; }

    }

 

    public override IList<IEnumerator<T>> GetPartitions(

        int partitionCount)

    {

        if (partitionCount < 1)

            throw new ArgumentOutOfRangeException(“partitionCount”);

        var dynamicPartitioner = GetDynamicPartitions();

        return Enumerable.Range(0, partitionCount).Select(_ =>

            dynamicPartitioner.GetEnumerator()).ToArray();

    }

 

    public override IEnumerable<T> GetDynamicPartitions()

    {

        return _collection.GetConsumingEnumerable();

    }

}

 

With this in place, we can implement our streaming with Parallel.ForEach just as we did previously, except by using GetConsumingPartitioner instead of GetConsumingEnumerable.  This relies on the fact that both Parallel.ForEach and PLINQ’s AsParallel have overloads that work with partitioners:

Parallel.ForEach(_bc.GetConsumingPartitioner(), item =>

{

    … // process item here

});

 

AddFromEnumerable and AddFromObservable

In producer/consumer scenarios, it’s sometimes the case that the producer is presenting the data in the form of an IEnumerable<T> already filled with the relevant data.  To get that data into a BlockingCollection<T>, we can utilize a simple utility method to transfer the contents into the BlockingCollection<T>:

public static void AddFromEnumerable<T>(

    this BlockingCollection<T> target, IEnumerable<T> source,

    bool completeAddingWhenDone)

{

    try { foreach (var item in source) target.Add(item); }

    finally { if (completeAddingWhenDone) target.CompleteAdding(); }

}

 

Similarly, the data may be arriving asynchronously in the form of an IObservable<T>.  We can use a simple DelegateBasedObserver class which just implements the IObserver<T> interface to marshal calls to OnNext, OnError, and OnCompleted to the respective delegates passed into the observer instance.  Then, when someone calls AddFromObservable, we subscribe a delegate that takes any data from the observable and adds it to the BlockingCollection<T>:

public static IDisposable AddFromObservable<T>(

    this BlockingCollection<T> target, IObservable<T> source,

    bool completeAddingWhenDone)

{

    return source.Subscribe(new DelegateBasedObserver<T>

    (

        onNext: item => target.Add(item),

        onError: error => {

            if (completeAddingWhenDone) target.CompleteAdding(); },

        onCompleted: () => {

            if (completeAddingWhenDone) target.CompleteAdding(); }

    ));

}

 

ToProducerConsumerCollection

The IProducerConsumerCollection<T> interface was introduced in the .NET Framework 4 to represent concurrent collections that support add/take operations and that are meant to be used in producer/consumer scenarios.  .NET 4 ships three such implementations in the form of ConcurrentQueue<T>, ConcurrentStack<T>, and ConcurrentBag<T>.  BlockingCollection<T> is also thread-safe and supports add/take operations, but it noticeably does not implement IProducerConsumerCollection.  This is a decision we struggled with, and in the end we decided not to implement the interface because there didn’t seem to be one right answer on how adds and takes should behave.  Should they be non-blocking or blocking? If blocking, should they incorporate timeouts?  Etc. Instead, we verified that it’s easy to create an IProducerConsumerCollection<T> wrapper for BlockingCollection<T> so that developers can specify whatever semantics they desire, and we added such an implementation into BlockingCollectionExtensions.cs in ParallelExtensionExtras.

The provided solution includes several overloads of a ToProducerConsumerCollection; here is the largest overload:

public static IProducerConsumerCollection<T> ToProducerConsumerCollection<T>(

    this BlockingCollection<T> collection, int millisecondsTimeout,

    CancellationToken cancellationToken)

{

    return new ProducerConsumerWrapper<T>(

        collection, millisecondsTimeout, cancellationToken);

}

 

In addition to the target BlockingCollection<T>, both a timeout and a CancellationToken are also provided: these serves to enable blocking operations with timeouts as well as with cancellation semantics, and these two values are used when calling BlockingCollection<T>’s TryAdd and TryTake methods.

The rest of the implementation is straightforward:

internal sealed class ProducerConsumerWrapper<T> :

    IProducerConsumerCollection<T>

{

    private readonly BlockingCollection<T> _collection;

    private readonly int _millisecondsTimeout;

    private readonly CancellationToken _cancellationToken;

 

    public ProducerConsumerWrapper(

        BlockingCollection<T> collection, int millisecondsTimeout,

        CancellationToken cancellationToken)

    {

        if (collection == null)

            throw new ArgumentNullException(“bc”);

        if (millisecondsTimeout < -1)

            throw new ArgumentOutOfRangeException(

                “millisecondsTimeout”);

        _collection = collection;

        _millisecondsTimeout = millisecondsTimeout;

        _cancellationToken = cancellationToken;

    }

 

    public bool TryAdd(T item)

    {

        return _collection.TryAdd(

            item, _millisecondsTimeout,

            _cancellationToken);

    }

 

    public bool TryTake(out T item)

    {

        return _collection.TryTake(

            out item, _millisecondsTimeout,

            _cancellationToken);

    }

 

    public void CopyTo(T[] array, int index)

    {

        _collection.CopyTo(array, index);

    }

 

    public T[] ToArray()

    {

        return _collection.ToArray();

    }

 

    … // the rest of ICollection’s implementation

}

 

0 comments

Discussion is closed.

Feedback usabilla icon