Introducing ConcurrentStack

cristina manu

A common problem users run into when writing parallel applications is the lack of the thread-safety support in the .NET collection classes. Users typically need to implement their own synchronization mechanism for achieving the goal of safely reading/writing data to the same shared collection.

A largely deprecated solution was to use the synchronized collections introduced in the .NET Framework 1.0, or to use the SyncRoot mechanism exposed through the ICollection interface.

However, both of these approaches are not recommended to be used, for a variety of reasons, including less-than-ideal performance, but also because by using them, developers can set themselves up for a multitude of race conditions. See https://blogs.msdn.com/bclteam/archive/2005/03/15/396399.aspx for a discussion of these design issues.

Because of all of these reasons, the generic collections introduced in .NET Framework 2.0 do not offer a mechanism of synchronization anymore (i.e. they don’t provide a static Synchronized method), forcing the developers to do the synchronization manually.

Parallel Extensions to the .NET Framework aims to fill this gap by introducing new thread-safe generic collections that don’t suffer from the same issues as their antiquated “Synchronized” relatives. ConcurrentStack<T> is one of them.

In the current moment ConcurrentStack<T> is implemented as a LIFO linked list and uses an interlocked compare exchange for the Push and Pop actions. However, it not recommended as the user to rely on the internal implementation details.

The main and most used methods of a stack data structure are usually Push, Pop, and Peek. A quick look at ConcurrentStack<T> will show that it provides Push, but not Pop and Peek: instead, it provides a TryPop and TryPeek instead. This is quite intentional. One of the most common patterns when using a Stack<T> is to check the stack’s Count, and if it’s greater than 0, pop an item from it and use that item, e.g.:

T item;
if (s.Count > 0)
{
item = s.Pop();
UseData(item);
}

But in a world where this stack is being accessed by multiple threads concurrently, even if the individual Count and Pop methods were thread-safe, we still run into the issue that the stack could be emptied between a successful non-emptiness check and the attempt to pop. ConcurrentStack<T> takes this into account in the APIs it provides. To safely attempt to pop an item from the stack, we can instead write the code:

T item;
if (s.TryPop(out item))
{
UseData(item);
}

In this way using a ConcurrentStack<T> a producers/consumer scenario could be implemented like below:

//initialize an empty concurrent stack

ConcurrentStack<string> m_myConcurrentStack = new ConcurrentStack<string>();

//every producer will produce this number of elements

        const int COUNT = 10;

        public void ConsumeDataFromStack()

        {

            //create the producers

            for (int i = 0; i < COUNT; i++)

            {

         Thread currentProducer = new Thread(new ThreadStart(delegate()

                {

                    for(int currentIndex = COUNT; currentIndex > 0; currentIndex–)

                    {

                        m_myConcurrentStack.Push(

                           Thread.CurrentThread.ManagedThreadId.ToString() + “_” +

                           currentIndex.ToString());

                    }

                }));

                currentProducer.Start();

            }

            //allow the worker threads to start

            Thread.Sleep(500);

            //consume data

            string currentData = “”;

            while (m_myConcurrentStack.TryPop(out currentData))

            {

                //ConsumeData(currentData);

            }

        }

PLINQ queries over a concurrent stack

Being an IEnumerable<T> a concurrent stack can be used as data source in PLINQ queries as well.

Below is a sample of such usage.

volatile bool m_producersEnded = false;

const int COUNT = 10;

public void PLinqOverConcurrentStack()

        {

            //create the producers

            Thread[] producers = new Thread[COUNT];

            for (int i = 0; i < COUNT; i++)

            {

                Thread currentProducer = new Thread(new ThreadStart(delegate()

                {

                    for(int currentIndex = COUNT; currentIndex > 0; currentIndex–)

                    {

                        m_myConcurrentStack.Push(currentIndex.ToString());

                    }

                }));

                producers[i] = currentProducer;

            }

            Thread PLINQConsumer = new Thread(new ThreadStart(delegate()

            {

                while (!m_producersEnded)

                {

                    var currentValues = from data in m_myConcurrentStack 

                    where data.Contains(“9”) select data;

                    foreach (string currentData in currentValues)

                    {

                        //consume data

                    }

                }

            }));

            //start the consumer

            pLinqConsumer.Start();

            //start the producers

            foreach (Thread producer in producers)

            {

                producer.Start();

            }

            //join the producers and the consumer

            foreach (Thread producer in producers)

            {

                producer.Join();

            }

            m_producersEnded = true;

            pLinqConsumer.Join();

        }

0 comments

Discussion is closed.

Feedback usabilla icon