Use Workflow to Invoke Web Services in Parallel

The very first time I saw Windows Workflow Foundation (WF), I slapped together some InvokeWebServiceActivities onto a Workflow design surface and did a slight little smile at how easy it was to execute web services in a sequential fashion. Next, I started tinkering with stuff like the ParallelActivity, trying to execute multiple web services at the same time. That didn't work out so well for me.

The ParallelActivity in WF is not truly parallel, it only uses 1 thread for processing, which turns out to be a good thing because it greatly simplifies WF programming (and makes what I am about to show you SO much easier to code than if it were really multiple threads). Go back and look at the link I just referenced (again, here) to understand what's happening with the ParallelActivity. The right side of the branch will not execute until the left side of the branch yields the thread. This can be demonstrated a little more easily with some demo code.

Let's start out with a HORRIBLE, NASTY, DO-NOT-DO-THIS-IN-PRODUCTION web service that simply sleeps the current thread for 5 seconds.

 using System;
using System.Web.Services;

[WebService(Namespace = "urn:workflow:demo:asyncwebservices")]
public class Service
{
    [WebMethod]
    public string HelloWorld() {
        System.Threading.Thread.Sleep(5000);
        return "Hello World";
    }    
}

We are also going to create a very basic activity called "WriteLine" that exposes a DependencyProperty so that we can bind the output from our web service to the property.

 using System;
using System.Workflow.Activities;
using System.ComponentModel;
using System.Workflow.ComponentModel;

namespace WorkflowConsoleApplication1
{
    public partial class WriteLine: SequenceActivity
    {
        public WriteLine()
        {
            InitializeComponent();
        }

        public static DependencyProperty TextProperty = System.Workflow.ComponentModel.DependencyProperty.Register("Text", typeof(string), typeof(WriteLine));

        [Description("The text that is written to the Console")]
        [Category("Activity")]
        [Browsable(true)]
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)]
        public string Text
        {
            get
            {
                return ((string)(base.GetValue(WriteLine.TextProperty)));
            }
            set
            {
                base.SetValue(WriteLine.TextProperty, value);
            }
        }


        protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            Console.WriteLine(Text);
            return ActivityExecutionStatus.Closed;
        }
    }
}

Consider what would happen if we called this web service in the left branch of a ParallelActivity, and we add a DelayActivity to the right side with a TimeoutDuration of 3 seconds (including our WriteLine activities for the output of our HelloWorld web service):

 <SequentialWorkflowActivity
    x:Class="WorkflowConsoleApplication1.Workflow1"
    x:Name="Workflow1"
    xmlns:ns0="clr-namespace:WorkflowConsoleApplication1"
    xmlns:x="https://schemas.microsoft.com/winfx/2006/xaml"
    xmlns="https://schemas.microsoft.com/winfx/2006/xaml/workflow">
                
  <ParallelActivity
    x:Name="parallelActivity1">
                
    <SequenceActivity
    x:Name="sequenceActivity1">
                
      <InvokeWebServiceActivity
    ProxyClass="{x:Type p8:Service}"
    MethodName="HelloWorld"
    x:Name="invokeWebServiceActivity1"
    xmlns:p8="clr-namespace:Workflow.AsyncActivities.localhost;Assembly=Workflow.AsyncActivities, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null">                
        <InvokeWebServiceActivity.ParameterBindings>                
          <WorkflowParameterBinding ParameterName="(ReturnValue)">                
            <WorkflowParameterBinding.Value>                
              <ActivityBind Name="writeLine1"
        Path="Text" />                
            </WorkflowParameterBinding.Value>                
          </WorkflowParameterBinding>                
        </InvokeWebServiceActivity.ParameterBindings>                
      </InvokeWebServiceActivity>                
      <ns0:WriteLine Text="{x:Null}"
    x:Name="writeLine1" />                
    </SequenceActivity>                
    <SequenceActivity
    x:Name="sequenceActivity2">                
      <DelayActivity
    TimeoutDuration="00:00:03"
    x:Name="delayActivity1" />                
      <ns0:WriteLine Text="Hello from delay"
    x:Name="writeLine2" />                
    </SequenceActivity>                
  </ParallelActivity>                
</SequentialWorkflowActivity>

The output you will see is "Hello from web service" (from the web service output) and then "Hello from delay" (from the WriteLine that executes as a child of sequenceActivity2, just after the DelayActivity).

How long will the operation take? Yep… 8 seconds, because the web service call on the left side of the parallel activity blocks the current thread and does not yield, the right side of the ParallelActivity must wait for the left side to yield before it can start. The thread will not yield until the web service call is finished because it is executing synchronously.

Yesterday, I showed how to implement an activity that makes asynchronous FtpWebRequest calls. I am going to use the same approach to solve this problem so that both the web service call and the delay activity are fired in parallel. The total time for our call will reduce from 8 seconds to 5 seconds, because the DelayActivity will not have to wait on the synchronous web service call to complete: the asynchronous call will allow the thread to yield. The output wil be reversed as well: instead of "Hello from web service, Hello from delay" you will instead see "Hello from delay, Hello from web service" because the delay has a TimeoutDuration (3 seconds) that is less than the amount of time that we sleep in the web service call (5 seconds).

We start by creating a WorkflowActivityLibrary and adding a Web Reference to our ASMX web method. This will generate a proxy that also adds some helper methods that make asynchronous web services simple to work with. The generated file that contains our proxy class will also contain a class called HelloWorldCompletedEventArgs. We'll leverage that generated code within our activity, creating a property where we can store the result from our web service call so that other activities can bind to the result.

 using System;
using System.Reflection;
using System.Globalization;
using System.Runtime.Serialization.Formatters.Binary;
using System.IO;
using System.Runtime.Serialization;
using System.ComponentModel;
using System.Workflow.ComponentModel;
using System.Workflow.Runtime;
using System.Workflow.Activities;


namespace Workflow.AsyncActivities
{
    public partial class HelloWorldAsyncActivity: SequenceActivity
    {
        public HelloWorldAsyncActivity()
        {
            InitializeComponent();
        }
        public static DependencyProperty UrlProperty = System.Workflow.ComponentModel.DependencyProperty.Register("Url", typeof(string), typeof(HelloWorldAsyncActivity));

        [Description("The URL to the web service")]
        [Category("Activity")]
        [Browsable(true)]
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)]
        public string Url
        {
            get
            {
                return ((string)(base.GetValue(HelloWorldAsyncActivity.UrlProperty)));
            }
            set
            {
                base.SetValue(HelloWorldAsyncActivity.UrlProperty, value);
            }
        }

        public static DependencyProperty ResultProperty = System.Workflow.ComponentModel.DependencyProperty.Register("Result", typeof(string), typeof(HelloWorldAsyncActivity));

        [Description("The result of the HelloWorld web service call")]
        [Category("Activity")]
        [Browsable(true)]
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)]
        public string Result
        {
            get
            {
                return ((string)(base.GetValue(HelloWorldAsyncActivity.ResultProperty)));
            }
            set
            {
                base.SetValue(HelloWorldAsyncActivity.ResultProperty, value);
            }
        }

The next step is to override the Execute method and return ActivityExecutionStatus.Executing. This signifies to the WorkflowRuntime that we are not yet done processing, we still have work that needs to be completed.

         protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            WorkflowQueuingService qservice = executionContext.GetService<WorkflowQueuingService>();
            WorkflowQueue q = qservice.GetWorkflowQueue(this.Name);
            q.QueueItemAvailable += this.ContinueAt;

            HelloWorldAsyncService s = executionContext.GetService<HelloWorldAsyncService>();
            HelloWorldState state = new HelloWorldState();
            state.Url = this.Url;
            state.WorkflowInstanceID = this.WorkflowInstanceId;
            state.WorkflowQueueName = this.Name;

            s.HelloWorldAsync(state);

            return ActivityExecutionStatus.Executing;
        }

What's going on inside the Execute method is very straightforward. We specify the method that is to be called when a workflow queue item arrives on a particular queue. Since our activity can't seem to execute the asynchronous call without blocking the thread, we can leverage a custom service that will do that on our behalf (we will show that service a little later in this post). We also need to pass some data to the service so that it knows how to get back to this particular activity instance, which is done by using our data transfer object called HelloWorldState… it simply holds some data that we pass between method calls. Our activity then asks our service to do some work, and returns a status of Executing back to the WorkflowRuntime.

In our override to the Execute method, we specified a method that is to be called when a queue item becomes available. I use the name ContinueAt, borrowing the convention from Darma Shukla's excellent book, Essential Windows Worfklow Foundation. Our ContinueAt method simply dequeues the result of our web service call and sets our dependency property "Result". It also has the essential task of asking the WorkflowRuntime to close the activity, signaling that there is no more work left for this activity to perform.

         void ContinueAt(object sender, QueueEventArgs e)
        {
            ActivityExecutionContext context = sender as ActivityExecutionContext;
            WorkflowQueuingService qservice = context.GetService<WorkflowQueuingService>();
            WorkflowQueue q = qservice.GetWorkflowQueue(this.Name);
            HelloWorldState state = (HelloWorldState)q.Dequeue();
            this.Result = state.CompletedArgs.Result;
            context.CloseActivity();
        }

Since we are going to work with a workflow queue, it would be wise to set that up and tear it down for our activity. We shouldn't do that in the constructor or finalizer for the class, since our class could be constructed and finalized many times. WF provides an Initialize and Uninitialize pair of methods that we perform setup and teardown tasks with.

         protected override void Initialize(IServiceProvider provider)
        {
            HelloWorldAsyncService s = (HelloWorldAsyncService)provider.GetService(typeof(HelloWorldAsyncService));
            if (null == s)
            {
                throw new InvalidOperationException("The HelloWorldAsyncService has not been added to the workflow");
            }

            WorkflowQueuingService qservice = (WorkflowQueuingService)provider.GetService(typeof(WorkflowQueuingService));
            if (!qservice.Exists(this.Name))
            {
                WorkflowQueue q = qservice.CreateWorkflowQueue(this.Name, true);
            }

        }

        protected override void Uninitialize(IServiceProvider provider)
        {
            WorkflowQueuingService qservice = (WorkflowQueuingService)provider.GetService(typeof(WorkflowQueuingService));
            if (qservice.Exists(this.Name))
            {
                qservice.DeleteWorkflowQueue(this.Name);
            }
        }
    }
}

That's our activity in its entirety. The next step is to look at our helper service that actually does the async web service calls. Because ASP.NET 2.0 makes the ASMX web service stuff so easy, there's very little code here.

 using System;
using Workflow.AsyncActivities.localhost;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;

namespace Workflow.AsyncActivities
{
    public class HelloWorldAsyncService : WorkflowRuntimeService
    {
        public void HelloWorldAsync(HelloWorldState state)
        {
            localhost.Service s = new localhost.Service();
            s.Url = state.Url;
            state.Proxy = s;
            s.HelloWorldCompleted += new HelloWorldCompletedEventHandler(HelloWorldCompleted);
            s.HelloWorldAsync(state);
            
        }

        void HelloWorldCompleted(object sender, HelloWorldCompletedEventArgs e)
        {
            HelloWorldState state = e.UserState as HelloWorldState;
            try
            {
                WorkflowInstance instance = this.Runtime.GetWorkflow(state.WorkflowInstanceID);
                state.CompletedArgs = e;
                instance.EnqueueItem(state.WorkflowQueueName, state, null, null);
            }
            catch (Exception oops)
            {
                Console.WriteLine(oops.Message);
            }
        }

    }
}

I highlighted 2 parts of this service that I think are worth mentioning. The first is the fact that we derive from WorkflowRuntimeService. We didn't have to do that, there's no requirement that workflow services inherit from this type. The reason that I used it is because that type exposes a member "Runtime" that makes it very easy to get back to the WorkflowRuntime, where we obtain a reference to the WorkflowInstance where the web service call originated from. The second part to highlight is that the service needs to notify the activity that the work is completed. The way we do that is to enqueue a message on a specified queue. If you go back to our Activity's Execute override method, we specified that our ContinueAt method should be called whenever a queue message is received. This is how the activity knows where to resume processing!

The final bit of plumbing is our data transfer object, the HelloWorldState class. It is pretty uninteresting, we simply hold references to some data. I took some shortcuts by adding references to the generated proxy class as well as the generated HelloWorldCompletedArgs class just to make the sample more concise and easier to understand.

 using System;
using System.Workflow.Runtime.Hosting;

namespace Workflow.AsyncActivities
{
    public class HelloWorldState : WorkflowRuntimeService
    {

        private localhost.Service  _proxy;

        public localhost.Service  Proxy
        {
            get { return _proxy; }
            set { _proxy = value; }
        }
    
        private string _url;

        public string Url
        {
            get { return _url; }
            set { _url = value; }
        }

        private object _userState;

        public object UserState
        {
            get { return _userState; }
            set { _userState = value; }
        }

        private localhost.HelloWorldCompletedEventArgs _completedArgs;

        public localhost.HelloWorldCompletedEventArgs CompletedArgs
        {
            get { return _completedArgs; }
            set { _completedArgs = value; }
        }

        private string _workflowQueueName;

        public string WorkflowQueueName
        {
            get { return _workflowQueueName; }
            set { _workflowQueueName = value; }
        }


        private Guid _workflowInstanceID;

        public Guid WorkflowInstanceID
        {
            get { return _workflowInstanceID; }
            set { _workflowInstanceID = value; }
        }    
    }
}

OK, now that the plumbing part is over with, let's see how easy it is to now execute multiple web service calls in parallel. Our workflow will look pretty much like what we saw before, only this time we replace the InvokeWebServiceActivity with our shiny new HelloWorldAsyncActivity.

 <SequentialWorkflowActivity
    x:Class="WorkflowConsoleApplication1.Workflow1"
    x:Name="Workflow1"
    xmlns:ns0="clr-namespace:Workflow.AsyncActivities;Assembly=Workflow.AsyncActivities, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
    xmlns:ns1="clr-namespace:WorkflowConsoleApplication1"
    xmlns:x="https://schemas.microsoft.com/winfx/2006/xaml"
    xmlns="https://schemas.microsoft.com/winfx/2006/xaml/workflow">                
  <ParallelActivity
    x:Name="parallelActivity2">            
    <SequenceActivity
    x:Name="sequenceActivity3">                
      <ns0:HelloWorldAsyncActivity
    x:Name="HelloWorldAsync"
    Url="https://localhost:56700/WebSite4/Service.asmx"
    Result="{ActivityBind writeLine3,Path=Text}" />                
      <ns1:WriteLine
    Text="{x:Null}"
    x:Name="writeLine3" />                
    </SequenceActivity>                
    <SequenceActivity
    x:Name="sequenceActivity4">                
      <DelayActivity
    TimeoutDuration="00:00:03"
    x:Name="delayActivity2" />                
      <ns1:WriteLine
    Text="Hello from delay"
    x:Name="writeLine4" />                
    </SequenceActivity>                
  </ParallelActivity>                
</SequentialWorkflowActivity>                

Oh yeah, one last detail… the Workflow host. We need a host program that will create and manage the WorkflowRuntime. Like all good blog demos, I will use a Console app.

 #region Using directives

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using Workflow.AsyncActivities;

#endregion

namespace WorkflowConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            using(WorkflowRuntime workflowRuntime = new WorkflowRuntime())
            {

                SqlWorkflowPersistenceService persistence = new SqlWorkflowPersistenceService("Data Source=(local);Initial Catalog=SqlPersistenceService;Integrated Security=True");
                workflowRuntime.AddService(persistence);

                HelloWorldAsyncService helloworld = newHelloWorldAsyncService();
                workflowRuntime.AddService(helloworld);

                AutoResetEvent waitHandle = new AutoResetEvent(false);                
                workflowRuntime.WorkflowCompleted += delegate(object sender, WorkflowCompletedEventArgs e) {waitHandle.Set();};
                workflowRuntime.WorkflowTerminated += delegate(object sender, WorkflowTerminatedEventArgs e)
                {
                    Console.WriteLine(e.Exception.Message);
                    waitHandle.Set();
                };

                WorkflowInstance instance = workflowRuntime.CreateWorkflow(typeof(WorkflowConsoleApplication1.Workflow1));
                instance.Start();

                waitHandle.WaitOne();
            }
        }
    }
}

Again, the interesting part is highlighted. We add an instance of our custom service to the workflow runtime. And, just as promised, the total time to process is 5 seconds instead of 8, and the output looks like:

Hello from delay

Hello from web service

This may look like a bit of code, but there's really not much to it. Here are the steps to follow for creating your own activities.

  1. Create the Activity
    1. Override Initialize and set up any workflow queues that you might need
    2. Override Execute and provide a callback function that is called when a queue item arrives. Return ActivityExecutionStatus.Executing.
    3. Override Uninitialize and tear down any resources that you created in the Initialize method.
  2. Create the service
    1. Do some work.
    2. Enqueue a message when you are done with the work.

That's it! Windows Workflow Foundation makes asynchronous programming very simple once you recognize the pattern of using a custom service.

Obviously, there is room for improvement here. It would be great if the InvokeWebServiceActivity would support asynchronous execution by default. It is feasible to create a generalized activity that could perform any asynchronous web service call, maybe it would be feasible to write a generalized activity that could support any IAsyncResult invocation. That is of course an exercise left to the reader.

[Update: Fixed lots of the code formatting issues]

Workflow.AsyncWebServices.zip