ParallelExtensionsExtras Tour – #7 – Additional TaskSchedulers

Stephen Toub - MSFT

(The full set of ParallelExtensionsExtras Tour posts is available here.)

In our last two ParallelExtensionsExtras blog tour posts, we discussed two TaskScheduler implementations in ParallelExtensionsExtras: StaTaskScheduler and ConcurrentExclusiveInterleave.  These are just two of more than ten schedulers in ParallelExtensionsExtras.  Rather than explore the rest in depth, this post provides a short description of the other key schedulers included.

QueuedTaskScheduler

Found in the QueuedTaskScheduler.cs file, QueuedTaskScheduler provides a wealth of functionality all wrapped up into a single scheduler type.  It supports:

  • Priorities.  You create a queue off of the scheduler with a particular priority, and that queue is itself a TaskScheduler.  Any tasks you schedule to that queue are then tagged with that priority, and the scheduler will service tasks in priority order.  For example:

QueuedTaskScheduler qts = new QueuedTaskScheduler();
TaskScheduler pri0 = qts.CreateQueue(priority:0);
TaskScheduler pri1 = qts.CreateQueue(priority:1);

Any tasks scheduled to pri0 will get priority over tasks scheduled to pri1, even if scheduled after.

  • Fairness.  It’s often the case that you have batches of work and want them treated fairly relative to each other: if a large batch arrives and then a small batch arrives, the small batch shouldn’t have to wait until the entire large batch has completed.  Instead, tasks from both batches should be scheduled round-robin.  The queues created on a QueuedTaskScheduler are scheduled in just such a manner, while also taking priorities into account.  For example:

QueuedTaskScheduler qts = new QueuedTaskScheduler();
TaskScheduler pri0a = qts.CreateQueue(priority:0);
TaskScheduler pri0b = qts.CreateQueue(priority:0);
TaskScheduler pri1 = qts.CreateQueue(priority:1);

Tasks scheduled to either pri0a or pri0b will take priority over tasks scheduled to pri1.  However, the scheduler will round-robin between pri0a and pri0b, as they exist at the same priority level.

  • Concurrency Levels.  In a large system, you may want to control how much parallelism is afforded to different parts of the system.  With parallel loops and PLINQ queries, you can control this on a per-loop or per-query basis, but out-of-the-box there’s no way to control it across loops, and there’s no built-in way to control it for tasks.  Scheduling all related work to a TaskScheduler that enforces a maximum concurrency level provides that control.  For example:

var qts = new QueuedTaskScheduler(
    TaskScheduler.Default, maxConcurrencyLevel:4);
var options = new ParallelOptions { TaskScheduler = qts };

Task.Factory.StartNew(() =>
{
    Parallel.For(0, 100, options, i=> { … });
}, CancellationToken.None, TaskCreationOptions.None, qts);

Task.Factory.StartNew(() =>
{
    Parallel.For(0, 100, options, i=> { … });
}, CancellationToken.None, TaskCreationOptions.None, qts); 

Both tasks and the parallel loops they contain will be limited to a maximum concurrency level of four.

  • Thread control.  The priorities, fairness, and concurrency level control all apply when QueuedTaskScheduler is used on top of another TaskScheduler as well as when used with dedicated threads for the scheduler.  However, QueuedTaskScheduler also provides very low-level control over the threads utilized by the scheduler when dedicated threads are requested.  Here’s the relevant constructor, which provides insight into the various knobs provided:

public QueuedTaskScheduler(
    int threadCount,
    string threadName = "",
    bool useForegroundThreads = false,
    ThreadPriority threadPriority = ThreadPriority.Normal,
    ApartmentState threadApartmentState = ApartmentState.MTA,
    int threadMaxStackSize = 0,
    Action threadInit = null,
    Action threadFinally = null);

IOTaskScheduler

While we often refer to “the” .NET thread pool, the ThreadPool abstraction in .NET is actually built on top of two pools, one referred to as the worker pool and one referred to as the I/O pool.  The former is what’s targeted by the QueueUserWorkItem method as well as by the default TaskScheduler, while the latter is targeted by the UnsafeQueueNativeOverlapped method, and is frequently used for work in Windows Communication Foundation and Windows Workflow Foundation.  The IOTaskScheduler scheduler in the IOTaskScheduler.cs file runs tasks on this I/O thread pool via the UnsafeQueueNativeOverlapped method.

WorkStealingTaskScheduler

In .NET 4, the ThreadPool was augmented with “work-stealing” algorithms.  However, the ThreadPool does not allow you to create instances of pools, only to share the global pool.  If you desire individual pool instances with a dedicated set of threads, and if you also want work-stealing, the WorkStealingTaskScheduler class in the WorkStealingTaskScheduler.cs file is your friend.

SynchronizationContextTaskScheduler

The TaskScheduler.FromCurrentSynchronizationContext static method returns a TaskScheduler instance that wraps the SynchronizationContext instance returned from SynchronizationContext.Current.  In rare circumstances, however, you might want to wrap an arbitrary SynchronizationContext that is not set as the current context.  SynchronizationContextTaskScheduler.cs contains a scheduler that does exactly that.  When a Task is queued to this scheduler, it’s added to an internal concurrent queue, and a delegate is then posted to the target SynchronizationContext, which was passed into the scheduler’s constructor:

protected override void QueueTask(Task task)
{
    _tasks.Enqueue(task);
    _context.Post(delegate
    {
        Task nextTask;
        if (_tasks.TryDequeue(out nextTask)) TryExecuteTask(nextTask);
    }, null);
}

We could have skipped storing the task, and instead passed it directly to the delegate via a closure.  We opted for the shown approach, however, in order to enable debugger integration, so that the GetScheduledTasks method can return the contents of the queue:

protected override IEnumerable<Task> GetScheduledTasks()
{
    return _tasks.ToArray();
}
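
To see how those two methods fit into a complete scheduler, here is a minimal sketch.  It is not the ParallelExtensionsExtras source: the class name SyncContextSketchScheduler, the CountingContext helper, and the inlining policy in TryExecuteTaskInline are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of a scheduler that targets an arbitrary SynchronizationContext.
public sealed class SyncContextSketchScheduler : TaskScheduler
{
    private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
    private readonly SynchronizationContext _context;

    public SyncContextSketchScheduler(SynchronizationContext context)
    {
        if (context == null) throw new ArgumentNullException("context");
        _context = context;
    }

    // Store the task, then post a delegate that runs the next stored task.
    protected override void QueueTask(Task task)
    {
        _tasks.Enqueue(task);
        _context.Post(delegate
        {
            Task nextTask;
            if (_tasks.TryDequeue(out nextTask)) TryExecuteTask(nextTask);
        }, null);
    }

    // Assumption: inline only when the caller is already on the target context.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return SynchronizationContext.Current == _context && TryExecuteTask(task);
    }

    // Snapshot of the queue for debugger integration.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return _tasks.ToArray();
    }
}

// Hypothetical helper: counts Post calls, then defers to the base implementation.
public sealed class CountingContext : SynchronizationContext
{
    private int _posts;

    public int Posts { get { return Interlocked.CompareExchange(ref _posts, 0, 0); } }

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref _posts);
        base.Post(d, state);
    }
}
```

To try it, construct the scheduler over a context that is not current, such as new SyncContextSketchScheduler(new CountingContext()), and pass it as the last argument to Task.Factory.StartNew.  Each queued task should result in exactly one Post to the wrapped context.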

ThreadPerTaskScheduler

In rare cases, it’s desirable to dedicate a thread to each individual task, rather than having tasks share a pool of threads.  For this, the ThreadPerTaskScheduler.cs file contains a very simple ThreadPerTaskScheduler class that does exactly what its name implies, spinning up a dedicated thread for each task that gets queued to the scheduler.  Here is its QueueTask method:

protected override void QueueTask(Task task)
{
    new Thread(() => TryExecuteTask(task)) { IsBackground = true }.Start();
}
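
A scheduler also has to override TryExecuteTaskInline and GetScheduledTasks, which the snippet above leaves out.  The following sketch fills them in under our own assumptions (the class name ThreadPerTaskSketchScheduler is hypothetical, and the choices to refuse inlining and to report no queued tasks are ours, not necessarily what ThreadPerTaskScheduler.cs does):

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of a complete thread-per-task scheduler.
public sealed class ThreadPerTaskSketchScheduler : TaskScheduler
{
    // Spin up a dedicated background thread for each queued task.
    protected override void QueueTask(Task task)
    {
        new Thread(() => TryExecuteTask(task)) { IsBackground = true }.Start();
    }

    // Assumption: never inline, so every task really runs on its own thread.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Tasks are handed straight to threads, so there is no queue to report.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}
```

With this sketch, a task started via Task.Factory.StartNew(work, CancellationToken.None, TaskCreationOptions.None, new ThreadPerTaskSketchScheduler()) should observe that Thread.CurrentThread.IsThreadPoolThread is false and IsBackground is true.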

CurrentThreadTaskScheduler

Largely for testing purposes, a synchronous scheduler is provided in CurrentThreadTaskScheduler.cs that runs all tasks on the current thread when scheduling is requested.  The implementation is trivial. Here is its QueueTask method:

protected override void QueueTask(Task task)
{
    TryExecuteTask(task);
}
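
For testing, the useful property is that a task has already finished by the time the call that scheduled it returns.  A minimal sketch of a complete synchronous scheduler follows; the class name CurrentThreadSketchScheduler and the members other than QueueTask are our assumptions rather than the contents of CurrentThreadTaskScheduler.cs.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical sketch of a scheduler that runs every task synchronously.
public sealed class CurrentThreadSketchScheduler : TaskScheduler
{
    // Only the calling thread ever runs a task at a given moment.
    public override int MaximumConcurrencyLevel { get { return 1; } }

    // Run the task immediately on whichever thread asked to schedule it.
    protected override void QueueTask(Task task)
    {
        TryExecuteTask(task);
    }

    // Inlining is always acceptable: it is what QueueTask does anyway.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }

    // Nothing is ever left waiting in a queue.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        return Enumerable.Empty<Task>();
    }
}
```

A test can then pass an instance as the scheduler argument to Task.Factory.StartNew and inspect the returned task right away, without waiting on it and without any cross-thread timing concerns.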

LimitedConcurrencyLevelTaskScheduler, OrderedTaskScheduler, and ReprioritizableTaskScheduler

The LimitedConcurrencyLevelTaskScheduler.cs, OrderedTaskScheduler.cs, and ReprioritizableTaskScheduler.cs files contain schedulers that are largely limited variants of the QueuedTaskScheduler class.  LimitedConcurrencyLevelTaskScheduler sits on top of the ThreadPool and supports a user-provided maximum degree of concurrency.  OrderedTaskScheduler derives from LimitedConcurrencyLevelTaskScheduler and simply sets that maximum degree of concurrency to one, which guarantees that tasks are processed in the order that they were scheduled.  ReprioritizableTaskScheduler is a simple first-in-first-out scheduler on top of the ThreadPool, except that it supports moving tasks around in the queue after they’ve been queued; this effectively enables prioritizing and deprioritizing previously queued tasks.
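
To make the relationship between the first two concrete, here is a rough sketch of a concurrency-limiting scheduler on top of the ThreadPool, plus an ordered variant that fixes the limit at one.  The class names, the LinkedList-based queue, and the decision never to inline are assumptions for illustration; the schedulers in ParallelExtensionsExtras may be implemented differently.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: at most maxConcurrency ThreadPool work items drain the queue.
public class LimitedConcurrencySketchScheduler : TaskScheduler
{
    private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // guarded by lock (_tasks)
    private readonly int _maxConcurrency;
    private int _activeWorkers;                                        // guarded by lock (_tasks)

    public LimitedConcurrencySketchScheduler(int maxConcurrency)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException("maxConcurrency");
        _maxConcurrency = maxConcurrency;
    }

    public override int MaximumConcurrencyLevel { get { return _maxConcurrency; } }

    protected override void QueueTask(Task task)
    {
        lock (_tasks)
        {
            _tasks.AddLast(task);
            // Existing workers will pick the task up if the limit is already reached.
            if (_activeWorkers >= _maxConcurrency) return;
            _activeWorkers++;
        }
        ThreadPool.QueueUserWorkItem(_ => DrainQueue());
    }

    // Runs queued tasks in FIFO order until the queue is empty, then retires.
    private void DrainQueue()
    {
        while (true)
        {
            Task next;
            lock (_tasks)
            {
                if (_tasks.Count == 0) { _activeWorkers--; return; }
                next = _tasks.First.Value;
                _tasks.RemoveFirst();
            }
            TryExecuteTask(next);
        }
    }

    // Assumption: never inline, so the limit and the ordering stay strict.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return false;
    }

    // Must not block: the debugger may call this while other threads are frozen.
    protected override IEnumerable<Task> GetScheduledTasks()
    {
        bool lockTaken = false;
        try
        {
            Monitor.TryEnter(_tasks, ref lockTaken);
            if (!lockTaken) throw new NotSupportedException();
            return _tasks.ToArray();
        }
        finally
        {
            if (lockTaken) Monitor.Exit(_tasks);
        }
    }
}

// Hypothetical sketch: a limit of one worker yields in-order execution.
public sealed class OrderedSketchScheduler : LimitedConcurrencySketchScheduler
{
    public OrderedSketchScheduler() : base(1) { }
}
```

Because a single worker drains a first-in-first-out queue, tasks started on an OrderedSketchScheduler instance run one at a time in the order they were scheduled.  A larger limit relaxes the ordering but still caps how many tasks run at once.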
