Quick Overview of LINQ Aggregations
To explain the issues that come up when parallelizing aggregations in PLINQ, let's first take a quick look at how aggregations work in LINQ.
Aggregation is an operation that iterates over a sequence of input elements, maintaining an accumulator that contains the intermediate result. At each step, a reduction function takes the current element and accumulator value as inputs, and returns a value that will overwrite the accumulator. The final accumulator value is the result of the computation. A variety of interesting operations can be expressed as aggregations: sum, average, min, max, sum of squares, variance, concatenation, count, count of elements matching a predicate, and so on.
LINQ provides several overloads of Aggregate. A possible implementation (without error checking) of the most general of them is given below:
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> func,
    Func<TAccumulate, TResult> resultSelector
)
{
    TAccumulate accumulator = seed;
    foreach (TSource elem in source)
    {
        accumulator = func(accumulator, elem);
    }
    return resultSelector(accumulator);
}
To compute a particular aggregation, the user provides the input sequence (as method parameter source), the initial accumulator value (seed), the reduction function (func), and a function to convert the final accumulator to the result (resultSelector). As a usage example, consider the method below that computes the sum of squares of integers:
public static int SumSquares(IEnumerable<int> source)
{
    return source.Aggregate(0, (sum, x) => sum + x * x, (sum) => sum);
}
LINQ also exposes a number of predefined aggregations, such as Sum, Average, Max, Min, etc. Even though each one can be implemented using the Aggregate operator, a direct implementation is likely to be more efficient (for example, to avoid a delegate call for each input element).
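For instance, Sum and a predicate-counting aggregation can both be written in terms of Aggregate. The sketch below uses hypothetical helper names; as noted, the built-in operators remain preferable in practice because they avoid the per-element delegate call:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AggregateExamples
{
    // Sum expressed via Aggregate: the accumulator starts at 0 and each
    // element is added to it.
    public static int SumViaAggregate(IEnumerable<int> source)
    {
        return source.Aggregate(0, (acc, x) => acc + x);
    }

    // Count of elements matching a predicate, also as an aggregation: the
    // accumulator is incremented only when the predicate holds.
    public static int CountMatching<T>(IEnumerable<T> source, Func<T, bool> predicate)
    {
        return source.Aggregate(0, (acc, x) => predicate(x) ? acc + 1 : acc);
    }
}
```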
Parallelizing LINQ Aggregations
Let’s say that we call SumSquares(Enumerable.Range(1, 4)) on a dual-core machine. How can we split up the computation between two threads? We could distribute the elements of the input among the threads: for example, Thread 1 could compute the sum of squares of {1, 4} and Thread 2 the sum of squares of {3, 2}. Then, as a last step, we combine the two results – by adding them, in this case – and we get the final answer.
Sequential Answer = ((((0 + 1^2) + 2^2) + 3^2) + 4^2) = 30
Parallel Answer = (((0 + 1^2) + 4^2) + ((0 + 3^2) + 2^2)) = 30
Note: Notice that elements within each partition do not necessarily appear in the order in which they appear in the input. The reason for this may not be apparent, but it has to do with the presence of other operators in the query.
Combining Accumulators
In the parallel aggregation, we need to do something that we didn’t need to in the sequential aggregation: combine the intermediate results (i.e. accumulators). Notice that combining two accumulators may be a different operation than combining an accumulator with an input element. In the SumSquares example, to combine the accumulator with an input element, we square the element and add it to the accumulator. But, to combine two accumulators, we simply add them, without squaring the second one.
In cases where the accumulator type is different from the element type, it is even more obvious that combining accumulators and combining an accumulator with an element are different operations: even their input argument types differ!
Therefore, the most general PLINQ Aggregate overload accepts an intermediate reduce function as well as a final reduce function, while the most general LINQ Aggregate only needs the intermediate reduce function. The signature of the most general PLINQ Aggregate overload is below (compare with the most general LINQ Aggregate overload shown above):
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IParallelEnumerable<TSource> source,
    TAccumulate seed,
    Func<TAccumulate, TSource, TAccumulate> intermediateReduceFunc,
    Func<TAccumulate, TAccumulate, TAccumulate> finalReduceFunc,
    Func<TAccumulate, TResult> resultSelector
)
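To illustrate, here is SumSquares written against a parallel Aggregate of this shape. A caveat: in the PLINQ API that eventually shipped with .NET 4, this overload lives on ParallelQuery&lt;TSource&gt; (reached via AsParallel()) rather than on the CTP's IParallelEnumerable&lt;TSource&gt;, and the class name below is hypothetical:

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ParallelAggregateExamples
{
    // Parallel sum of squares. Note the two reduction functions:
    // (sum, x) => sum + x * x folds an element into an accumulator, while
    // (sum1, sum2) => sum1 + sum2 combines two partitions' accumulators.
    public static int ParallelSumSquares(IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            0,                           // seed for each partition
            (sum, x) => sum + x * x,     // intermediate reduce
            (sum1, sum2) => sum1 + sum2, // final reduce
            sum => sum);                 // result selector
    }
}
```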
The Quick and Easy Solution
So, how to tell whether a particular aggregation can be parallelized with PLINQ? The simple approach is to imagine the above parallelization process. The input sequence will be reordered and split up into several partitions. Each partition will be accumulated separately on its own thread, with its accumulator initialized to the seed. Then, all accumulators will be combined using the final reduce function. Does this process produce the correct answer? If it does, then the aggregation can be parallelized using PLINQ.
In the rest of this posting, I will describe in more depth the properties that an aggregation must have in order to parallelize correctly. In typical cases, imagining the parallelization process is the easiest way to find out whether an aggregation will produce the correct answer when run on PLINQ.
Purity of Reduction Functions
Just as in other types of PLINQ queries, delegates that form a part of the query must be pure, or at least observationally pure. So, if any shared state is accessed, appropriate synchronization must be used.
Associativity and Commutativity
The parallel version of an aggregation does not necessarily apply the reduction functions in the same order as the sequential computation. In the SumSquares example, the sequential result is computed in a different order than the parallel result. Of course, the two results will be equal because of the special properties of the + operator: associativity and commutativity.
Operator F(x, y) is associative if F(F(x, y), z) = F(x, F(y, z)), and commutative if F(x, y) = F(y, x), for all valid inputs x, y, z. For example, operator Max is commutative because Max(x, y) = Max(y, x), and also associative because Max(Max(x, y), z) = Max(x, Max(y, z)). Operator – is not commutative because it is not true in general that x – y = y – x, and it is not associative because it is not true in general that x – (y – z) = (x – y) – z.
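These definitions are easy to spot-check in code. The helper below (hypothetical names) tests a binary operator on sample inputs; passing does not prove the property, but a single failing triple disproves it:

```csharp
using System;

public static class OperatorProperties
{
    // True if f(f(x, y), z) == f(x, f(y, z)) for this particular triple.
    public static bool LooksAssociative(Func<int, int, int> f, int x, int y, int z)
        => f(f(x, y), z) == f(x, f(y, z));

    // True if f(x, y) == f(y, x) for this particular pair.
    public static bool LooksCommutative(Func<int, int, int> f, int x, int y)
        => f(x, y) == f(y, x);
}
```

For example, subtraction fails both checks on (1, 2, 3), while Max passes both.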
The following table gives examples of operations that fall into different categories with respect to associativity and commutativity:

Neither associative nor commutative:
    (a, b) => a / b
    (a, b) => a – b
    (a, b) => 2 * a + b

Associative but not commutative:
    (string a, string b) => string.Concat(a, b)
    (a, b) => a
    (a, b) => b

Commutative but not associative:
    (float a, float b) => a + b
    (float a, float b) => a * b
    (bool a, bool b) => !(a && b)
    (int a, int b) => 2 + a * b
    (int a, int b) => (a + b) / 2

Both associative and commutative:
    (int a, int b) => a + b
    (int a, int b) => a * b
    (a, b) => Min(a, b)
    (a, b) => Max(a, b)
An operation must be both associative and commutative in order for the PLINQ parallelization to work correctly. The good news is that many of the interesting aggregations turn out to be both associative and commutative.
Note: For simplicity, this section only considers aggregations where the type of the accumulator is the same as the type of the element (not only the .NET type, but also the “logical” type). After all, if the accumulator type differs from the element type, the intermediate reduction function cannot possibly be commutative: its two arguments are of different types! In the general case, the final reduction function must be associative and commutative, and the intermediate reduction function must be related to the final reduction function in a specific way. See section “Constraints on Reduce Function and Seed” for details.
Seed is an Identity
LINQ allows the user to initialize the accumulator to an arbitrary seed value. In the following example, the user sets the seed to 5, and thus computes 5 + the sum of squares of integers in a sequence.
public static int SumSquaresPlus5(IEnumerable<int> source)
{
    return source.Aggregate(5, (sum, x) => sum + x * x, (sum) => sum);
}
Unfortunately, if we parallelize this query, several threads will split up the input, and each will initialize its accumulator to 5. As a result, 5 will be added to the result as many times as there are threads, and the computed answer will be incorrect.
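A small sketch makes the failure concrete. Simulating the naive two-partition parallelization of SumSquaresPlus5 over the partitions {1, 4} and {3, 2} (hypothetical helper names) yields 40 instead of the sequential answer 35, because the seed 5 is counted once per partition:

```csharp
using System.Linq;

public static class SeedProblemDemo
{
    // Naive two-way "parallelization": each partition is folded starting
    // from the seed, so the seed is incorporated twice instead of once.
    public static int NaiveTwoWayParallel(int seed, int[] part1, int[] part2)
    {
        int acc1 = part1.Aggregate(seed, (sum, x) => sum + x * x);
        int acc2 = part2.Aggregate(seed, (sum, x) => sum + x * x);
        return acc1 + acc2; // final reduce: add the two accumulators
    }
}
```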
Can PLINQ do something to fix this problem?
Nonsolution 1: Initialize one accumulator to the seed and the rest of them to the default value of T.
For example, if the input contains integers, why not initialize one thread’s accumulator to the user-provided seed, and the other accumulators to 0? The problem is that while 0 is a great initial accumulator value for some aggregations, such as sum, it does not work at all for other aggregations. One such operation is product: if we initialize an accumulator to 0, that thread’s partial product will always be 0, and the final answer will be wrong. For a product, the extra accumulators would have to start at 1, the identity of multiplication, not at default(T).
I don’t understand why you’re making the Aggregate method that complicated. If you make func a binary method of the form TAccumulate × TAccumulate -> TAccumulate, you get an algebraic structure.
So, (TAccumulate, func) has to be a commutative monoid, which implies all the properties you described.
This can be easily achieved by introducing a valueSelector: TSource -> TAccumulate.
You’re getting the following method signature:
public static TResult Aggregate<TSource, TAccumulate, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, TAccumulate> valueSelector,
    TAccumulate identityElement,
    Func<TAccumulate, TAccumulate, TAccumulate> func,
    Func<TAccumulate, TResult> resultSelector
)
{
    TAccumulate accumulator = identityElement;
    foreach (TSource elem in source)
    {
        accumulator = func(accumulator, valueSelector(elem));
    }
    return resultSelector(accumulator);
}
Now you can split up the source, compute thread-local accumulators, and also use func(accumulator1, accumulator2) to combine them.
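For reference, here is a self-contained version of this proposal (written as a plain static method rather than an extension method, under a hypothetical class name) together with SumSquares expressed in this style: the squaring moves into the valueSelector, and a single binary func handles both element and accumulator combination:

```csharp
using System;
using System.Collections.Generic;

public static class MonoidAggregate
{
    // The proposed shape: valueSelector maps each element into the
    // accumulator domain, and (TAccumulate, func, identityElement) forms
    // a commutative monoid, so any grouping/ordering of combinations works.
    public static TResult Aggregate<TSource, TAccumulate, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, TAccumulate> valueSelector,
        TAccumulate identityElement,
        Func<TAccumulate, TAccumulate, TAccumulate> func,
        Func<TAccumulate, TResult> resultSelector)
    {
        TAccumulate accumulator = identityElement;
        foreach (TSource elem in source)
        {
            accumulator = func(accumulator, valueSelector(elem));
        }
        return resultSelector(accumulator);
    }

    // Sum of squares in this style: square in the selector, add in func.
    public static int SumSquares(IEnumerable<int> source)
        => Aggregate(source, x => x * x, 0, (a, b) => a + b, a => a);
}
```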
I do not like having to provide two different accumulator functions. It makes little sense anyway, because the second function must be essentially the same as the first, due to the non-determinism (i.e. every value could be processed by a separate thread).
With this version, you can implement the Aggregate method like a binary tree where the two children are always independent of each other.
func(
    func(
        valueSelector(elem0),
        valueSelector(elem1)),
    func(
        valueSelector(elem2),
        valueSelector(elem3)))
The leaves of the tree are just calls to valueSelector(elem). In this tree, you always have the option to run a child in a separate thread.
Thanks for your suggestion, Thomas.
We have previously thought about the approach you are proposing. I agree that it is logically cleaner and has all the desired algebraic properties. Unfortunately, it also comes with a price: an extra delegate invocation for each element in the input.
Another possible issue is that your proposed approach would rule out aggregations with mutable accumulators. For example, say that you have an aggregation over an IEnumerable<char>, and you want to compute how many times each letter (A-Z) appears. TAccumulate might be an integer array with 26 elements, representing the count of each letter. The value selector would have to return an array of 26 integers, so we’d end up constructing an array for each input element. On the other hand, a (TAccumulate, TElement) -> TAccumulate reduce function could simply increment one element in the array, and return the same array.
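A sketch of that letter-count aggregation with a mutable accumulator (hypothetical names; as noted below, the CTP release does not actually support mutable accumulators in PLINQ):

```csharp
using System.Collections.Generic;

public static class LetterHistogram
{
    // Mutable-accumulator reduce function: the accumulator is a 26-element
    // count array that is updated in place and returned, so no per-element
    // allocation is needed.
    public static int[] AddLetter(int[] counts, char c)
    {
        if (c >= 'a' && c <= 'z') counts[c - 'a']++;
        else if (c >= 'A' && c <= 'Z') counts[c - 'A']++;
        return counts;
    }

    // Sequential fold using the reduce function above.
    public static int[] CountLetters(IEnumerable<char> source)
    {
        int[] counts = new int[26];
        foreach (char c in source) counts = AddLetter(counts, c);
        return counts;
    }
}
```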
Note that as of the CTP release, PLINQ does not support mutable accumulators, but it is an extension we are considering. We would need an overload where the user provides a seedConstructor delegate instead of a seed instance, so that we can have multiple independent accumulators.
So, that’s my take on this issue. However, the Aggregate design is far from closed, and we are open to considering other solutions.
Igor
Another issue with simply mapping TSource => TAccumulate is that combining TSource’s may be significantly cheaper than combining TAccumulate’s (think matrix-matrix multiplication instead of matrix-vector multiplication).
A different use of a function mapping TSource -> TAccumulate is to avoid the change of "seed" to "identity". A seed is nice because it gives semantics to zero-length sequences. A mapping from source to accumulator can be used to handle sequences of length 1 and avoid the need for a separate zero element or a change in the semantics of the seed element.
Late comment here… We have used the mutable version of Aggregate to prepare and then union hash sets of values into a single set, effectively an input.SelectMany(e => e).Distinct(). After tweaking the degree of parallelism to suit the number of input arrays, the PLINQ Aggregate outperforms exactly the same algorithm written as two parallel for loops with an equivalent batch size. The reason appears to be that each batch in the parallel for incurs a larger overhead than each task created by the PLINQ Aggregate. Can this be correct, and if so, how and why?