Processing Streams with PLINQ

In many data-parallel scenarios, all of the data to be processed is available immediately. This blog post addresses the opposite scenario: the inputs arrive gradually (as if in a stream), and we want to start producing results even before reading the last element of the input sequence.

 

There are many scenarios in which inputs become available gradually rather than all at once. The inputs could arrive as requests across the network, as values entered by a user, as data read from an I/O device, as results produced by another computation, and so on. We want to process the inputs in parallel and make the partial results available as they are computed.

 

Simple PLINQ queries support streaming. A query that consists only of simple Select and Where operators will stream its results:

 

    var q = inputSrc.AsParallel().Where(x => Foo(x)).Select(x => ExpensiveComputation(x));

    foreach (var x in q)
    {
        Process(x);
    }

 

In this example, Process(x) will get called on some input elements even before all elements of the input have been read.
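To observe this behavior, here is a minimal sketch that counts how many inputs have been read by the time the first result comes out. CountingSource, InputsReadAtFirstResult, and the trivial filter and projection are hypothetical stand-ins for a real input stream, Foo, and ExpensiveComputation. The number printed depends on PLINQ's internal buffering and on the machine, so no particular output is shown.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class StreamingDemo
{
    static int itemsRead;   // how many inputs the query has pulled so far

    // Hypothetical input source that counts every element handed to the query.
    static IEnumerable<int> CountingSource(int count)
    {
        for (int i = 0; i < count; i++)
        {
            Interlocked.Increment(ref itemsRead);
            yield return i;
        }
    }

    // Returns how many inputs had been read when the first result came out.
    public static int InputsReadAtFirstResult(int count)
    {
        itemsRead = 0;
        var q = CountingSource(count)
            .AsParallel()
            .Where(x => x % 2 == 0)   // stands in for Foo(x)
            .Select(x => x * 2);      // stands in for ExpensiveComputation(x)

        using (var results = q.GetEnumerator())
        {
            results.MoveNext();       // pull only the first result
            return Volatile.Read(ref itemsRead);
        }
    }

    public static void Main()
    {
        const int total = 1000000;
        Console.WriteLine("first result after reading {0} of {1} inputs",
                          InputsReadAtFirstResult(total), total);
    }
}
```

If the query streams, the first number should come out smaller than the second, although with short inputs the whole sequence may be read before anything is yielded.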

 

On the other hand, not all queries support streaming. In fact, some queries cannot possibly support streaming. Consider this query that contains an OrderBy() operator:

 

    var q = src.AsParallel().Select(x => Foo(x)).OrderBy(x => x);

 

We need to compute Foo(x) for all input elements, and only then can we yield the smallest element. Thus, a query that contains an OrderBy operator does not (and cannot) run in a streaming fashion.
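The following sketch makes the point measurable, under the assumption that counting reads from the source is a fair proxy for "how much input was consumed". CountingSource and InputsReadAtFirstResult are hypothetical helpers, and the doubling stands in for Foo.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class OrderByDemo
{
    static int itemsRead;   // how many inputs the query has pulled so far

    // Hypothetical input source, in descending order, that counts every read.
    static IEnumerable<int> CountingSource(int count)
    {
        for (int i = count; i > 0; i--)
        {
            Interlocked.Increment(ref itemsRead);
            yield return i;
        }
    }

    // Returns how many inputs had been read when the first sorted result came out.
    public static int InputsReadAtFirstResult(int count)
    {
        itemsRead = 0;
        var q = CountingSource(count)
            .AsParallel()
            .Select(x => x * 2)   // stands in for Foo(x)
            .OrderBy(x => x);

        using (var results = q.GetEnumerator())
        {
            results.MoveNext();   // the smallest element cannot be known earlier
            return Volatile.Read(ref itemsRead);
        }
    }

    public static void Main()
    {
        // The sort needs every element, so the whole input has been read.
        Console.WriteLine(InputsReadAtFirstResult(100000)); // prints 100000
    }
}
```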

 

So, what kinds of PLINQ queries do run in a streaming fashion? Generally, they are simple queries. The query can contain any number of Select and Where operators, with one caveat for the positional overloads, that is, the overloads of Select and Where whose delegate receives both the element's value and its position in the sequence. A positional Select or Where operator prevents streaming if a Where operator appears anywhere before it in the query, since an element's position after filtering depends on how many of the earlier elements passed the filter.
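To make the two cases concrete, here is a small sketch of a positional Select with and without a preceding Where. The method names are hypothetical, and AsOrdered() is used so that the results come out in a predictable order. The sketch illustrates what the index means in each case; it does not measure whether the query streams.

```csharp
using System;
using System.Linq;

public static class PositionalDemo
{
    // Positional Select with no Where before it: each index is simply the
    // element's position in the source.
    public static int[] IndexTimesValue(int count)
    {
        return Enumerable.Range(0, count)
            .AsParallel().AsOrdered()
            .Select((x, i) => x * i)
            .ToArray();
    }

    // Positional Select after a Where: each index is the element's position
    // among the elements that passed the filter.
    public static int[] IndexAfterFilter(int count)
    {
        return Enumerable.Range(0, count)
            .AsParallel().AsOrdered()
            .Where(x => x % 3 == 0)
            .Select((x, i) => i)
            .ToArray();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", IndexTimesValue(5)));    // 0,1,4,9,16
        Console.WriteLine(string.Join(",", IndexAfterFilter(10)));  // 0,1,2,3
    }
}
```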

 

A streaming query can produce either ordered or unordered results. As is typical in PLINQ, the results are unordered by default, but you can opt into ordering by using the AsOrdered() operator:

 

    var q = src.AsParallel().AsOrdered().Select(x => Foo(x)).Where(y => Bar(y));

 

In the above query, PLINQ reads the elements from the src enumerable, distributes them into partitions, processes them on multiple threads, and rearranges the results into a correctly ordered output sequence. All of these stages run concurrently, so the first results are produced while the inputs are still being read (assuming that the input sequence is sufficiently long).
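As a runnable illustration of the ordered case, the sketch below uses squaring and an evenness test as hypothetical stand-ins for Foo and Bar. Because of AsOrdered(), the results come out in the order of the corresponding inputs, however the work was split across threads.

```csharp
using System;
using System.Linq;

public static class OrderedStreamingDemo
{
    // Same shape as the query above: AsOrdered, then Select, then Where.
    public static ParallelQuery<int> EvenSquares(int count)
    {
        return Enumerable.Range(1, count)
            .AsParallel()
            .AsOrdered()
            .Select(x => x * x)        // stands in for Foo(x)
            .Where(y => y % 2 == 0);   // stands in for Bar(y)
    }

    public static void Main()
    {
        foreach (var y in EvenSquares(10))
        {
            Console.Write(y + " ");    // prints 4 16 36 64 100
        }
        Console.WriteLine();
    }
}
```

Without the AsOrdered() call, the same five values would still be produced, but their order would not be guaranteed.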

 

One important point about streaming PLINQ queries is that the algorithms we use are optimized for throughput rather than latency. PLINQ uses buffers internally, so a particular result may sit in an output buffer until a certain number of results have been produced. Obviously, that is undesirable if one of the results needs to be sent back to a client as quickly as possible.
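If latency matters more than throughput, one option that is not covered above is the WithMergeOptions operator that ships with PLINQ in .NET Framework 4 and later. Passing ParallelMergeOptions.NotBuffered asks PLINQ to hand each result to the consumer as soon as it is computed, usually at some cost in throughput. The sketch below is a minimal illustration under that assumption; the Build method and the doubling projection are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LowLatencyDemo
{
    // Requests an unbuffered merge so results are yielded as they are produced.
    public static ParallelQuery<int> Build(IEnumerable<int> src)
    {
        return src.AsParallel()
                  .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                  .Select(x => x * 2);   // stands in for the real computation
    }

    public static void Main()
    {
        // The query is unordered, so the values may print in any order.
        foreach (var x in Build(Enumerable.Range(0, 20)))
        {
            Console.WriteLine(x);
        }
    }
}
```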

 

One major benefit of streaming is that the data never has to be loaded into memory all at once. For example, if the query reads its inputs from one file and writes its outputs to another file, the inputs do not all have to be in memory at the same time. Keep this use case in mind, and when you come across it in your development, use PLINQ to easily get your code to scale on multi-core machines.
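A minimal sketch of the file-to-file case might look like the following. It assumes .NET Framework 4 or later, where File.ReadLines reads lines lazily and File.WriteAllLines accepts any IEnumerable<string>. The Transform method, the temporary files, and the upper-casing step are hypothetical placeholders for real work.

```csharp
using System;
using System.IO;
using System.Linq;

public static class FileStreamingDemo
{
    // Reads lines lazily, processes them in parallel, and writes each result
    // as it becomes available. The two paths must refer to different files.
    public static void Transform(string inputPath, string outputPath)
    {
        var results = File.ReadLines(inputPath)          // lazy: reads on demand
            .AsParallel()
            .AsOrdered()                                 // keep input order
            .Select(line => line.ToUpperInvariant());    // stands in for real work

        File.WriteAllLines(outputPath, results);
    }

    public static void Main()
    {
        string input = Path.GetTempFileName();
        string output = Path.GetTempFileName();
        File.WriteAllLines(input, new[] { "alpha", "beta", "gamma" });

        Transform(input, output);
        Console.WriteLine(string.Join(",", File.ReadAllLines(output))); // ALPHA,BETA,GAMMA

        File.Delete(input);
        File.Delete(output);
    }
}
```

PLINQ's internal buffers still hold some elements at any given moment, but neither the full input nor the full output needs to be materialized as a collection.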