Parallel Aggregations in PLINQ

Quick Overview of LINQ Aggregations

In order to explain the issues we encounter when parallelizing aggregations in PLINQ, let’s first take a quick look at how aggregations work in LINQ.

Aggregation is an operation that iterates over a sequence of input elements, maintaining an accumulator that contains the intermediate result. At each step, a reduction function takes the current element and accumulator value as inputs, and returns a value that will overwrite the accumulator. The final accumulator value is the result of the computation. A variety of interesting operations can be expressed as aggregations: sum, average, min, max, sum of squares, variance, concatenation, count, count of elements matching a predicate, and so on.

LINQ provides several overloads of Aggregate. A possible implementation (without error checking) of the most general of them is given below:

    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        this IEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> func,
        Func<TAccumulate, TResult> resultSelector
    )
    {
        TAccumulate accumulator = seed;
        foreach (TSource elem in source)
        {
            accumulator = func(accumulator, elem);
        }
        return resultSelector(accumulator);
    }

To compute a particular aggregation, the user provides the input sequence (the source parameter), the initial accumulator value (seed), the reduction function (func), and a function to convert the final accumulator to the result (resultSelector). As a usage example, consider the method below, which computes the sum of squares of integers:

    public static int SumSquares(IEnumerable<int> source)
    {
        return source.Aggregate(0, (sum, x) => sum + x * x, (sum) => sum);
    }
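SumSquares uses the same type for the element, the accumulator, and the result, so it does not show why the general overload has three type parameters. The sketch below is one possible illustration, not part of the original example: it computes a mean by accumulating a (sum, count) pair and converting it to a double in resultSelector. The class name AggregateExamples and the method Mean are hypothetical, and the tuple syntax assumes C# 7 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AggregateExamples
{
    // Mean of a sequence of ints.
    // TSource = int, TAccumulate = (long Sum, int Count), TResult = double.
    public static double Mean(IEnumerable<int> source)
    {
        return source.Aggregate(
            (Sum: 0L, Count: 0),                       // seed: nothing seen yet
            (acc, x) => (acc.Sum + x, acc.Count + 1),  // func: fold in one element
            acc => (double)acc.Sum / acc.Count);       // resultSelector: pair -> mean
    }
}
```

For example, AggregateExamples.Mean(new[] { 1, 2, 3, 4 }) accumulates the pair (10, 4) and returns 2.5.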

LINQ also exposes a number of predefined aggregations, such as Sum, Average, Max, Min, etc. Even though each one can be implemented using the Aggregate operator, a direct implementation is likely to be more efficient (for example, to avoid a delegate call for each input element).

Parallelizing LINQ Aggregations

Let’s say that we call SumSquares(Enumerable.Range(1,4)) on a dual-core machine. How can we split up the computation between two threads? We could distribute the elements of the input between the threads. For example, Thread 1 could compute the sum of squares of {1,4} and Thread 2 could compute the sum of squares of {3,2}*. Then, as a last step, we combine the two partial results (by adding them, in this case) to get the final answer.

Sequential Answer = ((((0 + 1^2) + 2^2) + 3^2) + 4^2) = 30
Parallel Answer = (((0 + 1^2) + 4^2) + ((0 + 3^2) + 2^2)) = 30

 

* Note: Elements within each partition do not necessarily appear in the order in which they appear in the input. The reason for this may not be apparent, but it has to do with the presence of other operators in the query.
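The two-thread computation above can be reproduced by hand, without any threads, to check that both orders give the same answer. This is only a sketch: the class TwoPartitionDemo is hypothetical, and the partitions {1,4} and {3,2} are hard-coded to match the example rather than chosen by PLINQ.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class TwoPartitionDemo
{
    // Same aggregation as SumSquares in the text.
    private static int SumSquares(IEnumerable<int> source)
    {
        return source.Aggregate(0, (sum, x) => sum + x * x, sum => sum);
    }

    // ((((0 + 1^2) + 2^2) + 3^2) + 4^2)
    public static int Sequential()
    {
        return SumSquares(Enumerable.Range(1, 4));
    }

    // (((0 + 1^2) + 4^2) + ((0 + 3^2) + 2^2))
    public static int TwoPartitions()
    {
        int partial1 = SumSquares(new[] { 1, 4 }); // what "Thread 1" would compute
        int partial2 = SumSquares(new[] { 3, 2 }); // what "Thread 2" would compute
        return partial1 + partial2;                // final combining step
    }
}
```

Both methods return 30, matching the two formulas above.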

Combining Accumulators

In the parallel aggregation, we need to do something that we didn’t need to in the sequential aggregation: combine the intermediate results (i.e. accumulators). Notice that combining two accumulators may be a different operation than combining an accumulator with an input element. In the SumSquares example, to combine the accumulator with an input element, we square the element and add it to the accumulator. But, to combine two accumulators, we simply add them, without squaring the second one.

In cases where the accumulator type is different from the element type, it is even more obvious that combining two accumulators and combining an accumulator with an element are different operations: even their argument types differ!

Therefore, the most general PLINQ Aggregate overload accepts an intermediate reduce function as well as a final reduce function, while the most general LINQ Aggregate only needs the intermediate reduce function. The signature of the most general PLINQ Aggregate overload is below (compare with the most general LINQ Aggregate overload shown above):

    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        this IParallelEnumerable<TSource> source,
        TAccumulate seed,
        Func<TAccumulate, TSource, TAccumulate> intermediateReduceFunc,
        Func<TAccumulate, TAccumulate, TAccumulate> finalReduceFunc,
        Func<TAccumulate, TResult> resultSelector
    )
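As a rough sketch of how SumSquares might look with this overload, the code below targets the PLINQ that ships in System.Linq, which is an assumption about the reader's environment: there, the source type is ParallelQuery<TSource> (obtained via AsParallel()) and the two reduce delegates are named updateAccumulatorFunc and combineAccumulatorsFunc, rather than the names in the signature above. The class name ParallelSumSquares is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ParallelSumSquares
{
    public static int SumSquares(IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            0,                              // seed for each partition's accumulator
            (sum, x) => sum + x * x,        // intermediate reduce: accumulator + element
            (left, right) => left + right,  // final reduce: accumulator + accumulator
            sum => sum);                    // result selector
    }
}
```

Note that the final reduce function adds the two partial sums without squaring either of them, which is exactly the difference between the two kinds of combining described above.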


The Quick and Easy Solution

So, how can we tell whether a particular aggregation can be parallelized with PLINQ? The simple approach is to imagine the above parallelization process. The input sequence will be reordered and split up into several partitions. Each partition will be accumulated separately on its own thread, with its accumulator initialized to the seed. Then, all accumulators will be combined using the final reduce function. Does this process produce the correct answer? If it does, then the aggregation can be parallelized using PLINQ.

In the rest of this post, I will describe in more depth the properties that an aggregation must have in order to parallelize correctly. In typical cases, imagining the parallelization process is the easiest way to find out whether an aggregation will produce the correct answer when run on PLINQ.

Purity of Reduction Functions

Just as in other types of PLINQ queries, delegates that form a part of the query must be pure, or at least observationally pure. So, if any shared state is accessed, appropriate synchronization must be used.
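To make the synchronization point concrete, here is a small sketch in which the intermediate reduce delegate touches shared state: a counter of how many times it has been called. The class PurityDemo and the method SumWithCallCount are hypothetical, the code assumes the shipped System.Linq PLINQ API and C# 7 tuples, and Interlocked.Increment is just one possible form of synchronization. An unsynchronized calls++ in its place could lose updates when several threads run the delegate at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class PurityDemo
{
    // Returns the sum of the input and the number of times the
    // intermediate reduce delegate ran.
    public static (int Sum, int Calls) SumWithCallCount(IEnumerable<int> source)
    {
        int calls = 0; // shared by every thread that runs the delegate

        int sum = source.AsParallel().Aggregate(
            0,
            (acc, x) =>
            {
                Interlocked.Increment(ref calls); // synchronized access to shared state
                return acc + x;
            },
            (left, right) => left + right,
            acc => acc);

        return (sum, calls);
    }
}
```

The delegate is still observationally pure in the sense that matters here: the counter never influences the value it returns.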

Associativity and Commutativity

The parallel version of an aggregation does not necessarily apply the reduction functions in the same order as the sequential computation. In the SumSquares example, the sequential result is computed in a different order than the parallel result. Of course, the two results will be equal because of the special properties of the + operator: associativity and commutativity.

An operator F(x,y) is associative if F(F(x,y),z) = F(x,F(y,z)), and commutative if F(x,y) = F(y,x), for all valid inputs x,y,z. For example, Max is commutative because Max(x,y) = Max(y,x), and also associative because Max(Max(x,y),z) = Max(x,Max(y,z)). Subtraction is not commutative because it is not true in general that x-y = y-x, and it is not associative because it is not true in general that x-(y-z) = (x-y)-z.
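The two definitions translate directly into a brute-force check over a handful of sample values. This is a sketch with hypothetical names (OperatorProperties, IsCommutative, IsAssociative), restricted to int operators for simplicity. A failed check disproves the property, but a passed check only says that no counterexample was found among the samples.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OperatorProperties
{
    // True if F(x,y) == F(y,x) for every pair of samples.
    public static bool IsCommutative(Func<int, int, int> f, IEnumerable<int> samples)
    {
        int[] values = samples.ToArray();
        return values.All(x => values.All(y => f(x, y) == f(y, x)));
    }

    // True if F(F(x,y),z) == F(x,F(y,z)) for every triple of samples.
    public static bool IsAssociative(Func<int, int, int> f, IEnumerable<int> samples)
    {
        int[] values = samples.ToArray();
        return values.All(x => values.All(y => values.All(z =>
            f(f(x, y), z) == f(x, f(y, z)))));
    }
}
```

With samples such as Enumerable.Range(-3, 7), Math.Max passes both checks and (a, b) => a - b fails both.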

The following table gives examples of operations that fall into different categories with respect to associativity and commutativity:

                    | Associative: No                  | Associative: Yes
--------------------+----------------------------------+-------------------------------------
Commutative: No     | (a, b) => a / b                  | (string a, string b) => a.Concat(b)
                    | (a, b) => a - b                  | (a, b) => a
                    | (a, b) => 2 * a + b              | (a, b) => b
--------------------+----------------------------------+-------------------------------------
Commutative: Yes    | (float a, float b) => a + b      | (int a, int b) => a + b
                    | (float a, float b) => a * b      | (int a, int b) => a * b
                    | (bool a, bool b) => !(a && b)    | (a, b) => Min(a, b)
                    | (int a, int b) => 2 + a * b      | (a, b) => Max(a, b)
                    | (int a, int b) => (a + b) / 2    |

An operation must be both associative and commutative in order for the PLINQ parallelization to work correctly. The good news is that many of the interesting aggregations turn out to be both associative and commutative.

Note: For simplicity, this section only considers aggregations where the type of the accumulator is the same as the type of the element (not only the .NET type, but also the “logical” type). After all, if the accumulator type is different from the element type, the intermediate reduction function cannot possibly be commutative because its two arguments are of different types! In the general case, the final reduction function must be associative and commutative, and the intermediate reduction function must be related to the final reduction function in a specific way. See section “Constraints on Reduce Function and Seed” for details.

Seed is an Identity

LINQ allows the user to initialize the accumulator to an arbitrary seed value. In the following example, the user sets the seed to 5, and thus computes 5 + the sum of squares of integers in a sequence.

    public static int SumSquaresPlus5(IEnumerable<int> source)
    {
        return source.Aggregate(5, (sum, x) => sum + x * x, (sum) => sum);
    }

Unfortunately, if we parallelize this query, several threads will split up the input, and each will initialize its accumulator to 5. As a result, 5 will be added to the result as many times as there are threads, and the computed answer will be incorrect.
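The effect is easy to see by simulating the parallel process on a single thread with fixed partitions. The helper below is hypothetical (SeedDemo.SimulateParallel is not a PLINQ API) and assumes at least one partition is passed in. It seeds every partition's accumulator with the same value and then adds the partial results, as described above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SeedDemo
{
    // Accumulates each partition separately, starting from the same seed,
    // then combines the partial results by addition.
    public static int SimulateParallel(int seed, params int[][] partitions)
    {
        return partitions
            .Select(p => p.Aggregate(seed, (sum, x) => sum + x * x)) // per-partition pass
            .Aggregate((left, right) => left + right);               // final combine
    }
}
```

With the partitions {1,4} and {3,2}, SeedDemo.SimulateParallel(5, new[] { 1, 4 }, new[] { 3, 2 }) returns (5 + 1 + 16) + (5 + 9 + 4) = 40, while the sequential SumSquaresPlus5 returns 35: the seed was counted once per partition. With a seed of 0, the same call returns the expected 30.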

Can PLINQ do something to fix this problem?

Non-solution 1: Initialize one accumulator to the seed and the rest of them to the default value of T.

For example, if the input contains integers, why not initialize one thread’s accumulator to the user-provided seed, and the other accumulators to 0? The problem is that while 0 is a great initial accumulator value for some aggregations, such as sum, it does not work at all for other aggregations. One such operation is product: if we initialize the accumulator