返璞归真(并行):不要阻塞你的线程,让Async I/O为你工作

[原文发表地址] Back to (Parallel) Basics: Don't Block Your Threads, Make Async I/O Work For You

[原文发表时间] 2010-11-15 01:40 AM

 

Stephen Toub是我在微软最喜欢的人之一。我之前向他请教过问题,有时是为我自己,有时是代表你们,亲爱的读者,并且我总能得到见解深刻且合乎逻辑的答案。因为我极其信仰Jon Udell的“节约你的按键”哲学,我总是试图将有用的信息传递给社区,尤其是当信息是通过邮件发送的。记住,当你敲击键盘写一篇只发送给几个人的宏伟邮件时,你错失了外面成千上万的人。尽量少发邮件,多写博客。稍后我会更多讨论这个。

 

提示 :如果你对并行编程模式感兴趣,快去,不要犹豫,去下载免费的内容丰富的电子书,是的,你猜到了, 并行编程模式:理解和应用基于 .NET Framework 4的并行模式 是的,这个标题很长,但如果你以并行方式处理它就会感觉短了。说真的,它是免费的并且有一个C# 和 Visual Basic版本。它很出色。现在,如果你真的对这个这个话题感兴趣,就去把这本由Stephen Toub, Ade Miller, Colin Campbell, 和 Ralph Johnson编的书Parallel Programming with Microsoft .NET弄来。 此书HTML格式的完整版同时也被放到了这儿。

最近注意到了我朋Steve Smith的一篇博文,在文中他分享了一些简短的示例代码来“用C#异步验证一组URLs”因为我知道Steve不会介意我来深究这个问题,所以我就这么做了。我首先请教了在微软的Parallel Computing组工作的Stephen Toub。

 

Steve Smith想要证实一系列URLs的存在。这是基本同步代码:

private static bool RemoteFileExists(string url)

{

    try

    {

        var request = WebRequest.Create(url) as HttpWebRequest;

        request.Method = "HEAD";

        var response = request.GetResponse() as HttpWebResponse;

        return (response.StatusCode == HttpStatusCode.OK);

    }

    catch

    {

        return false;

    }

}

 

然后Steve把代码改为并行运行,他使用.NET 4的并行新特征,就像在四月份Stephen Toub在"Back to Parallel Basics”里帮助我解释的那样。

 

 

var linkList = GetLinks();

 

Action<int> updateLink = i =>

{

    UpdateLinkStatus(linkList[i]); //fetch URL and update its status in a shared list

};

Parallel.For(0, linkList.Count, updateLink);

使用Parallel.For是一个很不错的方法,可以把一些基本的自然的并行性引进到你的应用程序中去。

 

在并行性方面我不是专家,(我阅读过一个很好的白皮书)但是我请教过Stephen Toub是否这是解决这个问题最好的推荐的方法。Stephen当时在飞机上做出了回应,用(他的话)“电子邮件编译且测试过的”例子。征得了他的同意,在这篇博客帖子我引用他的回复,对我——也希望对你——有所启发。

来自Stephen:

首先, 好像作者是在提议使用并行loop来处理这个问题。这没有问题,当然也很简单,但是这种事你只有在客户端应用程序中才真正想要做,而不是在服务器应用程序中。目前的问题是,尽管很简单,但它阻碍了线程;对于一个客户端应用程序来说,有一些更多的被阻塞的线程通常不是什么大问题;但是对于一个服务器应用程序来说,假设你这么做是在对进来的ASP.NET 或 WCF服务请求作出反应,你就会每遇到一个请求就阻塞几个线程,这会极大地阻碍程序的延展性。当然,要获得快速启动和运行,且如果额外的几条线程不会引起麻烦,这也还是可以接受的方法。

假设你想要你快速及简易地“搞定”,并且阻塞几个线程也没问题,你就可以要么直接地使用一个并行循环,要么直接使用tasks,或者使用Stephen个人最爱:一个PLINQ查询。例如如果我有一个函数"bool ValidateUrl(string url);",我可以使用PLINQ一次最多处理至N:

bool [] results = (from url in urls.AsParallel() select ValidateUrl(url)).ToArray();

在这个例子中, PLINQ将会从ThreadPool中使用至多至N的线程,其中N默认为Environment.ProcessorCount,但是你可以在AsParallel()后添加.WithDegreeOfParallelism(N)并提供你自己的N值。

如果Steve是在一个控制台程序中做这件事情,(这很有可能,)那么就如Stephen所指出的那样,这没什么大不了的。你通常在客户端上是有线程可以占用的。然而在服务器这一端上,你通常想要尽量避免阻塞线程。

Stephen说,从可扩展性的角度来说一个更好的解决方案是利用异步I / O 当你在网络上调用的时候,当等待响应回来时我们没有理由来阻塞线程(除非为了方便)。不幸的是,在过去一直很难做这种异步操作的聚集。我们需要重写我们的ValidateUrl方法比如:

public void ValidateUrlAsync(string url, Action<string,bool> callback);

 

在这里该方法立即返回,并且之后通过所提供的回调函数来提醒给定的URL是否有效。然后我们就会把我们对此的使用变成下面的样子。注意System.Collections.Concurrent.ConcurrentQueue代表一个线程安全的、先进先出(FIFO)的集合;而CountdownEvent,表示一个同步基元,会在计数达到零时触发信号。

 

using(var ce = new CountdownEvent(urls.Length))

 

{

    var results = new ConcurrentQueue<Tuple<string,bool>>();

 

    Action callback = (url,valid) =>

    {

        results.Enqueue(Tuple.Create(url,valid));

        ce.Signal();

    };

 

    foreach(var url in urls) ValidateUrlAsync(url, callback);

 

    ce.Wait();

}

 

假设ValidateUrlAsync是使用异步写入,例如(你真的需要下述代码有更好的错误处理,但同样,这是由“电子邮件”编译的):

 

public void ValidateUrlAsync(string url, Action<string,bool> callback)

{

    var request = (HttpWebRequest)WebRequest.Create(url);

    try

    {

        request.BeginGetResponse(iar =>

        {

            HttpWebResponse response = null;

            try

            {

                response = (HttpWebResponse)request.EndGetResponse(iar);

                callback(url, response.StatusCode == HttpStatusCode.OK);

            }

            catch { callback(url, false); }

            finally { if (response != null) response.Close(); }

        }, null);

    }

    catch { callback(url, false); }

}

这个例子最终只会阻塞发送所有请求的主线程然后阻塞等待所有的响应,而不是每一个请求阻塞一个线程。只需一个小小的改变,我们同样可以使这个启动程序异步执行,例如:

public static void ValidateUrlsAsync(string [] urls, Action<IEnumerable<Tuple<string,bool>> callback)

{

    var ce = new CountdownEvent(urls.Length);

    var results = new ConcurrentQueue<Tuple<string,bool>>();

    Action callback = (url,valid) =>

    {

        results.Enqueue(Tuple.Create(url,valid));

        if (ce.Signal()) callback(results);

   };

   foreach(var url in urls) ValidateUrlAsync(url, callback);

}

 

当然,这真的是很复杂,比原先使用PLINQ的一行代码困难得多。

这是Tasks和新的Async CTP真正派上用场的地方。试想一下,原本我们要写:

void ValidateUrlAsync(string url, Action<bool> callback);

现在我们有:

Task<bool> ValidateUrlAsync(string url);

返回的Task<bool>更容易写,它代表异步操作的结果(无论是成功完成的情况还是异常情况)。

 

测试版注意 : 暂时无法将ASP.NET MVC 3与Async CTP同时安装。** 这是一个测试版冲突问题,它会被解决的,我确信。

 

如果我们有这样的操作,并且如果我们有一个Task.WhenAll方法,它执行任何数量的任务并返回一个代表所有这些的任务,那么我们可以很容易地等待所有的结果,例如

 

bool [] results = await Task.WhenAll(from url in urls select ValidateUrlAsync(url));

 

很好且很简单,完全异步,没有阻塞的线程,等等。

 

请注意 ,在Async CTP中,Task.WhenAll目前变成了TaskEx.WhenAll,因为既然它是一个带外的CTP,我们不能像我们想要的那样向Task上添加静态的WhenAll方法。

 

有了Async CTP及关键字await, 也更容易实现ValidateUrlAsync方法了,而且带着对异常处理的完整支持(在我之前的例子中我没有这么做,即如果事情失败了,它没有说明为什么)。

 

public async Task<bool> ValidateUrlAsync(string url)

{

    using(var response = (HttpWebResponse)await WebRequest.Create(url).GetResponseAsync())

        return response.StatusCode == HttpStatusCode.Ok;

}

 

即使没有Async CTP,仍可以用这个名字来实现ValidateUrlAsync。

 

注意我们用到了System.Threading.Tasks.TaskCompletionSource。在MSDN上:

在很多情况下,使Task (Of(TResult)) 能够代表一个外部的异步操作是很有用的。提供TaskCompletionSource (Of( TResult)) 就是为了这个目的。 它实现了一个(可以发放给消费者的)任务的创建,而这些消费者可以像他们使用其他成员一样使用该任务的成员。

public Task<bool> ValidateUrlAsync(string url)

{

    var tcs = new TaskCompletionSource<bool>();

    var request = (HttpWebRequest)WebRequest.Create(url);

    try

    {

        request.BeginGetResponse(iar =>

        {

            HttpWebResponse response = null;

            try

            {

                response = (HttpWebResponse)request.EndGetResponse(iar);

                tcs.SetResult(response.StatusCode == HttpStatusCode.OK);

            }

            catch(Exception exc) { tcs.SetException(exc); }

            finally { if (response != null) response.Close(); }

        }, null);

    }

    catch(Exception exc) { tcs.SetException(exc); }

    return tsc.Task;

 

}

有了这个方法,即使没有Async CTP,我们照样可以相对容易地使用现有的.NET 4的支持来处理这个问题:

 

Task.Factory.ContinueWhenAll(

    (from url in urls select ValidateUrlAsync(url)).ToArray(),

    completedTasks => { /* do some end task */ });

 

现在,仅仅使用.NET 4,我便得到了最好的效果。

 

非常感谢Stephen Toub。在.NET 4中有许多为Task创建、Threading, 及Parallelism的新的高、低层次的构建函数。然而最简单自然的解决办法往往是正确的,在.NET 4中我们必须与之打交道的那些部分(还有甚至更新的那些在Visual Studio 2010 Async CTP中添加的关键字‘await’和‘async’)会让您对您的多线程平行系统有非常精细的控制,而不需要写一大堆的代码。