Updated TPL Dataflow CTP

Stephen Toub - MSFT

It’s been a few months since April, when we last released a Community Technology Preview (CTP) of System.Threading.Tasks.Dataflow.dll, aka “TPL Dataflow”.  Today, for your programming pleasure, we have another update.

As mentioned in “What’s New for Parallelism in .NET 4.5”, System.Threading.Tasks.Dataflow.dll is part of the .NET Framework 4.5 Developer Preview released last week at the BUILD conference.  In addition to that release, however, we’ve also refreshed the standalone CTP bits available for download on the MSDN DevLabs site at https://msdn.microsoft.com/en-us/devlabs/gg585582. You’ll find that the two DLLs are very similar, though not identical. We’re continuing to work hard on getting both the API surface area and the implementation of TPL Dataflow to be just right, addressing your great feedback about the library’s functionality and also focusing on improving performance.  The resulting changes are highlighted below and are available in the TPL Dataflow CTP on DevLabs.

In addition to bug fixes around robustness, here are some of the improvements you’ll find:

  • New DataflowBlockOptions properties
    • NameFormat. You can now assign a name to your dataflow block.  The NameFormat property defaults to the string “{0} Id={1}”, with the {0} and {1} substitution parameters replaced at run time by the dataflow block’s type and ID, respectively.  You can change this string to whatever you like (still using those substitution parameters as desired), and the resulting name will be available for inspection in the debugger and via ToString (a sketch follows this list).
    • SingleProducerConstrained.  Dataflow blocks are by default usable by any number of threads concurrently.  While flexible, this places more synchronization requirements, and therefore cost, on the blocks than might otherwise be necessary.  If a block will only ever be used by a single producer at a time, meaning only one thread at a time will use methods like Post, OfferMessage, and Complete on the block, this property may be set to true to inform the block that it need not apply extra synchronization.  For blocks that observe this property, that can significantly reduce synchronization overhead.  Right now, only ActionBlock pays attention to this property, but more blocks could in the future as necessary.
  • New DataflowLinkOptions type
    • We’ve provided more configuration support around how blocks may be linked together.  We’ve replaced the overloads of LinkTo that accept a Boolean unlinkAfterOne with an overload that accepts an instance of a new type: DataflowLinkOptions.  DataflowLinkOptions currently exposes three properties: Append (defaults to true), MaxMessages (defaults to -1), and PropagateCompletion (defaults to false).  The Append option controls whether the link is appended after all existing links or prepended before them; by setting it to false, you can force new links to take priority over existing ones.  The MaxMessages option is a replacement for unlinkAfterOne, where -1 is the equivalent of unlinkAfterOne==false and 1 is the equivalent of unlinkAfterOne==true; however, MaxMessages may also be any positive integer, allowing the link to be removed automatically after the specified number of messages has successfully traversed it.  The PropagateCompletion option allows completion to flow automatically from the source to the target.  If this option is set to true, a target that’s linked from the source during or after the source’s completion will be automatically completed (using either Complete or Fault, based on the completion state of the source).  A sketch of these options in use follows this list.
  • New DataflowBlock methods
    • NullTarget<T>() provides a target block that accepts all messages offered to it and immediately drops them.  You can use it as a sink that soaks up any messages not consumed by the targets linked before it from a given source (demonstrated in the linking sketch below).
    • SendAsync and OutputAvailableAsync now have overloads that accept a CancellationToken, enabling you to cancel these operations (see the cancellation sketch following this list).
  • Performance
    • The execution blocks ActionBlock, TransformBlock, and TransformManyBlock now internally employ a new queuing data structure optimized for the scenarios in which these blocks are typically employed.  This queue significantly minimizes the amount of garbage allocated by these blocks.
    • We’ve added many “fast paths” that optimize for common cases.  For example, we can avoid a fair number of allocations in DataflowBlock.Choose if data is already available in one of the sources and in SendAsync if the target immediately accepts the data being sent.
    • TransformManyBlock previously enumerated the user-returned IEnumerable<TOutput> completely before allowing any of the elements to be received or propagated to a target block.  In most situations, we now allow each value to be received and propagated as soon as that individual element has been enumerated, enabling lower latency (see the streaming sketch following this list).
    • An observable created for a dataflow block source (via AsObservable) now propagates to observers for dataflow block targets (created via AsObserver) asynchronously.  This means that multiple targets may be registered with the source through these interfaces, e.g. source.AsObservable().Subscribe(target.AsObserver()), providing broadcasting behavior without blocking threads and while still supporting backpressure via BoundedCapacity.
    • In general, across all of the dataflow blocks (though in particular for the execution blocks) we’ve reduced synchronization and allocation costs, leading to much more efficient processing.
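
To make a few of these additions concrete, here are some small sketches.  These are illustrative snippets based on the descriptions above rather than excerpts from the library’s documentation, so treat the details (names, exact output) as assumptions.  First, naming a block via NameFormat:

using System;
using System.Threading.Tasks.Dataflow;

class NameFormatSketch
{
    static void Main()
    {
        // "{0}" is replaced with the block's type and "{1}" with its ID.
        var worker = new ActionBlock<int>(i => Console.WriteLine(i),
            new ExecutionDataflowBlockOptions { NameFormat = "Worker ({0} Id={1})" });

        // The name shows up in the debugger and via ToString,
        // e.g. something like "Worker (ActionBlock`1 Id=1)".
        Console.WriteLine(worker);
    }
}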
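
Next, a minimal linking sketch showing the new DataflowLinkOptions together with NullTarget; the block names and message counts are made up for illustration:

using System;
using System.Threading.Tasks.Dataflow;

class LinkOptionsSketch
{
    static void Main()
    {
        var source = new BufferBlock<int>();
        var worker = new ActionBlock<int>(i => Console.WriteLine(i));

        // Flow completion from source to worker, and automatically
        // remove the link once 100 messages have traversed it.
        source.LinkTo(worker, new DataflowLinkOptions
        {
            PropagateCompletion = true,
            MaxMessages = 100
        });

        // Messages not consumed by the link above (e.g. after it has
        // been removed) fall through to this sink and are dropped.
        source.LinkTo(DataflowBlock.NullTarget<int>());

        for (int i = 0; i < 50; i++) source.Post(i);
        source.Complete();
        worker.Completion.Wait(); // completed via PropagateCompletion
    }
}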
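
Here’s a cancellation sketch for the new cancelable overloads; the setup is contrived purely to force a pending operation:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class CancellationSketch
{
    static void Main()
    {
        var cts = new CancellationTokenSource();
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        buffer.Post(0); // fill the buffer to its bounded capacity

        // This send can't complete until space frees up, so it remains
        // pending; the token lets us cancel it.  OutputAvailableAsync has
        // a matching overload, e.g. buffer.OutputAvailableAsync(cts.Token).
        Task<bool> send = buffer.SendAsync(1, cts.Token);

        cts.Cancel();
        try { send.Wait(); }
        catch (AggregateException) { Console.WriteLine("send canceled"); }
    }
}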
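
And finally, a streaming sketch of the lower-latency TransformManyBlock behavior: with a lazily-yielding transform, each element can flow onward as it’s produced rather than after the whole sequence has been enumerated (the delay is just to make the effect visible):

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class StreamingSketch
{
    static IEnumerable<int> SlowRange(int count)
    {
        for (int i = 0; i < count; i++)
        {
            Thread.Sleep(500);  // simulate per-element work
            yield return i;     // this element can now flow immediately
        }
    }

    static void Main()
    {
        var expander = new TransformManyBlock<int, int>(i => SlowRange(i));
        var printer = new ActionBlock<int>(i =>
            Console.WriteLine("{0:T} received {1}", DateTime.Now, i));
        expander.LinkTo(printer);

        expander.Post(5); // outputs now appear ~500ms apart, not all at the end
        Console.ReadLine();
    }
}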

As an example of some of these performance improvements, consider a few microbenchmarks (and as always with microbenchmarks, take these with large grains of salt, both in terms of what’s being measured and in terms of the representativeness of the hardware on which they’re executed).  Here’s a microbenchmark that just sees how fast we can push data through an ActionBlock:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main()
    {
        var sw = new Stopwatch();
        const int ITERS = 6000000;
        var are = new AutoResetEvent(false);
       
        // The handler signals when the final message has been processed.
        var ab = new ActionBlock<int>(i => { if (i == ITERS) are.Set(); });
        while (true)
        {
            sw.Restart();
            for (int i = 1; i <= ITERS; i++) ab.Post(i);
            are.WaitOne();
            sw.Stop();
            Console.WriteLine("Messages / sec: {0:N0}",
                (ITERS / sw.Elapsed.TotalSeconds));
        }
    }
}

On my 64-bit quad-core 1.6GHz i7 laptop, here are example throughput numbers I see from the April DevLabs CTP and the September DevLabs CTP:

                         April CTP    Sept CTP
ActionBlock throughput   4,801,434    10,942,715

Now consider a minor modification to this benchmark, simply configuring the ActionBlock to use the new SingleProducerConstrained option:

var ab = new ActionBlock<int>(i => { if (i == ITERS) are.Set(); },
    new ExecutionDataflowBlockOptions { SingleProducerConstrained = true });

With that change, I get:

                         Sept CTP
ActionBlock throughput   37,456,691

As another example, consider the performance of sending to and receiving from a bounded buffer block asynchronously.  This is a common pattern in asynchronous producer/consumer scenarios where you want to limit production so that producers never get too far ahead of consumers, while representing all operations as Tasks (which could then be awaited):

using System;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main()
    {
        var sw = new Stopwatch();
        const int ITERS = 1000000;

        var bb = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });
        while (true)
        {
            sw.Restart();
            for (int i = 0; i < ITERS; i++)
            {
                // Purely measuring overhead here, so the returned
                // tasks are intentionally not awaited.
                bb.SendAsync(i);
                bb.ReceiveAsync();
            }
            sw.Stop();
            Console.WriteLine("Messages / sec: {0:N0}",
                (ITERS / sw.Elapsed.TotalSeconds));
        }
    }
}
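
For contrast with the measurement loop above (which intentionally ignores the returned tasks), here’s a hedged sketch of what an actual awaiting producer/consumer over a bounded BufferBlock might look like, using the async/await support in the .NET 4.5 Developer Preview; the method names and counts are mine:

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class ProducerConsumerSketch
{
    static async Task ProduceAsync(BufferBlock<int> buffer, int count)
    {
        for (int i = 0; i < count; i++)
        {
            // Waits (without blocking a thread) whenever the bounded
            // buffer is full, keeping the producer from racing ahead.
            await buffer.SendAsync(i);
        }
        buffer.Complete();
    }

    static async Task ConsumeAsync(BufferBlock<int> buffer)
    {
        // OutputAvailableAsync returns false once the buffer
        // has completed and drained.
        while (await buffer.OutputAvailableAsync())
        {
            int item = await buffer.ReceiveAsync();
            Console.WriteLine(item);
        }
    }

    static void Main()
    {
        var buffer = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });
        Task.WaitAll(ProduceAsync(buffer, 10), ConsumeAsync(buffer));
    }
}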

On that same machine with the two different builds, I see the following:

                                      April CTP    Sept CTP
SendAsync / ReceiveAsync throughput   671,619      1,216,397

We hope you enjoy these updates!  As always, feedback is very welcome and encouraged, and we look forward to hearing from you.  You can discuss the TPL Dataflow CTP in the TPL Dataflow forum on MSDN.
