(The full set of ParallelExtensionsExtras Tour posts is available here.)
Producer/consumer is a fundamental pattern employed in many parallel applications. With producer/consumer, one or more producer threads generate data that is consumed by one or more consumer threads. These consumers can themselves also be producers of further data, typically based on the data they were consuming, which is then consumed again by one or more consumers. And so on. This forms a pipeline, where each level of consumers represents another “stage” in the pipeline, like workers on an assembly line.
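To make the pattern concrete, here is a minimal sketch of a single producer feeding a single consumer through a BlockingCollection<T>. The class name ProducerConsumerSketch, the Run method, and the squaring step are illustrative assumptions, not part of ParallelExtensionsExtras.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration; none of these names come from ParallelExtensionsExtras.
public static class ProducerConsumerSketch
{
    // Runs one producer and one consumer and returns what the consumer computed.
    public static List<int> Run(int itemCount)
    {
        // Bounded so that a fast producer cannot run arbitrarily far ahead.
        var channel = new BlockingCollection<int>(boundedCapacity: 4);
        var results = new List<int>();

        // Producer: generates data, then signals that no more is coming.
        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 1; i <= itemCount; i++)
                    channel.Add(i); // blocks while the channel is full
            }
            finally
            {
                channel.CompleteAdding();
            }
        });

        // Consumer: blocks until items arrive. The loop ends once the producer
        // has called CompleteAdding and the channel has been drained.
        Task consumer = Task.Run(() =>
        {
            foreach (int item in channel.GetConsumingEnumerable())
                results.Add(item * item);
        });

        Task.WaitAll(producer, consumer);
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(5)));
    }
}
```

With one producer and one consumer, items come out in the order they went in, because BlockingCollection<T> is backed by a FIFO queue by default. If the consumer in turn wrote its results to a second collection, it would become the producer for the next stage.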
Consider an algorithm that will compress and then encrypt blocks of data. The nature of the employed algorithms is such that there is carry-over data from one block to the next, which means we must compress and encrypt blocks in order. We can’t compress several blocks concurrently, nor encrypt several blocks concurrently, but we can run compression concurrently with encryption: while we’re encrypting compressed block N, we can be compressing block N+1. This is naturally modeled as a pipeline, e.g.:
var compressAndEncrypt =
    Pipeline.Create(rawChunk => Compress(rawChunk))
            .Next(compressedChunk => Encrypt(compressedChunk));
IEnumerable<byte> inputChunks = …;
IEnumerable<byte> results =
There are many ways pipelines can be implemented, and in some cases, you want to dedicate one or more threads to processing each stage of the pipeline. The Pipeline class in the Pipeline.cs file in ParallelExtensionsExtras provides this functionality, delivering a simple API on top of relatively powerful functionality.
Pipeline uses a Task to represent each stage of the pipeline. This Task is executed on a ThreadPerTaskScheduler so that each stage gets its own dedicated thread. Each stage also supports a configurable degree of parallelism, so that multiple dedicated threads may be used to process a single stage. To handle this, the stage’s Task uses Parallel.ForEach to process the input enumerable for that stage, and the Parallel.ForEach is also targeted to the ThreadPerTaskScheduler instance. A BlockingCollection<T> sits between each pair of adjacent stages as the channel for passing data from producer to consumer, and the entire pipeline is cancelable with a CancellationToken.
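The design described above can be approximated in a small sketch. This is not the code in Pipeline.cs. It assumes a fixed two-stage pipeline, uses TaskCreationOptions.LongRunning as a stand-in for ThreadPerTaskScheduler (a hint that the task should get its own thread), and leaves out the per-stage degree of parallelism. TwoStagePipelineSketch, StartStage, and the lambdas in Main are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch; not the ParallelExtensionsExtras Pipeline class.
public static class TwoStagePipelineSketch
{
    // Assumption: LongRunning stands in for ThreadPerTaskScheduler. It hints
    // to the default scheduler that the task should not occupy a pool thread.
    private static Task StartStage(Action body, CancellationToken token)
    {
        return Task.Factory.StartNew(
            body, token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    public static List<TOut> Process<TIn, TMid, TOut>(
        IEnumerable<TIn> source,
        Func<TIn, TMid> first,
        Func<TMid, TOut> second,
        CancellationToken token = default(CancellationToken))
    {
        // Channel between the stages. Left unbounded so that stage 1 cannot
        // block forever on Add if stage 2 fails partway through.
        using (var channel = new BlockingCollection<TMid>())
        {
            var results = new List<TOut>();

            // Stage 1: consumes the source and produces into the channel.
            Task stage1 = StartStage(() =>
            {
                try
                {
                    foreach (TIn item in source)
                        channel.Add(first(item), token);
                }
                finally
                {
                    // Always let stage 2 finish, even if stage 1 fails.
                    channel.CompleteAdding();
                }
            }, token);

            // Stage 2: consumes the channel in arrival order.
            Task stage2 = StartStage(() =>
            {
                foreach (TMid item in channel.GetConsumingEnumerable(token))
                    results.Add(second(item));
            }, token);

            // Rethrows failures and cancellation as an AggregateException.
            Task.WaitAll(stage1, stage2);
            return results;
        }
    }

    public static void Main()
    {
        // The lambdas are placeholders for real work such as Compress and Encrypt.
        List<string> output = Process(
            new[] { "a", "b", "c" },
            s => s.ToUpperInvariant(),
            s => s + "!");
        Console.WriteLine(string.Join(", ", output));
    }
}
```

Because each stage here runs on a single thread and the channel is FIFO, results keep the order of the input, which is the property the compress-then-encrypt scenario depends on. Raising the degree of parallelism within a stage would trade that ordering guarantee for throughput unless ordering is restored separately.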