(The full set of ParallelExtensionsExtras Tour posts is available here.)
Producer/consumer scenarios could logically be split into two categories: those where the consumers are synchronous, blocking waiting for producers to generate data, and those where the consumers are asynchronous, such that they’re alerted to data being available and only then spin up to process the produced data. BlockingCollection<T> is very useful for the former case, as it encapsulates patterns that simplify the implementation of a blocking consumer. BlockingCollection<T> is not as ideal for cases where the consumer is asynchronous.
For asynchronous producer/consumer, typically the producer adds data to a shared data structure, such as a thread-safe queue, and notifies any listening consumers that data is available. This then causes the consumer to check the queue, pull out the relevant data, and process it. There are a variety of choices one has to make when applying this approach. Should data be processed in order? Should parallelism be employed to process multiple data elements concurrently if possible? etc.
The AsyncCall<T> type in the AsyncCall.cs file in ParallelExtensionsExtras implements this producer/consumer functionality. The programming model is quite straightforward. You instantiate a new AsyncCall<T> with the delegate to be executed for each piece of data, e.g.:
var call = new AsyncCall<int>(i =>
and then you post data to the instance, e.g.:
As is common in producer/consumer scenarios, consumers may themselves be producers for later stages of a pipeline, and in that fashion you can chain together AsyncCall<T> instances by having the delegate for one post to another, e.g.:
AsyncCall<double> call1 = null;
AsyncCall<string> call2 = null;
AsyncCall<int> call3 = null;
call1 = new AsyncCall<double>(d => call2.Post(d.ToString()));
call2 = new AsyncCall<string>(s => call3.Post(s.Length));
call3 = new AsyncCall<int>( i => Console.WriteLine(i));
Under the covers, AsyncCall<T> is implemented using Tasks, a fact evident from the AsyncCall<T>’s constructor:
int maxDegreeOfParallelism = 1,
int maxItemsPerTask = Int32.MaxValue,
TaskScheduler scheduler = null);
The constructor not only accepts the action delegate to be executed for each item, but also the maximum number of delegates to run concurrently (by default, it executes sequentially), the maximum number of items to be processed per Task (more on this in a moment), and the TaskScheduler to which to schedule tasks. There are other quite interesting constructors on the type but not detailed here, including one that uses a Func<T,Task> instead of an Action<T>. This allows a Task to be returned from the handler, where the handler won’t be considered completed until that task has asynchronous completed. This allows AsyncCall<T> to be used in cases where, for example, you want to employ asynchronous I/O to process data, but you also want to ensure that no more than one item is being processed at a time.
AsyncCall<T>’s implementation is relatively simple. When Post is called to provide new data, that data is enqueued into an internal ConcurrentQueue<T>. The instance then checks to see whether a Task has been scheduled to process data: if one hasn’t, we schedule such a Task. This check enables us to minimize the number of tasks created, such that one Task can process all of the data posted while it happens to be running. When no more data is available, the task will exit, and the next piece of data to be posted will cause another Task to spin up and continue processing. If maxDegreeOfParallelism is set to something greater than 1, the body of the Task will employ a Parallel.ForEach loop to process available data instead of using a normal serial foreach loop.
I mentioned earlier that the AsyncCall<T>’s constructor accepts a maxItemsPerTask parameter. This is used for fairness. If continuous calls to Post were made such that the Task doing the underlying processing always had more data available, this Task would never exit. This could cause starvation problems for other work that needs some time to execute. To deal with this, the Task employed by AsyncCall<T> exits after processing maxItemsPerTask items, even if there’s more to be done. On its way out, the Task queues up another Task to complete the processing, and that task may then be scheduled fairly with regards to other work to be done.