Parallel process execution with partial completion of processes

 

On a recent project I came across an interesting parallel processing problem for which I could not find a well-defined pattern. I needed to execute a number of long-running processes in parallel, with the calling code waiting for them to finish, but stop processing once a certain condition was met. Most individual processes would complete in seconds, but some could take considerably longer. Depending on the results coming back, it might be possible to cancel the longer-running processes, as their results would not be needed.

Hence, I needed a pattern for executing a group of processes in parallel that, as results are returned, determines whether the processes still executing or queued actually need to run.

The Task Parallel Library (TPL) in .NET 4.0 made this a simple programming problem, and I wrote a wrapper class for it called PartialParallel.

The premise of the PartialParallel class is that, given a collection of operations to perform in parallel, each successful completion calls a caller-supplied function that collates the results. This result collation function then tells the PartialParallel class whether it should continue processing further operations.

Without further ado, as not much code is needed to write such a class, here is a stripped-down version of the PartialParallel class that I put together to handle this problem.

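namespace MSDN.Threading
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Collections.ObjectModel;
    using System.Linq;
    using System.Threading.Tasks;
 
    /// <summary>
    /// Minimal base class for an operation producing a result of type TResult.
    /// </summary>
    /// <typeparam name="TResult">Result type.</typeparam>
    public abstract class OperationInputDefinitionBase<TResult>
    {
        /// <summary>
        /// Executes the operation.
        /// </summary>
        /// <returns>Output result.</returns>
        public abstract TResult Execute();
    }
 
    /// <summary>
    /// Operation that takes a single input parameter.
    /// </summary>
    public class OperationInputDefinition<TInput, TResult> : OperationInputDefinitionBase<TResult>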
    {
        /// <summary>
        /// Gets or sets the operation to execute.
        /// </summary>
        public Func<TInput, TResult> Operation { get; set; }
    
        /// <summary>
        /// Gets or sets the operation parameter.
        /// </summary>
        public TInput Parameter { get; set; }
    
        /// <summary>
        /// Executes the operation.
        /// </summary>
        /// <returns>Output result.</returns>
        public override TResult Execute()
        {
            return this.Operation(this.Parameter);
        }
    }
 
    /// <summary>
    /// Class to perform parallel operations until a condition is met.
    /// </summary>
    /// <typeparam name="T">Result type.</typeparam>
    public class PartialParallel<T>
    {
        /// <summary>
        /// Operations to be executed.
        /// </summary>
        private ICollection<OperationInputDefinitionBase<T>> inputs;
 
        /// <summary>
        /// Degree of parallelism.
        /// </summary>
        private int degreeParallelism;
 
        /// <summary>
        /// Function to be called when a result is returned.
        /// </summary>
        private Func<T, bool> operationCompleted;
 
        /// <summary>
        /// Sync object for return calling.
        /// </summary>
        private object syncObject = new object();
 
        /// <summary>
        /// Initializes a new instance of the PartialParallel class.
        /// </summary>
        /// <param name="operationCompleted">Result returning function.</param>
        public PartialParallel(Func<T, bool> operationCompleted)
        {
            this.operationCompleted = operationCompleted;
            this.degreeParallelism = 4;
            this.inputs = new Collection<OperationInputDefinitionBase<T>>();
        }
 
        /// <summary>
        /// Initializes a new instance of the PartialParallel class.
        /// </summary>
        /// <param name="operationCompleted">Result returning function.</param>
        /// <param name="degreeParallel">Required degree of parallelism.</param>
        public PartialParallel(Func<T, bool> operationCompleted, int degreeParallel)
            : this(operationCompleted)
        {
            this.degreeParallelism = degreeParallel;
        }
 
        /// <summary>
        /// Adds a single operation to the collection of operations.
        /// </summary>
        /// <param name="input">Operation to add.</param>
        public void AddOperation(OperationInputDefinitionBase<T> input)
        {
            this.inputs.Add(input);
        }

        /// <summary>
        /// Adds a collection of operations to the collection of operations.
        /// </summary>
        /// <param name="inputs">Operations to add.</param>
        public void AddOperations(ICollection<OperationInputDefinitionBase<T>> inputs)
        {
            foreach (OperationInputDefinitionBase<T> input in inputs)
            {
                this.inputs.Add(input);
            }
        }
 
        /// <summary>
        /// Executes the operations.
        /// </summary>
        public void Execute()
        {
            this.ExecuteParallel();
        }
 
        /// <summary>
        /// Processes the inputs in parallel.
        /// </summary>
        private void ExecuteParallel()
        {
            ParallelOptions options = new ParallelOptions() { MaxDegreeOfParallelism = this.degreeParallelism };
            ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
 
            // perform parallel processing
            Parallel.ForEach(
                this.inputs,
                options,
                (input, loopState) =>
            {
                try
                {
                    // skip this operation if the loop has already been stopped
                    if (!loopState.ShouldExitCurrentIteration)
                    {
                        T result = input.Execute();

                        // serialize calls to the result collation function
                        lock (this.syncObject)
                        {
                            // only pass back the result if the loop has not already been stopped
                            if (!loopState.ShouldExitCurrentIteration && !this.operationCompleted(result))
                            {
                                loopState.Stop();
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    exceptions.Enqueue(ex);
                }
            });
 
            // rethrow any exceptions raised by the operations or the collation function
            if (exceptions.Count > 0)
            {
                throw new AggregateException("Processing of the inputs has failed.", exceptions);
            }
        }
    }
}

The complete, extended code can also be found here on the MSDN Code Gallery.

The basic usage premise is as follows:

  • Create an instance of the PartialParallel class, specifying the type returned from the operations to be executed
  • When creating the instance, pass in a Func defining the function to be called after each operation has executed (the result collation function), along with the required degree of parallelism
  • Next, add the operations to be executed in parallel, whose results will be collated by that Func
  • Finally, call Execute, remembering to handle any exceptions

Let's dig into these in a little more detail. Creating the class instance is easily done:

this.results = new Collection<string>();
PartialParallel<string> parallelEngine = new PartialParallel<string>(
    (value) =>
    {
        // collate the result; returning false stops further processing
        this.results.Add(value);
        return this.results.Count < 3;
    },
    5);

 

This creates a PartialParallel instance whose operations return a string, executing up to five operations at a time. Of course, as the PartialParallel class uses generics, any result type can be used.

The field this.results is used to collate the results coming back from the operations. Within the PartialParallel class, calls to the result collation function are wrapped within a critical section (a lock), so the caller does not have to worry about thread synchronization.

The property inspected to decide whether an operation runs, and whether its result is passed to the result collation function, is ParallelLoopState.ShouldExitCurrentIteration. It is checked before the operation executes and again within the critical section; only if it is still false is the result collation function called:

if (!loopState.ShouldExitCurrentIteration)
{
    T result = input.Execute();
    lock (syncObject)
    {
        if (!loopState.ShouldExitCurrentIteration
        && !this.operationCompleted(result))
        {
            loopState.Stop();
        }
    }
}
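To see the effect of Stop more concretely, the following hypothetical experiment applies the same check, execute, lock, re-check sequence to dummy operations and counts how many operations start versus how many results reach the collation step. The StopSemanticsDemo class, its parameters and the simulated 50 ms workload are illustrative assumptions, not part of PartialParallel.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical experiment: how many operations start versus how many results are
// collated when the loop is stopped after 'limit' results.
public static class StopSemanticsDemo
{
    public static void Run(int operationCount, int limit, int degreeOfParallelism, out int started, out int collated)
    {
        object syncObject = new object();
        int startedCount = 0;
        List<int> results = new List<int>();

        Parallel.ForEach(
            Enumerable.Range(0, operationCount),
            new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism },
            (item, loopState) =>
            {
                // operations that have not started are skipped once Stop has been called
                if (loopState.ShouldExitCurrentIteration)
                {
                    return;
                }

                Interlocked.Increment(ref startedCount);
                Thread.Sleep(50); // simulated work; Stop never aborts an operation in flight

                lock (syncObject)
                {
                    // results from operations finishing after Stop are discarded
                    if (!loopState.ShouldExitCurrentIteration)
                    {
                        results.Add(item);
                        if (results.Count >= limit)
                        {
                            loopState.Stop();
                        }
                    }
                }
            });

        // Parallel.ForEach waits for all in-flight iterations before returning
        started = startedCount;
        collated = results.Count;
    }
}
```

Under these assumptions exactly three results are collated when the limit is 3, while the number of started operations can be higher, because operations already running when Stop is called still complete.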

For testing purposes I have defined the operations to be called as:

 

Collection<string> inputs = new Collection<string>()
    {
        "lraC",
        "eiraM",
        "nalyD",
        "eivE",
        "annA",
        "niwraD",
        "lraE"
    };
foreach (string input in inputs)
{
    parallelEngine.AddOperation(new OperationInputDefinition<string, string>()
    {
        Parameter = input,
        Operation = this.TestLocateUserName
    });
}

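In this case the result collation function merely adds each result to a collection and tells the PartialParallel class to continue until it has gathered the first three responses. Operations already running at that point are not aborted; they run to completion, but their results are ignored.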

The operation being called merely takes in a string, reverses it, and passes it back. To run these operations until the loop condition is met, one simply has to call the Execute() method:

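try
{
    parallelEngine.Execute();
}
catch (AggregateException ex)
{
    // one inner exception is recorded for each failed operation
    foreach (Exception inner in ex.InnerExceptions)
    {
        Debug.WriteLine(string.Format("Exception processing: {0}", inner.Message));
    }
}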
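The body of TestLocateUserName is not shown in the post. Going only by the description above (it reverses the input string), a minimal stand-in might look like the following; the TestOperations class and ReverseString name are hypothetical.

```csharp
using System;

// Hypothetical stand-in for TestLocateUserName: reverses the input string.
public static class TestOperations
{
    public static string ReverseString(string value)
    {
        char[] characters = value.ToCharArray();
        Array.Reverse(characters);
        return new string(characters);
    }
}
```

It could be plugged in as `Operation = TestOperations.ReverseString`, since a method group converts to `Func<string, string>`. Note that reversing a char array reverses UTF-16 code units, which is fine for the ASCII test names here but would split surrogate pairs.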

The beauty of wrapping the parallel execution in this manner is that, with a few lines of code, a set of operations can be executed in parallel until an exit criterion has been met. Exception handling is also straightforward, as it involves only a try…catch block.
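A single try…catch suffices because the class catches every exception inside the loop body, queues it, and rethrows the lot after the loop as one AggregateException. The hypothetical sketch below (ExceptionCollectionDemo is illustrative only) isolates that mechanism: odd-numbered operations fail, the even-numbered ones still run, and the caller sees one AggregateException with one inner exception per failure.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of the collect-then-rethrow error handling used by PartialParallel.
public static class ExceptionCollectionDemo
{
    // Returns the number of operations that succeeded; throws if any failed.
    public static int Run(int operationCount)
    {
        ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
        int succeeded = 0;

        Parallel.ForEach(Enumerable.Range(0, operationCount), item =>
        {
            try
            {
                // hypothetical operation: odd-numbered inputs fail
                if (item % 2 == 1)
                {
                    throw new InvalidOperationException("Operation " + item + " failed.");
                }

                Interlocked.Increment(ref succeeded);
            }
            catch (Exception ex)
            {
                // the failure is recorded and the loop carries on with other operations
                exceptions.Enqueue(ex);
            }
        });

        if (!exceptions.IsEmpty)
        {
            throw new AggregateException("Processing of the inputs has failed.", exceptions);
        }

        return succeeded;
    }
}
```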

In this instance the operation to be executed takes a single parameter. However, the downloadable code also provides a class definition that allows one to call an operation with two parameters:

public class OperationInputDefinition<TInput1, TInput2, TResult>
    : OperationInputDefinitionBase<TResult>

This can easily be extended further for 3, 4 or more parameters.
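Following the same pattern, a three-parameter definition might look like the sketch below. The minimal OperationInputDefinitionBase here only mirrors what the post's code relies on (an abstract Execute method), and the Parameter1, Parameter2 and Parameter3 property names are assumptions; the downloadable two-parameter version may name them differently.

```csharp
using System;

// Minimal base class assumed to match the post's usage (only Execute is needed).
public abstract class OperationInputDefinitionBase<TResult>
{
    public abstract TResult Execute();
}

// Hypothetical three-parameter extension of the one- and two-parameter definitions.
public class OperationInputDefinition<TInput1, TInput2, TInput3, TResult>
    : OperationInputDefinitionBase<TResult>
{
    public Func<TInput1, TInput2, TInput3, TResult> Operation { get; set; }

    public TInput1 Parameter1 { get; set; }

    public TInput2 Parameter2 { get; set; }

    public TInput3 Parameter3 { get; set; }

    // Invokes the operation with the three stored parameters.
    public override TResult Execute()
    {
        return this.Operation(this.Parameter1, this.Parameter2, this.Parameter3);
    }
}
```

Because it derives from the same base class, such an operation can be added to a PartialParallel instance alongside single-parameter operations with the same result type.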

As mentioned, it is the .NET 4.0 TPL, more specifically the Parallel.ForEach method, that makes this code simple to write. Some good examples of using the Parallel.ForEach method can be found on MSDN here.

MSDN Code Sample

Before finishing the post, a few words are warranted about the code that can be downloaded from the MSDN code library.

Firstly, the download contains a version of the code that defines the parallel operation as a single static method. This allows it to be called in a similar fashion to the Parallel.ForEach method, as is best demonstrated by a test method:

[TestMethod]
public void ExecuteParallelIntStaticTest()
{
    Collection<OperationInputDefinitionBase<int>> inputDefinitions = new Collection<OperationInputDefinitionBase<int>>();
    for (int idx = 0; idx < 6; ++idx)
    {
        // each operation squares 2 after a simulated 2-second delay
        inputDefinitions.Add(new OperationInputDefinition<int, int>() { Parameter = 2, Operation = (value) => { Thread.Sleep(2000); return value * value; } });
    }

    int summation = 0;
    ParallelEx.ForEachPartial<int>(
        inputDefinitions,
        (value) => { summation += value; return summation <= 7; });

    // 4 + 4 = 8 exceeds 7, so only two results are collated
    Assert.AreEqual<int>(8, summation, "Only two results should have been collated.");
}
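The download's ParallelEx.ForEachPartial is not listed in the post. A minimal sketch of such a static method, assuming it simply inlines the PartialParallel loop and defaults to the same degree of parallelism of 4, could look like this; the signature is inferred from the test's call and may differ from the actual download.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public abstract class OperationInputDefinitionBase<TResult>
{
    public abstract TResult Execute();
}

public class OperationInputDefinition<TInput, TResult> : OperationInputDefinitionBase<TResult>
{
    public Func<TInput, TResult> Operation { get; set; }

    public TInput Parameter { get; set; }

    public override TResult Execute()
    {
        return this.Operation(this.Parameter);
    }
}

// Sketch of a static entry point in the spirit of ParallelEx.ForEachPartial (assumed signature).
public static class ParallelEx
{
    public static void ForEachPartial<T>(
        IEnumerable<OperationInputDefinitionBase<T>> inputs,
        Func<T, bool> operationCompleted,
        int degreeOfParallelism = 4)
    {
        object syncObject = new object();
        ConcurrentQueue<Exception> exceptions = new ConcurrentQueue<Exception>();
        ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism };

        Parallel.ForEach(inputs, options, (input, loopState) =>
        {
            try
            {
                // skip operations that have not started once the loop is stopped
                if (loopState.ShouldExitCurrentIteration)
                {
                    return;
                }

                T result = input.Execute();

                // serialize the collation callback and stop when it returns false
                lock (syncObject)
                {
                    if (!loopState.ShouldExitCurrentIteration && !operationCompleted(result))
                    {
                        loopState.Stop();
                    }
                }
            }
            catch (Exception ex)
            {
                exceptions.Enqueue(ex);
            }
        });

        if (!exceptions.IsEmpty)
        {
            throw new AggregateException("Processing of the inputs has failed.", exceptions);
        }
    }
}
```

With this sketch the test above behaves as described: the first two results (4 + 4) push the summation past 7, the callback returns false, and any later results are skipped inside the lock.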

The code also contains a separate version of the class called PartialParallelState. This class is the same as the PartialParallel class, except that the result collation function returns a class containing a ContinueProcessing boolean along with a state object. The operation being called within the loop takes this state object as an additional parameter.

This state object allows one to feed information back into each new operation started within the loop. This could be useful when searching for a maximum number of results, where each iteration tries to find the required number of entries. Once the first iteration completes, the number still to be found is reduced, so the state object can be adjusted to reflect that reduction. However, one must remember that all the operations started in the first batch, before any result has been collated, begin with the same state.
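The PartialParallelState code is not listed in the post. The following is a hypothetical sketch of the idea: each operation receives the latest state, and each collation returns whether to continue along with an updated state. CollationResult and PartialParallelStateSketch are illustrative names, and exception handling is omitted for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical result of the collation function: continue flag plus updated state.
public class CollationResult<TState>
{
    public bool ContinueProcessing { get; set; }

    public TState State { get; set; }
}

public static class PartialParallelStateSketch
{
    public static void Execute<TInput, TState, TResult>(
        IEnumerable<TInput> inputs,
        Func<TInput, TState, TResult> operation,
        Func<TResult, CollationResult<TState>> operationCompleted,
        TState initialState,
        int degreeOfParallelism)
    {
        object syncObject = new object();
        TState currentState = initialState;

        Parallel.ForEach(
            inputs,
            new ParallelOptions { MaxDegreeOfParallelism = degreeOfParallelism },
            (input, loopState) =>
            {
                if (loopState.ShouldExitCurrentIteration)
                {
                    return;
                }

                // operations starting before any collation all see the initial state
                TState stateSnapshot;
                lock (syncObject)
                {
                    stateSnapshot = currentState;
                }

                TResult result = operation(input, stateSnapshot);

                lock (syncObject)
                {
                    if (loopState.ShouldExitCurrentIteration)
                    {
                        return;
                    }

                    // store the updated state for operations that start later
                    CollationResult<TState> collation = operationCompleted(result);
                    currentState = collation.State;
                    if (!collation.ContinueProcessing)
                    {
                        loopState.Stop();
                    }
                }
            });
    }
}
```

For example, if each source can yield up to 3 items and 7 are wanted, passing the remaining count as the state makes a sequential run (degree of parallelism 1) stop at exactly 7. With higher parallelism the first batch all start with the full target of 7, so the total can overshoot, which is the caveat described above.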

Finally, this code also contains some simple unit tests.

Hopefully this code is useful.

Written by Carl Nolan