Implementing a simple ForEachAsync

Stephen Toub - MSFT

Jon Skeet recently asked me how I might go about implementing the following “asynchronous ForEach” behavior:

  • For each element in an enumerable, run a function that returns a Task<TResult> to represent the completion of processing that element. All of these functions may run asynchronously and concurrently with one another.
  • As each task completes, run a second processing action over the results.  All of these actions must be run sequentially, but order doesn’t matter.

Given what we now know about SemaphoreSlim from my previous post, here’s one way to achieve this:

public static Task ForEachAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
{
    var oneAtATime = new SemaphoreSlim(initialCount:1, maxCount:1);
    return Task.WhenAll(
        from item in source
        select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
}

private static async Task ProcessAsync<TSource, TResult>(
    TSource item,
    Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
    SemaphoreSlim oneAtATime)
{
    TResult result = await taskSelector(item);
    await oneAtATime.WaitAsync();
    try { resultProcessor(item, result); }
    finally { oneAtATime.Release(); }
}

We instantiate a semaphore initialized with a count of 1, and we use this to throttle the follow-up actions to ensure that only one runs at a time.  We then call the ProcessAsync method for each element, passing in the element, the semaphore, and the delegates to invoke.  Inside ProcessAsync, we first run the function and await its completion.  Once it’s complete, we acquire the semaphore to ensure that the result processing function runs sequentially with respect to all the other result processing functions, and we release the semaphore in a finally block so that an exception thrown by one processing function doesn’t block the rest.
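To make that behavior concrete, here is a small sketch of how the method might be called. The ForEachAsyncDemo class, the RunAsync helper, the delays, and the squaring work are all made up for illustration; only ForEachAsync and ProcessAsync come from the code above, repeated here so the example compiles on its own. The result processor deliberately writes to a plain List<int>, which is not thread-safe, and tracks how many processors are active at once, on the assumption that the semaphore keeps that number at 1.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncDemo
{
    // Same ForEachAsync / ProcessAsync as above, repeated so this file stands alone.
    public static Task ForEachAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor)
    {
        var oneAtATime = new SemaphoreSlim(initialCount: 1, maxCount: 1);
        return Task.WhenAll(
            from item in source
            select ProcessAsync(item, taskSelector, resultProcessor, oneAtATime));
    }

    private static async Task ProcessAsync<TSource, TResult>(
        TSource item,
        Func<TSource, Task<TResult>> taskSelector, Action<TSource, TResult> resultProcessor,
        SemaphoreSlim oneAtATime)
    {
        TResult result = await taskSelector(item);
        await oneAtATime.WaitAsync();
        try { resultProcessor(item, result); }
        finally { oneAtATime.Release(); }
    }

    // Hypothetical demo: square 1..5 concurrently, collect the results one at a time.
    public static async Task<(int Sum, int Count, int MaxConcurrent)> RunAsync()
    {
        var squares = new List<int>();   // not thread-safe; relies on the semaphore
        int active = 0, maxActive = 0;   // used to observe overlap between processors

        await Enumerable.Range(1, 5).ForEachAsync(
            async n =>
            {
                // Stand-in for real asynchronous work; larger inputs finish sooner,
                // so results are likely to arrive out of source order.
                await Task.Delay(10 * (6 - n));
                return n * n;
            },
            (n, square) =>
            {
                int now = Interlocked.Increment(ref active);
                maxActive = Math.Max(maxActive, now);
                squares.Add(square);
                Interlocked.Decrement(ref active);
            });

        return (squares.Sum(), squares.Count, maxActive);
    }

    public static async Task Main()
    {
        var (sum, count, maxConcurrent) = await RunAsync();
        Console.WriteLine($"sum={sum}, count={count}, maxConcurrent={maxConcurrent}");
    }
}
```

The order in which the squares land in the list depends on timing, which is fine given that order doesn’t matter here, but the sum (55), the count (5), and the maximum number of simultaneously running processors (1) should not vary from run to run.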

Thanks for the question/suggestion, Jon!
