Avoiding Contention using Combinable Objects

When attempting to parallelize an algorithm, programmers are frequently thwarted by the presence of shared state.  Any state that can potentially be modified by multiple threads simultaneously during a parallel operation must be synchronized somehow to prevent race conditions, data corruption, and all kinds of nasty behaviour.  If the shared state is modified infrequently, the easiest and arguably best solution is simply to lock it using a critical section or some other simple mutual exclusion primitive.  However, if the shared state is modified very frequently, then this approach will exhibit a correspondingly frequent amount of lock contention, which will ultimately result in a loss of scalability.  Sometimes this loss of scalability is so severe that the parallel algorithm will run more slowly than the serial version.

One common pattern that programmers use to reduce the effect of shared state is to split the data into separate thread-local pieces.  Each thread in the parallel algorithm then operates only on its thread-local piece, thus removing all contention on it.  After the parallel algorithm completes, all the thread-local pieces are combined to produce the final result.  If the operations performed on this shared state are associative (i.e., (x + y) + z = x + (y + z)) and commutative (i.e., x + y = y + x), this pattern will produce the same result as the serial algorithm.  In Microsoft’s Parallel Pattern Library, this pattern is exposed via the combinable<T> construct.
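Before turning to combinable<T>, here is a minimal hand-rolled sketch of the same pattern in portable C++. It uses std::thread rather than the PPL, and the helper name partitioned_sum and the chunking scheme are assumptions made for illustration only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical helper (not part of the PPL): sums `data` by giving each of
// `num_threads` threads its own contiguous slice and its own accumulator.
int partitioned_sum(const std::vector<int>& data, std::size_t num_threads) {
    assert(num_threads > 0);
    std::vector<int> partials(num_threads, 0);  // one result slot per thread
    std::vector<std::thread> workers;
    const std::size_t chunk = (data.size() + num_threads - 1) / num_threads;

    for (std::size_t t = 0; t < num_threads; ++t) {
        workers.emplace_back([&data, &partials, chunk, t] {
            const std::size_t begin = std::min(t * chunk, data.size());
            const std::size_t end = std::min(begin + chunk, data.size());
            int local = 0;  // private to this thread, so no lock is needed
            for (std::size_t i = begin; i < end; ++i) {
                local += data[i];
            }
            partials[t] = local;  // a single write per thread
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    // Combine step: runs serially after every thread has finished.
    return std::accumulate(partials.begin(), partials.end(), 0);
}
```

Because integer addition is associative and commutative, the result does not depend on how the elements are divided among the threads.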

To illustrate, let us use a very simple example:

int sum = 0;
for (vector<int>::iterator it = myVec.begin(); it != myVec.end(); ++it) {
    int element = *it;
    SomeFineGrainComputation(element);
    sum += element;
}

Now we would like to parallelize this.  Using the Microsoft Parallel Pattern Library, we can simply replace the for loop with a call to parallel_for_each, and use C++’s fancy new lambda syntax:

int sum = 0;
parallel_for_each(myVec.begin(), myVec.end(), [&sum] (int element) {
    SomeFineGrainComputation(element);
    sum += element;
});

Thanks to lambdas, the parallelized code looks almost as simple as (perhaps simpler than) the original code.  However, as written it has an obvious race condition:  multiple threads can update our shared “sum” variable at the same time, and this will almost certainly produce an incorrect result.  We have to synchronize it somehow.  As I mentioned above, the naïve way is to use a mutual exclusion primitive, such as a Windows critical section.  This would look like:

CRITICAL_SECTION sumCS;
InitializeCriticalSection(&sumCS);
int sum = 0;
// The lambda must capture the critical section as well as the sum.
parallel_for_each(myVec.begin(), myVec.end(), [&sum, &sumCS] (int element) {
    SomeFineGrainComputation(element);
    EnterCriticalSection(&sumCS);
    sum += element;
    LeaveCriticalSection(&sumCS);
});
DeleteCriticalSection(&sumCS);

This fixes our race condition and produces the same result as the serial algorithm.  However, since the shared “sum” variable is accessed every iteration, the critical section we’ve introduced will cause a lot of contention, quite likely enough to cause the parallel algorithm to run slower than the serial algorithm.  (Ninja programmers may note that in this case we could use “InterlockedExchangeAdd” instead of a critical section to achieve the same effect; however, this only works for integers.  What if the shared state was not an integer or we were doing some operation other than “+”?  As well, although interlocked operations are usually faster than full-blown locks, they still have a large negative impact on performance because they involve expensive cache coherency operations.)
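For readers outside Windows, the interlocked variant can be sketched in standard C++, where std::atomic<int>::fetch_add plays roughly the role of “InterlockedExchangeAdd”.  This is an analogue rather than the Win32 call itself, and the helper name atomic_sum and the strided split are assumptions made for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical helper: every thread adds each of its elements directly to
// one shared atomic counter.
int atomic_sum(const std::vector<int>& data, std::size_t num_threads) {
    assert(num_threads > 0);
    std::atomic<int> sum{0};
    std::vector<std::thread> workers;

    for (std::size_t t = 0; t < num_threads; ++t) {
        workers.emplace_back([&data, &sum, num_threads, t] {
            // Strided split: thread t handles elements t, t + n, t + 2n, ...
            for (std::size_t i = t; i < data.size(); i += num_threads) {
                // Relaxed ordering is enough here because join() below
                // makes all the additions visible before the final load.
                sum.fetch_add(data[i], std::memory_order_relaxed);
            }
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    return sum.load();
}
```

The result is correct, but every addition still targets the same memory location, so the threads keep competing for a single cache line.  That is the cost the parenthetical above refers to.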

Using Microsoft’s Parallel Pattern Library and the combinable<T> object we can virtually eliminate the shared state altogether and thus eliminate the need for a lock or an interlocked operation.  The initial portion of the modified parallel algorithm looks strikingly similar to the original serial algorithm above:

combinable<int> localSums;
parallel_for_each(myVec.begin(), myVec.end(), [&localSums] (int element) {
    SomeFineGrainComputation(element);
    localSums.local() += element;
});

The body of the loop now performs its computation on a thread-local integer by calling “localSums.local()”.  This member function returns a reference to the thread local integer, so the body of the loop can perform whatever operation(s) on it are necessary, subject to the associativity and commutativity requirements.  No locks are required and we have virtually eliminated all contention.

Hey wait a minute, didn’t we simply replace a shared “int” variable with a shared “combinable<int>” variable?  How has this solved our contention problem?  Yes, each thread still operates on a shared combinable object.  However, the combinable object employs a simple trick such that synchronization (i.e., a fence) is only required the first time a thread-local sub-computation is initialized.  Subsequent accesses to the thread-local value are fenceless.  (For example, if a 10000 iteration loop is parallelized over 8 threads, there will be only 8 fences.)  The combinable object hides all this trickiness so you don’t have to be concerned about it.
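To make the idea concrete, here is a toy sketch of how such an object could be built so that synchronization happens only on a thread's first access.  It is not the PPL implementation, whose internals are not described here.  ToyCombinable, toy_sum, and the per-thread cache keyed by an object id are all assumptions chosen to keep the example short and portable.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical illustration only; not how combinable<T> is actually written.
template <typename T>
class ToyCombinable {
public:
    ToyCombinable() : id_(next_id()) {}

    // Returns the calling thread's private value, creating it on first use.
    T& local() {
        // Per-thread cache mapping an object id to this thread's slot.
        thread_local std::unordered_map<std::size_t, T*> cache;
        auto it = cache.find(id_);
        if (it != cache.end()) {
            return *it->second;  // fast path: no lock, no atomic operation
        }
        std::lock_guard<std::mutex> lock(mutex_);  // slow path: once per thread
        slots_.push_back(std::make_unique<T>());
        T* slot = slots_.back().get();
        cache[id_] = slot;
        return *slot;
    }

    // Folds the per-thread values; call only after all workers have finished.
    template <typename F>
    T combine(F f) const {
        if (slots_.empty()) {
            return T();
        }
        T result = *slots_.front();
        for (std::size_t i = 1; i < slots_.size(); ++i) {
            result = f(result, *slots_[i]);
        }
        return result;
    }

    std::size_t slot_count() const { return slots_.size(); }

private:
    // Ids are never reused, so a stale cache entry is never looked up again.
    static std::size_t next_id() {
        static std::atomic<std::size_t> counter{0};
        return counter++;
    }

    const std::size_t id_;
    std::mutex mutex_;
    std::vector<std::unique_ptr<T>> slots_;
};

struct ToyResult {
    int sum;
    std::size_t slots;
};

// Hypothetical driver: sums `data` with a strided split over the threads.
ToyResult toy_sum(const std::vector<int>& data, std::size_t num_threads) {
    assert(num_threads > 0);
    ToyCombinable<int> sums;
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < num_threads; ++t) {
        workers.emplace_back([&, t] {
            for (std::size_t i = t; i < data.size(); i += num_threads) {
                sums.local() += data[i];
            }
        });
    }
    for (std::thread& w : workers) {
        w.join();
    }
    return {sums.combine([](int a, int b) { return a + b; }), sums.slot_count()};
}
```

With 10000 elements spread over 8 threads, the lock in this sketch is taken 8 times, once per thread, and every other call to local() takes the lock-free path.  A production implementation would presumably use a cheaper lookup than a hash map, but the shape of the trick is the same.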

Now the only thing missing is the final sum.  When this parallel algorithm finishes, it has computed some number of thread-local sub-computations which we must combine (hence the name combinable) to produce the final result.  Using lambdas again, this looks like:

int sum = localSums.combine([] (int left, int right) {
    return left + right;
});

Or we could be even more succinct by using the “std::plus” functor from the standard <functional> header:

int sum = localSums.combine(std::plus<int>());
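Conceptually, the combine step is just a fold over the thread-local values, and this is why the operation needs to be associative and commutative.  The sketch below is a plain serial stand-in, not the PPL's combine.  The helper fold_values is hypothetical, and it visits the values in vector order only so that the effect of a different order can be shown.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical stand-in for the combine step: folds the values from left to
// right using the supplied binary function object.
template <typename F>
int fold_values(const std::vector<int>& values, F f) {
    assert(!values.empty());
    int result = values.front();
    for (std::size_t i = 1; i < values.size(); ++i) {
        result = f(result, values[i]);
    }
    return result;
}
```

Folding {1, 2, 3, 4} and {4, 3, 2, 1} with std::plus<int>() gives 10 both times, so the order in which the thread-local values are visited does not matter.  With std::minus<int>() the same two orderings give -8 and -2, which is why an operation like subtraction is not safe to use with this pattern.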

The performance improvement that can be realized by using combinable varies, of course, depending on how much the contention dominates the other computations done during the parallel algorithm.  In extreme cases (e.g. no other work done except updating shared state), my 4-core machine showed a 5-7x speedup over using a critical section.  The improvement for the same extreme case was even more dramatic on a 24-core machine we have in our lab:  over 100x faster!

The combinable object has a second combine function, “combine_each”.  Consider a slightly different algorithm that collects items into a result set<int>:

combinable<set<int>> localSets;
parallel_for_each(myVec.begin(), myVec.end(), [&localSets] (int element) {
    if (SomeFineGrainComputation(element))
        localSets.local().insert(element);
});

If we were to use the binary combine function above, it would look like:

set<int> result = localSets.combine([] (const set<int>& left,
                                        const set<int>& right) -> set<int> {
    set<int> temp = left;
    temp.insert(right.begin(), right.end());
    return temp;
});

Unfortunately, this form of combine will create and destroy lots of “set<int>” objects, which would be very expensive and wasteful.  The “combine_each” function will accumulate each thread-local sub-computation into a final result using no temporary values:

set<int> result;
localSets.combine_each([&result] (const set<int>& localSet) {
    result.insert(localSet.begin(), localSet.end());
});
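The difference between the two styles can be seen without the PPL at all.  The following serial sketch models the thread-local sets as a plain vector; merge_pairwise and merge_in_place are hypothetical helpers, and the copy counter exists only to make the extra work visible.

```cpp
#include <cstddef>
#include <set>
#include <utility>
#include <vector>

// Pairwise style: each step copies everything merged so far into a fresh
// set, mirroring the `temp = left` copy in the binary combine functor.
std::set<int> merge_pairwise(const std::vector<std::set<int>>& locals,
                             std::size_t& copies) {
    std::set<int> result;
    copies = 0;
    for (const std::set<int>& local : locals) {
        std::set<int> temp = result;  // copy of the running result
        ++copies;
        temp.insert(local.begin(), local.end());
        result = std::move(temp);
    }
    return result;
}

// combine_each style: every local set is inserted straight into one result.
std::set<int> merge_in_place(const std::vector<std::set<int>>& locals) {
    std::set<int> result;
    for (const std::set<int>& local : locals) {
        result.insert(local.begin(), local.end());
    }
    return result;
}
```

Both functions produce the same set, but in this model the pairwise version makes one copy of the running result for every local set it merges, and those copies grow as the result grows.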

The combinable<T> object is a very simple but powerful construct that can help eliminate shared state in parallel algorithms that would otherwise limit scalability.  It adds very little extra complexity, and can be many times faster than alternative solutions that require traditional synchronization primitives.  While it is optimized for use with the Microsoft Parallel Pattern Library, it also works just fine in any other parallel algorithm, whether it’s built on OpenMP or your own hand-rolled threading.

1. Axel says:

Great, but where can we find the Parallel Pattern Library?

2. rickmolloy says:

We have not announced any ship or CTP dates for the PPL at this time. Currently this blog and the ideas discussed are still under investigation.

3. Tom Kirby-Green says:

Joe Duffy has a preview of .NET 4 in the appendix of his upcoming Concurrency on Windows book. Can we expect coverage of the PPL there? Is there any chance PDC attendees might be leaving with a preview of this stuff?

Tom, Rick will be previewing much of the PPL at the PDC in October.  You can check http://microsoftpdc.com for a list of relevant sessions we’ll be giving.

5. Bharath says:

I understood till here. But the below is confusing to read:

int sum = localSums.combine([] (int left, int right) {
    return left + right;
});

Does this mean combine is going to sweep the thread locals? Or who is providing the left and right params?

Can you be more descriptive about what combine does?

Bharath, at the end of the parallel algorithm, my combinable object called "localSums" will have a set of thread-local values.  The "combine" method here just walks (or "sweeps", to use your term) through them and applies the supplied binary function object.  For example, pretend our combine functor is std::plus<int>, and that you have 4 values, [v1, v2, v3, v4].  Then the combine step will behave as if:

result = std::plus<int>()(v1, v2);
result = std::plus<int>()(result, v3);
result = std::plus<int>()(result, v4);
return result;

(Except that we make no guarantee about the order we visit the values.)

Hope that helps.

7. Dmitriy V'jukov says:

parallel_for_each(myVec.begin(),  myVec.end(), [&sum] (int element)

What is the default grain size for parallel_for_each()? Can it be changed? Or rather, how can it be changed?

If default grain size is 1, then numbers you provide (my 4-core machine showed a 5-7x speedup) are basically senseless. If all done correctly one must see speedup around 50x on quad-core machine.

8. Dmitriy V'jukov says:

"However, if the shared state is modified very frequently, then this approach [critical sections] will exhibit a correspondingly frequent amount of lock contention, which will ultimately result in a loss of scalability"

It’s not critical sections the cause of performance degradation and negative scalability. It’s frequent modification of shared data itself.

You can make frequent modification of shared data with atomic RMW operations, or even with plain stores, no matter, result will be the same – dead-slow and negative scalability.

9. Dmitriy V'jukov says:

I’m curious how combinable<> is implemented. Can you provide some details?

AFAIK, there is no optimal general solution (general in the sense that number of threads is arbitrary, number of combinable<> objects is arbitrary, the fact that objects vary in size further complicate situation).

One can use OS TLS. But TLS slots will be quickly exhausted.

One can use something like std::set<thread_id_t, T> internally. But it will be impossible to find required element quickly.

One can use per-thread array indexed by ‘combinable_id’. But this can waste a lot of memory.

Does combinable<>::local() do some sort of lookup in a list?

10. Dmitriy V'jukov says:

"Ninja programmers may note that in this case we could use “InterlockedExchangeAdd” instead of a critical section to achieve the same effect"

Probably you mean amateur whitebelt ninja programmers. Real blackbelt ninja master will instantly suggest to distribute data, because he knows that atomic RMW operations won’t anyhow help here 😉
