More Powerful Aggregations in PLINQ

In the June 2008 CTP, PLINQ aggregations are more powerful than they were in the December 2007 CTP. The reason is a bit subtle, but the new capability enables many useful scenarios, so the explanation is worth following.

To understand where the difference comes from, let’s quickly review how aggregations work in LINQ to Objects. As an example, consider a possible implementation of the Average operator, without error handling:

    public static double Average(this IEnumerable<int> source)
    {
        return source.Aggregate(
            new double[2],
            (acc, elem) => {
                acc[0] += elem; acc[1]++; return acc;
            },
            acc => acc[0] / acc[1]);
    }

The call to Aggregate takes three arguments:

- The initial accumulator value.
- A fold function that updates the accumulator for each element.
- A result conversion function that converts the final accumulator value to the actual answer.

Aggregate initializes the accumulator to the value passed as the first argument: an array of two doubles. It then iterates over all elements in the input sequence, keeping the sum of the elements seen so far in the first element of the array and their count in the second. Finally, to produce the answer, Aggregate calls the result conversion function, which divides the sum by the count and returns the result.
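
To make those three steps concrete, here is a hand-written loop that mirrors what this Aggregate overload does. It is a sketch for illustration, not the actual LINQ to Objects source: the name AggregateSketch is hypothetical, and the example assumes .NET 6 or later with C# top-level statements. It computes the same average through the sketch and through Enumerable.Aggregate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: a plain loop equivalent to
// Enumerable.Aggregate(seed, func, resultSelector).
TResult AggregateSketch<TSource, TAcc, TResult>(
    IEnumerable<TSource> source,
    TAcc seed,
    Func<TAcc, TSource, TAcc> fold,
    Func<TAcc, TResult> resultSelector)
{
    TAcc acc = seed;                 // 1. start from the initial accumulator value
    foreach (TSource elem in source)
        acc = fold(acc, elem);       // 2. fold each element into the accumulator
    return resultSelector(acc);      // 3. convert the final accumulator into the answer
}

int[] data = { 1, 2, 3, 4, 5, 6 };

// acc[0] holds the running sum, acc[1] the running count.
double viaSketch = AggregateSketch(
    data,
    new double[2],
    (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
    acc => acc[0] / acc[1]);

double viaLinq = data.Aggregate(
    new double[2],
    (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
    acc => acc[0] / acc[1]);

Console.WriteLine($"{viaSketch} {viaLinq}");
```

Each call receives its own new double[2], so the two computations never share an accumulator.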

Unfortunately, adding AsParallel to the data source is not enough to parallelize the example above. The problem is the first argument of the Aggregate call: the user gives us only one array of doubles, and the fold function then reads and writes it. That is fine if the fold function is always called from the same thread (and thus sequentially), but it breaks down when multiple threads call it concurrently: they all update the same two array slots without synchronization, so updates can be lost and the result can be wrong.

In the December CTP, the aggregation APIs assumed that the user would never do such a thing. We always expected the fold function to return a new instance of the accumulator, like this:

    public static double Average(this IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            new double[2],
            (acc, elem) => {
                return new double[] { acc[0] + elem, acc[1] + 1 };
            },
            (acc1, acc2) => {
                return new double[] { acc1[0] + acc2[0], acc1[1] + acc2[1] };
            },
            acc => acc[0] / acc[1]);
    }

Notice that to help parallelize the aggregation, the user also gives us a function to combine two accumulators. That way, after each thread has iterated over a subset of the input, we can combine the partial answers to get the final result.
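
One way to picture how the combine function is used is a sequential simulation: split the input into chunks that stand in for the subsets different threads would process, fold each chunk starting from the seed, and merge the partial results with the combine function. This is a sketch under stated assumptions, not PLINQ's actual partitioning: ChunkedAggregateSketch and its chunkSize parameter are hypothetical, and the code assumes .NET 6 or later (for Enumerable.Chunk and top-level statements). For comparison, it also calls the seed-value overload of ParallelEnumerable.Aggregate as it shipped in .NET Framework 4 and later, which may differ in details from the CTPs discussed here.

```csharp
using System;
using System.Linq;

// Hypothetical sequential simulation: each chunk stands in for the subset of the
// input that one thread would process. Real PLINQ partitioning is more involved.
TResult ChunkedAggregateSketch<TSource, TAcc, TResult>(
    TSource[] source,
    int chunkSize,
    TAcc seed,
    Func<TAcc, TSource, TAcc> fold,
    Func<TAcc, TAcc, TAcc> combine,
    Func<TAcc, TResult> resultSelector)
{
    // Fold every chunk separately. All chunks start from the SAME seed object,
    // which is only safe because the fold used below never mutates its accumulator.
    TAcc[] partials = source
        .Chunk(chunkSize)
        .Select(chunk => chunk.Aggregate(seed, fold))
        .ToArray();

    // Merge the partial results, then convert to the final answer.
    return resultSelector(partials.Aggregate(combine));
}

int[] data = Enumerable.Range(1, 10).ToArray();

// Same lambdas as the December-CTP-style example: fold and combine allocate new arrays.
Func<double[], int, double[]> immutableFold =
    (acc, elem) => new double[] { acc[0] + elem, acc[1] + 1 };
Func<double[], double[], double[]> immutableCombine =
    (acc1, acc2) => new double[] { acc1[0] + acc2[0], acc1[1] + acc2[1] };

double simulated = ChunkedAggregateSketch(
    data, 3, new double[2], immutableFold, immutableCombine, acc => acc[0] / acc[1]);

double plinq = data.AsParallel().Aggregate(
    new double[2], immutableFold, immutableCombine, acc => acc[0] / acc[1]);

Console.WriteLine($"simulated: {simulated}, PLINQ: {plinq}");
```

Every chunk starts from the same seed array. That is harmless here only because the fold function returns a fresh array instead of mutating its input, which is exactly the contract the December CTP relied on.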

However, the main point of this example is to illustrate the inefficiency of the code above: a new two-element array is allocated for every element in the input. That is very likely to hurt the performance of the aggregation significantly, especially for aggregations like this one, where the actual per-element work (in this case, a few additions) is minimal.

To address this situation, the June 2008 CTP of PLINQ supports a new overload of Aggregate. Rather than a value to initialize the accumulator to, the user gives us a factory function that generates that value:

    public static double Average(this IEnumerable<int> source)
    {
        return source.AsParallel().Aggregate(
            () => new double[2],
            (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
            (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
            acc => acc[0] / acc[1]);
    }

Now PLINQ can initialize an independent accumulator for each thread. Because each thread gets its own accumulator, both the fold function and the combine function are free to mutate the accumulators: PLINQ guarantees that an accumulator will never be accessed concurrently from multiple threads.
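
A quick way to see that the factory is invoked once per partition (roughly, once per worker thread) rather than once per element is to count its calls. The sketch below computes the same average over a million integers using the seed-factory overload of ParallelEnumerable.Aggregate as it shipped in .NET Framework 4 and later, assumed here to behave like the June 2008 CTP overload described above, and assumes .NET 6 or later for top-level statements. The counter is for illustration only; the exact number of accumulators depends on how PLINQ partitions the input on a given machine.

```csharp
using System;
using System.Linq;
using System.Threading;

int[] data = Enumerable.Range(1, 1_000_000).ToArray();
int factoryCalls = 0; // counts accumulators created; for illustration only

double average = data.AsParallel().Aggregate(
    // Seed factory: PLINQ calls this to create a fresh accumulator for each partition.
    // Partitions start concurrently, so the counter is incremented atomically.
    () => { Interlocked.Increment(ref factoryCalls); return new double[2]; },
    // Fold: mutating is safe because this accumulator belongs to one partition.
    (acc, elem) => { acc[0] += elem; acc[1]++; return acc; },
    // Combine: merges one partition's accumulator into another's.
    (acc1, acc2) => { acc1[0] += acc2[0]; acc1[1] += acc2[1]; return acc1; },
    acc => acc[0] / acc[1]);

Console.WriteLine($"average = {average}, accumulators created = {factoryCalls}");
```

The printed count is typically close to the number of processor cores rather than to the million input elements, which is where the savings over the allocate-per-element version come from.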

Here are several algorithms that can be implemented efficiently with the new Aggregate overload:

- Randomly choose N elements from the input.
- Aggregate statistics over the input sequence, e.g., compute the letter frequencies in a string.
- Find the N smallest or largest elements in the input, for a small N.
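
Two of these algorithms are sketched below against the seed-factory overload of ParallelEnumerable.Aggregate as it shipped in .NET Framework 4 and later, assuming .NET 6 or later for top-level statements. The helper names LetterFrequencies, AddBounded and NSmallest are hypothetical. For letter frequencies, each partition fills its own int[26] (only the ASCII letters a to z are counted, case-insensitively); for the N smallest elements, each partition keeps its own sorted List<int> of at most N candidates. In both cases the combine function folds one partition's state into another's, and no locks are needed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: counts the letters a-z (case-insensitively) in a string.
int[] LetterFrequencies(string text) =>
    text.AsParallel().Aggregate(
        () => new int[26],                         // private counts for one partition
        (counts, c) =>
        {
            char lower = char.ToLowerInvariant(c);
            if (lower >= 'a' && lower <= 'z') counts[lower - 'a']++;
            return counts;
        },
        (counts1, counts2) =>                      // add one partition's counts into another's
        {
            for (int i = 0; i < counts1.Length; i++) counts1[i] += counts2[i];
            return counts1;
        },
        counts => counts);

// Hypothetical helper: inserts value into an ascending list capped at n items (n >= 1).
void AddBounded(List<int> best, int value, int n)
{
    if (best.Count == n && value >= best[n - 1]) return; // not among the n smallest
    int index = best.BinarySearch(value);
    if (index < 0) index = ~index;                       // insertion point keeps the list sorted
    best.Insert(index, value);
    if (best.Count > n) best.RemoveAt(best.Count - 1);   // drop the current largest
}

// Hypothetical helper: the n smallest elements of the input, in ascending order.
List<int> NSmallest(IEnumerable<int> source, int n) =>
    source.AsParallel().Aggregate(
        () => new List<int>(n + 1),                      // private candidates per partition
        (best, x) => { AddBounded(best, x, n); return best; },
        (best1, best2) => { foreach (int x in best2) AddBounded(best1, x, n); return best1; },
        best => best);

int[] freq = LetterFrequencies("Hello, PLINQ!");
Console.WriteLine($"l: {freq['l' - 'a']}, q: {freq['q' - 'a']}"); // l: 3, q: 1

List<int> smallest = NSmallest(new[] { 42, 7, 19, 3, 88, 7, 56, 1, 23 }, 3);
Console.WriteLine(string.Join(", ", smallest)); // 1, 3, 7
```

Finding the N largest elements works the same way with the comparisons reversed.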

Also, it is worth pointing out the strong similarity between the new Aggregate overload and the Parallel.For overloads that manipulate thread-local state. Both address the same kind of problem: letting different threads each mutate their own local state.
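
For comparison, here is the same sum-and-count average written with the Parallel.For overload that takes thread-local state (System.Threading.Tasks in .NET Framework 4 and later; the example also assumes .NET 6 or later for top-level statements). The localInit delegate plays the role of the seed factory, and the loop body mutates only its task-local array. One difference is worth noting: localFinally may run concurrently for different tasks, so, unlike the PLINQ combine function, the merge into the shared totals has to take a lock.

```csharp
using System;
using System.Threading.Tasks;

int[] data = new int[1000];
for (int i = 0; i < data.Length; i++) data[i] = i + 1;

double totalSum = 0;
long totalCount = 0;
object gate = new object();

Parallel.For(
    0, data.Length,
    // localInit: like the seed factory, runs once per task to create private state.
    () => new double[2],
    // body: mutates only the state private to the current task.
    (i, loopState, acc) => { acc[0] += data[i]; acc[1]++; return acc; },
    // localFinally: runs once per task, possibly concurrently, so merge under a lock.
    acc =>
    {
        lock (gate)
        {
            totalSum += acc[0];
            totalCount += (long)acc[1];
        }
    });

double average = totalSum / totalCount;
Console.WriteLine($"average = {average}");
```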

I hope you enjoyed reading about the new capabilities of PLINQ aggregations. If you have comments on the API, or ideas on how aggregations apply to problems in your domain, please let us know in the comments!