ParallelExtensionsExtras Tour – #5 – StaTaskScheduler

(The full set of ParallelExtensionsExtras Tour posts is available here.)  

The Task Parallel Library (TPL) supports a wide array of semantics for scheduling tasks, even though it only includes two in the box (one using the ThreadPool, and one using SynchronizationContext, which exists primarily to run tasks on UI threads).  This support comes from the extensibility of the abstract TaskScheduler class, which may be derived from to implement custom scheduling algorithms.  ParallelExtensionsExtras includes a multitude of such derived types, and in our tour through ParallelExtensionsExtras, we’ll walk through a few of them… today, we’ll start with StaTaskScheduler.

In an ideal world, there would be no legacy code; all code would be up-to-date on best practices and would be using the latest and greatest technologies.  In the real world, that’s almost never possible, and we need to be able to accommodate designs of the past.  Single-threaded apartments (STA) from COM represent one such technology we need to be able to integrate with (rather than describing STA here, for more information see http://msdn.microsoft.com/en-us/library/ms680112(VS.85).aspx).  If a component is STA, it must only be used from an STA thread.  However, all of the ThreadPool’s threads are multi-threaded apartment (MTA) threads, and by default Tasks from TPL run on these threads.  The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).  Given that, we can take advantage of the TaskScheduler API to implement an StaTaskScheduler which runs tasks on STA threads.  This then allows you to use Tasks with STA components.  StaTaskScheduler is implemented in the StaTaskScheduler.cs file in ParallelExtensionsExtras.

Our scheduler will derive from TaskScheduler and will also implement IDisposable in order to enable shutdown of the STA threads being used by the scheduler.  It will contain two fields: a List<Thread> for storing the threads created and used by the scheduler, and a BlockingCollection<Task> for storing all tasks scheduled to the scheduler, enabling the scheduler’s threads to consume and process them.

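using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class StaTaskScheduler : TaskScheduler, IDisposable
{
    // The STA threads created and used by the scheduler.
    private readonly List<Thread> _threads;
    // Tasks queued to the scheduler, waiting to be run by one of the threads.
    private BlockingCollection<Task> _tasks;
    …
}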

 

When created, the scheduler will instantiate the BlockingCollection<Task> and spin up numberOfThreads threads, where numberOfThreads is provided to the constructor of the type.  Each of the threads is explicitly marked with ApartmentState.STA.  The thread procedure for each thread is a simple dispatch loop that continually takes from the BlockingCollection<Task> (blocking if the collection is empty) and executes the discovered Task.

public StaTaskScheduler(int numberOfThreads)
{
    if (numberOfThreads < 1)
        throw new ArgumentOutOfRangeException("numberOfThreads");

    _tasks = new BlockingCollection<Task>();

    _threads = Enumerable.Range(0, numberOfThreads).Select(i =>
               {
                   var thread = new Thread(() =>
                   {
                       // Dispatch loop: blocks while the collection is empty and
                       // exits once it is marked complete for adding and drained.
                       foreach (var t in
                           _tasks.GetConsumingEnumerable())
                       {
                           TryExecuteTask(t);
                       }
                   });
                   thread.IsBackground = true;
                   // The apartment state must be set before the thread starts.
                   thread.SetApartmentState(ApartmentState.STA);
                   return thread;
               }).ToList();

    _threads.ForEach(t => t.Start());
}
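The dispatch loop is easier to see in isolation from the scheduler. The following is a minimal sketch rather than code from ParallelExtensionsExtras: the names DispatchLoopSketch and RunItems are made up for illustration, it queues plain Action delegates instead of Tasks, and it uses an ordinary thread with no apartment state set. It shows the same pattern of one consumer draining a BlockingCollection<T> through GetConsumingEnumerable until CompleteAdding is called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

static class DispatchLoopSketch
{
    // Hypothetical helper: starts one consumer thread, feeds it `count` work
    // items, shuts it down, and returns the items in the order they ran.
    public static List<int> RunItems(int count)
    {
        var processed = new List<int>();
        using (var queue = new BlockingCollection<Action>())
        {
            var worker = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends once
                // CompleteAdding has been called and the queue has drained.
                foreach (var work in queue.GetConsumingEnumerable())
                {
                    work();
                }
            });
            worker.IsBackground = true;
            worker.Start();

            for (int i = 0; i < count; i++)
            {
                int item = i; // copy so each delegate captures its own value
                queue.Add(() => processed.Add(item));
            }

            queue.CompleteAdding(); // no more work: lets the consumer loop exit
            worker.Join();          // wait for the queued items to finish
        }
        return processed;
    }

    static void Main()
    {
        // A single consumer over the default FIFO collection preserves order.
        Console.WriteLine(string.Join(", ", RunItems(5))); // prints "0, 1, 2, 3, 4"
    }
}
```

Only the worker thread touches the list while it is running, and the caller reads it after Join, so the sketch needs no extra locking. With more than one consumer thread the items would still each run exactly once, but their relative order would no longer be guaranteed.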

 

That’s the most complicated part of the implementation.  The QueueTask method on StaTaskScheduler just takes the Task provided by the TPL infrastructure and stores it into the BlockingCollection<Task> (this will be called, for example, during Task.Factory.StartNew):

protected override void QueueTask(Task task)
{
    _tasks.Add(task);
}

 

The GetScheduledTasks method, which is used by the debugging infrastructure in Visual Studio to feed the Parallel Tasks and Parallel Stacks windows, returns a copy of the contents of the BlockingCollection<Task>:

protected override IEnumerable<Task> GetScheduledTasks()
{
    return _tasks.ToArray();
}

 

TryExecuteTaskInline is implemented to allow task inlining (running the Task on the current thread, which might happen in a call to Task.Wait, Task.WaitAll, Task<TResult>.Result, or Task.RunSynchronously), but only if the current thread is an STA thread:

protected override bool TryExecuteTaskInline(
    Task task, bool taskWasPreviouslyQueued)
{
    return
        Thread.CurrentThread.GetApartmentState() ==
            ApartmentState.STA &&
        TryExecuteTask(task);
}

 

MaximumConcurrencyLevel returns the number of threads utilized by the scheduler:

public override int MaximumConcurrencyLevel
{
    get { return _threads.Count; }
}

 

And finally, our Dispose implementation marks the BlockingCollection<Task> as complete for adding (so that as soon as the collection is empty, the threads will exit), waits for all threads to exit, and then cleans up the BlockingCollection<Task>.

public void Dispose()
{
    if (_tasks != null)
    {
        _tasks.CompleteAdding();

        foreach (var thread in _threads) thread.Join();

        _tasks.Dispose();
        _tasks = null;
    }
}

 

With this in place, we can now use our StaTaskScheduler.  Commonly, an StaTaskScheduler will be instantiated with only one thread, so that the same thread is used to process all tasks queued to the scheduler:

var sta = new StaTaskScheduler(numberOfThreads:1);
for(int i=0; i<10; i++)
{
    // All 10 tasks will be executed sequentially
    // due to using only one thread.
    Task.Factory.StartNew(() =>
    {
        … // run code here that requires an STA thread
    }, CancellationToken.None, TaskCreationOptions.None, sta);
}

 

Another approach, depending on the use case, is to use multiple threads, but to dedicate one STA instance to each thread.  For example, let’s say you have an STA object CoolObject that provides a method DoStuff.  You want to be able to have multiple tasks take advantage of DoStuff concurrently, but that can’t be done on the same object instance, and unfortunately CoolObject is relatively expensive to instantiate.  A solution then is to store one CoolObject instance in thread-local storage for each of the scheduler’s threads, and each task can then access that thread-local copy:

var sta = new StaTaskScheduler(numberOfThreads:4);
var coolObject = new ThreadLocal<CoolObject>(() => new CoolObject());
for(int i=0; i<N; i++)
{
    Task.Factory.StartNew(() =>
    {
        coolObject.Value.DoStuff();
    }, CancellationToken.None, TaskCreationOptions.None, sta);
}
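To see why this keeps the number of expensive instances down, here is a small sketch of the ThreadLocal<T> behavior on its own. It makes some assumptions for the sake of being runnable anywhere: ExpensiveObject is a hypothetical stand-in for CoolObject that counts its own constructions, CountInstances is an illustrative helper, and plain threads are used in place of the scheduler’s STA threads.

```csharp
using System;
using System.Threading;

static class ThreadLocalSketch
{
    // Hypothetical stand-in for an expensive, thread-affine object.
    sealed class ExpensiveObject
    {
        public static int InstancesCreated;
        public ExpensiveObject() { Interlocked.Increment(ref InstancesCreated); }
        public void DoStuff() { }
    }

    // Makes `callsPerThread` calls on each of `threadCount` threads and returns
    // how many ExpensiveObject instances were constructed in total.
    public static int CountInstances(int threadCount, int callsPerThread)
    {
        ExpensiveObject.InstancesCreated = 0;
        using (var local = new ThreadLocal<ExpensiveObject>(() => new ExpensiveObject()))
        {
            var threads = new Thread[threadCount];
            for (int t = 0; t < threadCount; t++)
            {
                threads[t] = new Thread(() =>
                {
                    for (int i = 0; i < callsPerThread; i++)
                    {
                        // The factory runs on the first access from each thread;
                        // later accesses on that thread reuse the same instance.
                        local.Value.DoStuff();
                    }
                });
                threads[t].Start();
            }
            foreach (var thread in threads) thread.Join();
        }
        return ExpensiveObject.InstancesCreated;
    }

    static void Main()
    {
        Console.WriteLine(CountInstances(4, 100)); // prints 4
    }
}
```

Four threads making 400 calls between them construct only four objects, one per thread. The same reasoning applies when the threads belong to an StaTaskScheduler: the number of instances is bounded by the scheduler’s thread count, not by the number of tasks.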

 

In general, it’s desirable to avoid STA objects, but with legacy code that’s a luxury we often can’t afford, and it’s great to be able to still use TPL with such objects.