Fork/Join parallelism with .NET CountdownEvent

Cristina Manu

A common asynchronous pattern in code today is fork/join parallelism: a thread starts n pieces of work (the fork) and later waits for all of that work to complete (the join).

The existing set of .NET synchronization primitives does not offer a simple solution for this common scenario. It can be achieved, for example, by using a ManualResetEvent and a separate count that’s manipulated with Interlocked operations, but such implementations are not trivial, and they can easily lead to race conditions and other problems.
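To make the hand-rolled approach concrete, here is a minimal sketch of fork/join built from a ManualResetEvent and an Interlocked counter. The class name ManualForkJoin, the Run method, and its parameters are hypothetical names chosen for illustration; they are not part of any library.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
static class ManualForkJoin
{
    // Queues workerCount items on the thread pool, blocks until all of them
    // have finished, and returns how many items ran.
    public static int Run(int workerCount, Action<int> work)
    {
        // With zero items nobody would ever set the event, so return early.
        if (workerCount <= 0) return 0;

        int remaining = workerCount; // work items still outstanding
        int completed = 0;           // work items that have finished

        using (ManualResetEvent done = new ManualResetEvent(false))
        {
            for (int i = 0; i < workerCount; i++)
            {
                int index = i; // copy the loop variable for the closure
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        work(index);
                    }
                    finally
                    {
                        Interlocked.Increment(ref completed);
                        // Only the item that takes the count to zero sets the event.
                        if (Interlocked.Decrement(ref remaining) == 0)
                        {
                            done.Set();
                        }
                    }
                });
            }

            done.WaitOne(); // the join
        }

        return completed;
    }
}
```

Even in this small sketch there are details that are easy to get wrong: the count must be fixed before any item can finish, the zero-item case needs special handling, and only one thread may set the event.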

Parallel Extensions to the .NET Framework introduces several new synchronization and coordination primitives, one of which is meant to address this common pattern: CountdownEvent.

CountdownEvent is a synchronization primitive that allows a thread to wait until one or more other threads finish their operations. It is similar in concept to the CountDownLatch that Java supports.

CountdownEvent is initialized with an integral count, typically greater than 0. Threads can block waiting on the event until the count reaches 0, at which point the CountdownEvent will be set. The count is decremented using CountdownEvent’s Decrement method, which is typically called by a worker thread completing a unit of work being tracked by the event.

CountdownEvent’s Decrement method has two overloads. The simplest overload decrements the count by 1, whereas the other overload allows the caller to provide a value signifying how much to decrement the count. The result with either is the same, however: when the CountdownEvent’s count reaches 0, any waiting threads will be woken up.

CountdownEvent ctDownEv = new CountdownEvent(10);

ctDownEv.Decrement(5);
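The following sketch traces a count all the way down to zero. One assumption to call out: it is written against the CountdownEvent that shipped in .NET Framework 4, where the decrementing method is named Signal rather than the Decrement name used in this post. The class CountdownBasics and its Demo method are hypothetical names for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
static class CountdownBasics
{
    // Returns true when every observation below matches the described behavior.
    public static bool Demo()
    {
        using (CountdownEvent ev = new CountdownEvent(10))
        {
            ev.Signal(5);                    // count: 10 -> 5
            bool setEarly = ev.IsSet;        // still false, count is above zero

            ev.Signal();                     // count: 5 -> 4
            bool reachedZero = ev.Signal(4); // count: 4 -> 0, returns true

            // Once the count is zero the event is set and Wait returns immediately.
            ev.Wait();

            return !setEarly && reachedZero && ev.IsSet && ev.CurrentCount == 0;
        }
    }
}
```

Both overloads return a bool that reports whether that particular call brought the count to zero.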

In order to improve performance and scalability, a spin-waiting mechanism is built into CountdownEvent’s Wait method. This can help for scenarios where waits will be satisfied very quickly, and thus the extra cost of busy waiting a little is less than the costs associated with unnecessary context switches and kernel transitions.

While CountdownEvent does offer a WaitHandle property, it is only intended for scenarios where a true WaitHandle is needed (such as when using WaitHandle.WaitAll to wait on a CountdownEvent and another type of primitive); for most waiting scenarios, you should plan to use the Wait methods.
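As a rough sketch of the two waiting styles, the code below first waits with the Wait method and then uses the WaitHandle property with WaitHandle.WaitAll alongside a ManualResetEvent. It assumes the .NET Framework 4 API, in which the decrementing method is named Signal; the class WaitChoices, the Demo method, and the five-second timeouts are arbitrary choices for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
static class WaitChoices
{
    public static bool Demo()
    {
        using (CountdownEvent workDone = new CountdownEvent(2))
        using (ManualResetEvent otherSignal = new ManualResetEvent(false))
        {
            ThreadPool.QueueUserWorkItem(delegate { workDone.Signal(); });
            ThreadPool.QueueUserWorkItem(delegate
            {
                workDone.Signal();
                otherSignal.Set();
            });

            // Preferred: the Wait method, here with a timeout.
            // It returns false if the timeout elapses first.
            bool finished = workDone.Wait(TimeSpan.FromSeconds(5));

            // Only when a real WaitHandle is required, for example to wait
            // on the CountdownEvent together with another primitive.
            WaitHandle[] handles = { workDone.WaitHandle, otherSignal };
            bool both = WaitHandle.WaitAll(handles, TimeSpan.FromSeconds(5));

            return finished && both;
        }
    }
}
```

Note that WaitHandle.WaitAll with more than one handle is not supported on STA threads, which is one more reason to stay with Wait when a WaitHandle is not strictly needed.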

A fork/join scenario implemented with a CountdownEvent could be written as follows:

const int NUMBER_OF_WORKER_THREADS = 10;

void CountDownEventForkJoin()
{
    // One count per queued worker; the main thread handles item 0 itself.
    using (CountdownEvent signalEv =
        new CountdownEvent(NUMBER_OF_WORKER_THREADS - 1))
    {
        for (int i = 1; i < NUMBER_OF_WORKER_THREADS; i++)
        {
            // Copy the loop variable so each delegate captures its own value.
            int currentIndex = i;
            ThreadPool.QueueUserWorkItem(
                delegate
                {
                    try
                    {
                        DoSomeWork(currentIndex);
                    }
                    finally
                    {
                        signalEv.Decrement();
                        // More work can be done here if needed.
                    }
                });
        }

        // The main thread does some work of its own.
        DoSomeWork(0);

        // Wait for the worker threads.
        signalEv.Wait();
    }
}

To support scenarios in which the developer does not know up front how many asynchronous operations will be used, CountdownEvent offers an Increment method. For example, a worker thread can decide to split its work into more parts that run in parallel for better performance. (However, the Increment method cannot be called on an already signaled event.)

Such a scenario could be implemented as follows:

void CountDownEventIncrement()
{
    string[] myData = new string[] { "a", "b", "c" };

    // Start with a count of 1, held by the main thread, so the event
    // cannot become set while work items are still being queued.
    using (CountdownEvent signalEv = new CountdownEvent(1))
    {
        foreach (string s in myData)
        {
            // Register a new worker before queuing it.
            signalEv.Increment();
            ThreadPool.QueueUserWorkItem(
                delegate(object args)
                {
                    try
                    {
                        DoSomeWork((string)args);
                    }
                    finally
                    {
                        // Worker ended.
                        signalEv.Decrement();
                    }
                }, s);
        }

        // Release the main thread's initial count, then wait for the workers.
        signalEv.Decrement();
        signalEv.Wait();
    }
}
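The restriction on incrementing an already signaled event can be observed with a short sketch. It assumes the .NET Framework 4 API, where Increment and Decrement are named AddCount and Signal, and where a TryAddCount method reports failure instead of throwing. The class AddAfterSet and its Demo method are hypothetical names for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
static class AddAfterSet
{
    public static bool Demo()
    {
        using (CountdownEvent ev = new CountdownEvent(1))
        {
            ev.Signal(); // count: 1 -> 0, the event is now set

            // TryAddCount reports the failure through its return value.
            bool added = ev.TryAddCount();

            // AddCount throws when the event is already set.
            bool threw = false;
            try
            {
                ev.AddCount();
            }
            catch (InvalidOperationException)
            {
                threw = true;
            }

            return !added && threw;
        }
    }
}
```

This is why the example above starts the count at 1 and only releases that initial count after every work item has been registered.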
