Parallel Extensions and I/O

Danny Shih

In this post, we’ll investigate some ways that Parallel Extensions can be used to introduce parallelism and asynchrony to I/O scenarios.

Here’s a simple scenario.  I want to retrieve data from a number of web resources.

static string[] Resources = new string[]

{

    “http://www.microsoft.com”, “http://www.msdn.com”,

    “http://www.msn.com”, “http://www.bing.com”

};

 

Using the WebClient class, I might end up with the following.

var data = new List<byte[]>();

var wc = new WebClient();

 

foreach (string resource in Resources)

{

    data.Add(wc.DownloadData(resource));

}

 

// Use the data.

 

However, these days, downloading data from the web usually utilizes only a small fraction of my available bandwidth.  So there are potential performance gains here, and with TPL’s parallel ForEach loop, they are easily had.

var data = new ConcurrentBag<byte[]>();

 

Parallel.ForEach(Resources, resource =>

{

    data.Add((new WebClient()).DownloadData(resource));

});

 

// Use the data.

 

Note that WebClient instances do not support multiple pending asynchronous operations (and the class is not thread-safe), so I need a separate instance for each operation.  Also, since the normal BCL collections (List<T>, etc.) are not thread-safe, I need something like ConcurrentBag<T> to store the results.  Of course, storing all the data in a collection assumes the scenario requires that all retrieval operations complete before processing.  If this was not the case, I could start processing each data chunk right after obtaining it right in the loop, exploiting more parallelism.  However, for the purposes of this investigation, I wanted to determine the possible performance gains in the absence of CPU-intensive work.

As it turns out, the above often yields linear speedup against sequential, with some variation due to the inconsistent nature of web site response times.  And it was pretty straightforward.  However, things would have been even easier had I started out with a “LINQ” frame of mind.  First, I can convert my original sequential code to a LINQ query.  Then, I can turn it into PLINQ using the AsParallel method and use WithDegreeOfParallelism to control the number of concurrent retrievals.

var data =

    from resource in Resources

        .AsParallel()

        .WithDegreeOfParallelism(numConcurrentRetrievals)

    select (new WebClient()).DownloadData(resource);

 

// Sometime later…

foreach (byte[] result in data) { }

 

(As an aside, it’s worth noting that WithDegreeOfParallelism causes PLINQ to use exactly numConcurrentRetrivals Tasks.  This differs from the MaxDegreeOfParallelism option that I could have used with my previous Parallel.ForEach code, because that option sets the maximum; the actual number of threads still depends on the ThreadPool’s thread-adjusting logic.)

This code offers enhanced readability and makes storing the data easier.  In addition, I can continue on the main thread, as PLINQ queries do not execute until the data they represent is accessed – that is, when MoveNext is called on the relevant enumerator.  However, in this particular case, I don’t want to delay my query’s execution until I need the data; I actually want to execute my query while continuing on the main thread.  To do so, I can wrap my query in a Task and force its immediate execution using ToArray.

var t = Task.Factory.StartNew(() =>

{

    return

        from resource in Resources

            .AsParallel()

            .WithDegreeOfParallelism(numConcurrentRetrievals)

        select (new WebClient()).DownloadData(resource).ToArray();

});

 

// Sometime later…

foreach (byte[] result in t.Result) { }

 

// OR, use a continuation

t.ContinueWith(dataTask =>

{

    foreach (byte[] result in dataTask.Result) { }

});

 

Now, I’ve got asynchrony, and I still get similar speedup.  However, there’s still something about this code that is not ideal.  The work (sending off download requests and blocking) requires almost no CPU, but it is being done by ThreadPool threads since I’m using the default scheduler.  Ideally, threads should only be used for CPU-bound work (when there’s actually work to do).  Of course, this probably won’t matter much for most typical client applications, but in scenarios where resources are tight, it could be a serious issue.  Therefore, it’s worth investigating how we might reduce the number of blocked threads, perhaps by not using threads at all where possible.

To achieve this, I’ll be using ideas from a previous post: Tasks and the Event-based Asynchronous Pattern.  That article explained how to create a Task<TResult> from any type that implements the EAP, and it presented an extension method for WebClient (available along with many others in the ParallelExtensionsExtras):

public static Task<byte[]> DownloadDataTask(

    this WebClient webClient, Uri address);

 

The key point is that this method produces a Task<TResult> by integrating WebClient’s EAP implementation with a TaskCompletionSource<TResult>, and I can use it to rewrite my scenario.

var tasks = new Queue<Task<byte[]>>();

 

foreach (string resource in Resources)

{

    WebClient wc = new WebClient();

    tasks.Enqueue(wc.DownloadDataTask(new Uri(resource)));

}

 

// Sometime later…

while (tasks.Count > 0)

{

    byte[] result = tasks.Dequeue().Result;

}

 

// OR, use a continuation

Task<byte[]>.Factory.ContinueWhenAll(tasks.ToArray(), dataTasks =>

{

    foreach (var dataTask in dataTasks)

    {

        byte[] result = dataTask.Result;

    }

});

 

With this, I’ve got a solution that uses parallelism for speed-up, is asynchronous, and does not burn more threads than necessary!

To recap, in this post, we considered a typical I/O scenario.  First, we saw how easy it was to arrive at solutions that are better than the sequential one.  Then, we delved deeper to discover a more complex solution (integrating EAP with Tasks) that offers even more benefits.

PFX-and-IO.zip

0 comments

Discussion is closed.

Feedback usabilla icon