Avoiding Contention Using Combinable Objects

When attempting to parallelize an algorithm, programmers are frequently thwarted by the presence of shared state. Any state that can potentially be modified by multiple threads simultaneously during a parallel operation must be synchronized somehow to prevent race conditions, data corruption, and all kinds of nasty behaviour. If the shared state is modified infrequently, the easiest and arguably best solution is simply to protect it with a critical section or some other simple mutual exclusion primitive. However, if the shared state is modified very frequently, this approach suffers a correspondingly large amount of lock contention, which ultimately results in a loss of scalability. Sometimes the loss is so severe that the parallel algorithm runs more slowly than the serial version.

One common pattern programmers use to reduce the effect of shared state is to split the data into separate thread-local pieces. Each thread in the parallel algorithm then operates only on its own thread-local piece, removing all contention on it. After the parallel algorithm completes, the thread-local pieces are combined to produce the final result. As long as the operation performed on the shared state is associative (i.e., (x + y) + z = x + (y + z)) and commutative (i.e., x + y = y + x), this pattern produces the same result as the serial algorithm. In Microsoft’s Parallel Pattern Library, this pattern is exposed via the combinable<T> construct.
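To make the pattern concrete before turning to combinable itself, here is a minimal hand-rolled sketch of the same idea (the ParallelSum name, the fixed thread count, and the use of C++11 std::thread are my own illustration, not part of the PPL): each worker accumulates into its own slot, and the slots are folded together at the end.

#include <algorithm>
#include <numeric>
#include <thread>
#include <vector>

// Each worker thread accumulates into its own element of "partials", so there
// is no sharing during the loop; the per-thread results are combined once at
// the end. This is the bookkeeping that combinable<T> automates for you.
int ParallelSum(const std::vector<int>& data, unsigned numThreads)
{
    std::vector<long long> partials(numThreads, 0);
    std::vector<std::thread> workers;
    size_t chunk = (data.size() + numThreads - 1) / numThreads;

    for (unsigned t = 0; t < numThreads; ++t) {
        workers.emplace_back([&, t] {
            size_t begin = t * chunk;
            size_t end = std::min(data.size(), begin + chunk);
            for (size_t i = begin; i < end; ++i)
                partials[t] += data[i];   // thread-local slot, no lock needed
        });
    }
    for (auto& w : workers)
        w.join();

    // The "combine" step: fold the per-thread results into the final sum.
    return static_cast<int>(std::accumulate(partials.begin(), partials.end(), 0LL));
}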

To illustrate, let us use a very simple example:

int sum = 0;
for (vector<int>::iterator it = myVec.begin(); it != myVec.end(); ++it) {
    int element = *it;
    SomeFineGrainComputation(element);
    sum += element;
}


Now we would like to parallelize this. Using the Microsoft Parallel Pattern Library, we can simply replace the for loop with a call to parallel_for_each, and use C++’s fancy new lambda syntax:

int sum = 0;
parallel_for_each(myVec.begin(), myVec.end(), [&sum] (int element) {
    SomeFineGrainComputation(element);
    sum += element;
});

The parallelized code looks almost as simple as (perhaps even simpler than) the original, thanks to lambdas. However, as written it has an obvious race condition: multiple threads can update our shared “sum” variable at the same time, which will almost certainly produce an incorrect result. We have to synchronize it somehow. As I mentioned above, the naïve way is to use a mutual exclusion primitive, such as a Windows critical section. That would look like this:

CRITICAL_SECTION sumCS;
InitializeCriticalSection(&sumCS);

int sum = 0;
parallel_for_each(myVec.begin(), myVec.end(), [&sum, &sumCS] (int element) {
    SomeFineGrainComputation(element);

    EnterCriticalSection(&sumCS);
    sum += element;
    LeaveCriticalSection(&sumCS);
});

DeleteCriticalSection(&sumCS);

This fixes the race condition and produces the same result as the serial algorithm. However, since the shared “sum” variable is accessed on every iteration, the critical section we’ve introduced will cause a lot of contention, quite likely enough to make the parallel algorithm run slower than the serial one. (Ninja programmers may note that in this case we could use “InterlockedExchangeAdd” instead of a critical section to achieve the same effect; however, that only works for integers. What if the shared state were not an integer, or we were doing some operation other than “+”? And although interlocked operations are usually faster than full-blown locks, they still have a large negative impact on performance because they involve expensive cache coherency operations.)
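For completeness, here is what that interlocked variant might look like (a sketch only; note that InterlockedExchangeAdd operates on a LONG, so the accumulator’s type changes):

LONG sum = 0;
parallel_for_each(myVec.begin(), myVec.end(), [&sum] (int element) {
    SomeFineGrainComputation(element);

    // Atomically add to the shared total: no lock, but still a full memory
    // fence and cache-line ping-pong on every iteration.
    InterlockedExchangeAdd(&sum, element);
});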

Using Microsoft’s Parallel Pattern Library and the combinable<T> object we can virtually eliminate the shared state altogether and thus eliminate the need for a lock or an interlocked operation. The initial portion of the modified parallel algorithm looks strikingly similar to the original serial algorithm above:

combinable<int> localSums;
parallel_for_each(myVec.begin(), myVec.end(), [&localSums] (int element) {
    SomeFineGrainComputation(element);
    localSums.local() += element;
});

The body of the loop now performs its computation on a thread-local integer obtained by calling “localSums.local()”. This member function returns a reference to the thread-local integer, so the body of the loop can perform whatever operation(s) on it are necessary, subject to the associativity and commutativity requirements. No locks are required, and we have virtually eliminated all contention.

Hey wait a minute, didn’t we simply replace a shared “int” variable with a shared “combinable<int>” variable? How has this solved our contention problem? Yes, each thread still operates on a shared combinable object. However, the combinable object employs a simple trick such that synchronization (i.e., a fence) is only required the first time a thread-local sub-computation is initialized. Subsequent accesses to the thread-local value are fenceless. (For example, if a 10000 iteration loop is parallelized over 8 threads, there will be only 8 fences.) The combinable object hides all this trickiness so you don’t have to be concerned about it.
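To see how such a trick might work, here is a purely conceptual sketch of my own (it is not the actual combinable implementation, it assumes C++11 thread_local support, and it glosses over object-lifetime issues): each thread caches a pointer to its own slot, so the lock guarding the slot table is only taken the first time a given thread calls local().

#include <mutex>
#include <thread>
#include <unordered_map>

// Conceptual sketch only -- not the real PPL combinable.
template <typename T>
class combinable_sketch {
    std::unordered_map<std::thread::id, T> slots_;  // one value per thread
    std::mutex mtx_;                                // guards slot creation only

public:
    T& local() {
        // Per-thread cache of "which slot belongs to me", keyed by object so
        // that multiple combinable_sketch instances don't collide.
        thread_local std::unordered_map<const void*, T*> cache;
        T*& slot = cache[this];
        if (slot == nullptr) {
            std::lock_guard<std::mutex> lock(mtx_);      // only on a thread's first access
            slot = &slots_[std::this_thread::get_id()];  // value-initialized (0 for int)
        }
        return *slot;  // subsequent calls: no lock taken
    }

    template <typename Func>
    T combine(Func f) const {
        // Intended to be called after the parallel work has finished,
        // so no locking is needed here.
        T result = T();
        for (const auto& kv : slots_)
            result = f(result, kv.second);
        return result;
    }
};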

Now the only thing missing is the final sum. When this parallel algorithm finishes, it has computed some number of thread-local sub-computations which we must combine (hence the name combinable) to produce the final result. Using lambdas again, this looks like:

int sum = localSums.combine([] (int left, int right) {
    return left + right;
});

Or we could be even more succinct by using the “std::plus” functor from the standard <functional> header:

int sum = localSums.combine(std::plus<int>());

The performance improvement that can be realized by using combinable varies, of course, depending on how much the contention dominates the other computations done during the parallel algorithm. In extreme cases (e.g. no other work done except updating shared state), my 4-core machine showed a 5-7x speedup over using a critical section. The improvement for the same extreme case was even more dramatic on a 24-core machine we have in our lab: over 100x faster!

The combinable object has a second combine function “combine_each”. Consider a slightly different algorithm that collects items into a result set<int>.

combinable<set<int>> localSets;
parallel_for_each(myVec.begin(), myVec.end(), [&localSets] (int element) {
    if (SomeFineGrainComputation(element))
        localSets.local().insert(element);
});

If we were to use the binary combine function above, it would look like:

set<int> result = localSets.combine(
    [] (const set<int>& left, const set<int>& right) -> set<int> {
        set<int> temp = left;
        temp.insert(right.begin(), right.end());
        return temp;
    });


Unfortunately, this form of combine creates and destroys lots of “set<int>” objects, which is expensive and wasteful. The “combine_each” function instead accumulates each thread-local sub-computation into the final result without any temporary values:

set<int> result;
localSets.combine_each([&result] (const set<int>& localSet) {
    result.insert(localSet.begin(), localSet.end());
});

The combinable<T> object is a very simple but powerful construct that can help eliminate shared state in parallel algorithms that would otherwise limit scalability. It adds very little extra complexity, and can be many times faster than alternative solutions that require traditional synchronization primitives. While it is optimized for use with the Microsoft Parallel Pattern Library, it also works just fine in any other parallel algorithm, whether it’s built on OpenMP or your own hand-rolled threading.
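For example, here is a minimal sketch of the same accumulation driven by an OpenMP loop instead of parallel_for_each (my own illustration, assuming a compiler with OpenMP enabled, e.g. /openmp in Visual C++, and the PPL’s <ppl.h> header); combinable doesn’t care which threading layer ends up calling local().

#include <ppl.h>        // Concurrency::combinable
#include <functional>   // std::plus
#include <vector>

int OpenMPSum(const std::vector<int>& data)
{
    Concurrency::combinable<int> localSums;

    // Each OpenMP worker thread accumulates into its own copy.
    #pragma omp parallel for
    for (int i = 0; i < static_cast<int>(data.size()); ++i)
        localSums.local() += data[i];

    // Fold the per-thread sums into the final result.
    return localSums.combine(std::plus<int>());
}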