Merge sequences in parallel

In practice you may find yourself in a situation when you have several sequences of data that you want to drain in parallel and merge the results into a single sequence. This is what Merge combinator for sequences does. Reactive Extensions had it for enumerable and observable sequences however at some point it was decided to no longer support it for enumerable sequences in Reactive Extensions as it may be expressed through the same combinator for observable sequences. 

I find it quite interesting to implement Merge combinator for enumerable sequences but to make it more interesting let’s change behavior a little bit. The original behavior of the EnumerableEx.Merge was to drain source sequences as fast as workers can and buffer results in a resulting queue until it is consumed by merged sequence enumerator. It can be implemented (maybe this is not the most efficient way but still) using

  • BlockingCollection<T> to synchronize producers that drain original sequences and consumer that merges elements from all sequences into resulting sequence
  • CancellationTokenSource to notify producers that early termination is requested

Instead let’s change behavior to allow producer proceed with getting next element from source sequence only once previously “produced” element is consumed (which basically means merged sequence enumerator reached it). This is sort of two way synchronization between producer and consumer where consumer cannot proceed unless there are ready elements and producer cannot proceed unless previous element is consumed. It is kind of similar to blocking collection. However in this case there is only one consumer and consumers aren’t blocked because the queue is full but rather until previously enqueued element is dequeued.Let’s code the thing!

 class Consumer<T>
{
    private readonly IEnumerable<IEnumerable<T>> _sources;

    private readonly object _sync = new object();

    private readonly Queue<Producer<T>> _globalQueue = new Queue<Producer<T>>();
    private readonly Queue<Producer<T>> _localQueue = new Queue<Producer<T>>();

    // No need to mark these fields as volatile as 
    // only consumer thread updates and reads them
    private bool _done;
    private int _count;

    private readonly IList<Exception> _exceptions = new List<Exception>();

    public Consumer(IEnumerable<IEnumerable<T>> sources)
    {
        _sources = sources;
    }

    // Merges sources in parallel
    public IEnumerable<T> Merge()
    {
        Start();
        while (true)
        {
            Wait();
            while (_localQueue.Count > 0)
            {
                // Use local queue to yield values
                var producer = _localQueue.Dequeue();
                // Get the actual value out of ready producer
                // while simultaneously allowing it to proceed 
                // and observe early termination request or get 
                // next value
                var next = producer.GetNext(!_done);
                // Using Notificaiton<T> from Rx to simplify
                // sequence processing
                if (next.Kind != NotificationKind.OnNext)
                {
                    _count--;
                    if (next.Kind == NotificationKind.OnError)
                    {
                        // Observed exception leads to early 
                        // termination however merge will allow 
                        // already running (not waiting) 
                        // producers to finish (potentially 
                        // bringing more exceptions to be 
                        // observed) while requesting waiters 
                        // to terminate
                        _done = true;
                        // Store observed exception to be further 
                        // thrown as part of the aggregate 
                        // exception
                        _exceptions.Add(next.Exception);
                    }
                }
                else
                {
                    yield return next.Value;
                }
            }
            // Loop until all producers finished
            if (_count == 0)
            {
                // Either normally
                if (_exceptions.Count == 0)
                    yield break;
                // Or exceptions were observed
                throw new AggregateException(_exceptions);
            }
        }
    }

    // Notifies consumer of ready elements
    public void Enqueue(Producer<T> producer)
    {
        // Notify consumer of a ready producer
        lock (_sync)
        {
            // Consumer will either observe non-empty 
            // global queue (if it is not waiting already) 
            // or pulse (if it is waiting)
            _globalQueue.Enqueue(producer);
            Monitor.Pulse(_sync);
        }
    }

    // Waits for ready elements
    private void Wait()
    {
        lock (_sync)
        {
            // As the only consumer is draining the global queue
            // once an empty queue is observed and consumer is 
            // notified the queue will be non-empty
            if (_globalQueue.Count == 0)
                Monitor.Wait(_sync);
            // Copy whatever available to local queue to further 
            // drain without bothering taking a lock
            while (_globalQueue.Count > 0)
                _localQueue.Enqueue(_globalQueue.Dequeue());
        }
    }

    // Starts producers
    private void Start()
    {
        try
        {
            foreach (var source in _sources)
            {
                var producer = new Producer<T>(this, source.GetEnumerator());
                Task.Factory.StartNew(producer.Enumerate);
                _count++;
            }
        }
        catch (Exception ex)
        {
            // If none of producers are started successfully 
            // just rethrow the exception
            if (_count == 0)
                throw;
            // Some producers started notify them of early 
            // termination
            _done = true;
            // Store observed exception to be further thrown as 
            // part of the aggregate exception
            _exceptions.Add(ex);
        }
    }
}

class Producer<T>
{
    private readonly object _sync = new object();
    private readonly Consumer<T> _consumer;

    private IEnumerator<T> _enum;

    private volatile Notification<T> _next;
    private volatile bool _cont = true;
    private volatile bool _awake;

    public Producer(Consumer<T> consumer, IEnumerator<T> enumerator)
    {
        _consumer = consumer;
        _enum = enumerator;
    }

    // Drains source sequence
    public void Enumerate()
    {
        try
        {
            // Loop always observes non-null value as 
            // it is initially non-null and everytime consumer 
            // awakes producer it is set non-null value
            while (_cont && _enum.MoveNext())
            {
                // Set continuation flag to null and wait to be 
                // awoken by the consumer
                _awake = false;
                // Notify consumer of a ready element
                _next = new Notification<T>.OnNext(_enum.Current);
                _consumer.Enqueue(this);

                lock (_sync)
                {
                    // If consumer was pretty quick and producer 
                    // missed the pulse it will observe the awake 
                    // flag and thus won't wait
                    if (!_awake)
                        Monitor.Wait(_sync);
                }
            }
            _next = new Notification<T>.OnCompleted();
        }
        catch (Exception ex)
        {
            _next = new Notification<T>.OnError(ex);
        }
        finally
        {
            _enum.Dispose();
            _enum = null;
        }
        // Notify consumer that the producer completed 
        _consumer.Enqueue(this);
    }

    // Awakes/notifies producer that it can proceed
    public Notification<T> GetNext(bool cont)
    {
        // Store ready element in local variable 
        // because once producer is unleashed below 
        // it may override ready yet not returned element
        var next = _next;

        lock (_sync)
        {
            // Set awake flag in case producer miss the pulse 
            _awake = true;
            _cont = cont;
            Monitor.Pulse(_sync);
        }

        return next;
    }
}

With Producer<T> and Consumer<T> in place Merge combinator can be implemented as:

 static class EnumerableEx
{
    public static IEnumerable<T> Merge<T>(IEnumerable<IEnumerable<T>> sources)
    {
        Contract.Requires(sources != null);
        return new Consumer<T>(sources).Merge();
    }
}

At any given moment at most n elements will be in a global queue where n is the number of source sequences.

Great exercise!