Asynchronous Workflow Activities: Implementing Asynchronous WebRequests

(note: sample code is attached to this post). 

As I stated before, Darma's excellent book finally gave me the a-ha moment for Windows Workflow Foundation beyond building simple Hello, World demos and making activities pretty. I finally get how to do more useful things with WF, and finally see why you should think twice before writing another program that doesn't use WF (not saying that every program requires WF, not by any stretch… but many of the programs that exist in the wild could seriously benefit from WF, enough that you should take a second look).

One thing I have wondered about is how you would perform work asynchronously in a workflow. For instance, you want to use the IAsyncResult pattern wherever possible in server-side applications, but how would you achieve that from an activity? It turns out that Windows Workflow Foundation makes this very straightforward. To demonstrate, I am going to pirate some code from the MSDN documentation on FtpWebRequest and repurpose it into an Activity.

I posted to an internal discussion list asking how to do this, and Jon Flanders pointed out that he had written a sample that used the pattern I needed to follow. Since I hadn't yet grokked services, queues, and the real purpose of workflow as a runtime that spans multiple process and appdomain boundaries, I looked at it, and my eyes glazed over. Once I went back and took another peek, it made perfect sense. I think this is the single most important thing to grok for Windows Workflow Foundation: the concept of long-running workflow activities.

I am going to follow the same pattern Jon used, and we will pirate code from MSDN to fit within that pattern.

The really cool thing about this is that you will see how to take existing code and apply it to workflow activities, gaining new capabilities for your existing code. Developers will be able to take your activities and simply drag-and-drop their way through some fairly complex processing, becoming incredibly productive while you get to focus on the really cool plumbing underneath: writing reusable activities and the more interesting, deeper code. You get all the kudos for being the rock star who saves the company money by increasing developer productivity!

As with any WF Activity, we start by creating a class that will represent our Activity. Our activity will simply get a file over FTP, so we'll call it GetFileActivity.

using System;
using System.Net;
using System.Workflow.Runtime;
using System.Workflow.Activities;
using System.Workflow.ComponentModel;
using System.ComponentModel;

namespace Workflow.Ftp 
{
    public partial class GetFileActivity: SequenceActivity 
    {
        public GetFileActivity() 
        { 
            InitializeComponent(); 
        }

We will create two properties for our Activity, one that allows the user to specify the URI to the file being downloaded, and another property that specifies where to write the file to.

        public static DependencyProperty UrlProperty = System.Workflow.ComponentModel.DependencyProperty.Register("Url", typeof(Uri), typeof(GetFileActivity));

        [Description("The full URL to the file being requested (e.g. ftp://localhost/somefolder/somefile.txt)")]
        [Category("Behavior")]
        [Browsable(true)]
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)]
        public Uri Url
        {
            get { return ((Uri)(base.GetValue(GetFileActivity.UrlProperty))); }
            set { base.SetValue(GetFileActivity.UrlProperty, value); }
        }

        public static DependencyProperty LocalFilePathProperty = System.Workflow.ComponentModel.DependencyProperty.Register("LocalFilePath", typeof(string), typeof(GetFileActivity));

        [Description("The local file path where the downloaded file will be stored")]
        [Category("Behavior")]
        [Browsable(true)]
        [DesignerSerializationVisibility(DesignerSerializationVisibility.Visible)]
        public string LocalFilePath
        {
            get { return ((string)(base.GetValue(GetFileActivity.LocalFilePathProperty))); }
            set { base.SetValue(GetFileActivity.LocalFilePathProperty, value); }
        }

Now, we need to override the Execute method of the activity and return an ActivityExecutionStatus enumeration member. Here's where the heart of the problem lies: how do we do something asynchronously within an activity? We could use an AutoResetEvent and signal when the work is completed. But WF provides a better way: we can leverage a custom workflow service that will signal our activity when it has completed its work. The activity will simply pass some work to its underlying service and tell the workflow runtime that it is not yet done with its work. The workflow runtime, then, will not complete the workflow until our activity indicates that it has completed (or is aborting for some reason).

Think about how the DelayActivity would work under the covers. The DelayActivity itself doesn't block the current thread, it yields back to the workflow runtime and indicates that it has not yet completed its work. Something else, then, signals the DelayActivity that a specified time period has elapsed, and the DelayActivity completes its work. The DelayActivity leverages a TimerEventSubscription from the WorkflowRuntime, and the runtime signals the DelayActivity when it is to resume its workload.

We will approach this in the same way. We will override the Execute method in our Activity and return the status of Executing to let the workflow runtime know that we are not done with all of our work yet. At the same time, we will also tell our custom service to start doing its work.

        protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            // Subscribe to the queue created in Initialize; ContinueAt runs when an item arrives.
            WorkflowQueuingService qservice = executionContext.GetService<WorkflowQueuingService>();
            WorkflowQueue q = qservice.GetWorkflowQueue(this.Name);
            q.QueueItemAvailable += this.ContinueAt;

            // Hand the FTP request to the service, which performs it asynchronously.
            FtpService s = executionContext.GetService<FtpService>();
            FtpWebRequest request = (FtpWebRequest)WebRequest.Create(this.Url);
            request.Method = WebRequestMethods.Ftp.DownloadFile;
            FtpState state = new FtpState(request, LocalFilePath, this.Name, this.WorkflowInstanceId);
            s.BeginFtp(state);

            // Not done yet: the activity stays in the Executing state until ContinueAt closes it.
            return ActivityExecutionStatus.Executing;
        }

There are some interesting bits here to pay close attention to. The first is the notion of a WorkflowQueuingService. If you are familiar with MSMQ, then this is very similar in concept. Under the covers, WF provides a set of queues as a means of communicating between activities, services, and the workflow host. We add a delegate handler called "ContinueAt" that will be called whenever a queue item is available in our workflow queue. We then set up an FtpWebRequest and create an instance of a custom type called FtpState. We'll look at this in more detail in a bit, but it is simply a data transfer object that holds the data related to performing this FTP request. We call our custom FtpService and ask it to start the process, and then we tell the workflow runtime that we are not done with all of our work yet.

We mentioned the ContinueAt method. This is the "bookmark" that we use to continue processing. Our ContinueAt method is called when a workflow queue item is available. We use this to dequeue the item and do any processing. We could do something with the state, such as setting another dependency property that includes exception data or the FTP status; that task is left to the reader as cleanup.

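        void ContinueAt(object sender, QueueEventArgs e)
        {
            // For activity queue events, the sender is the ActivityExecutionContext.
            ActivityExecutionContext context = (ActivityExecutionContext)sender;
            WorkflowQueuingService qservice = context.GetService<WorkflowQueuingService>();
            WorkflowQueue q = qservice.GetWorkflowQueue(this.Name);
            // We only expect one item, so stop listening before dequeuing it.
            q.QueueItemAvailable -= this.ContinueAt;
            FtpState state = (FtpState)q.Dequeue();
            // Tell the runtime that this activity has finished its work.
            context.CloseActivity();
        }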

The workflow activity framework provides a pipeline of events for processing our activity. Since our activity may be constructed and destructed many times during this lifecycle, we cannot rely on using a constructor for initialization data. An Initialize method is provided for us to perform initialization operations, such as ensuring that our workflow queue exists.

        protected override void Initialize(IServiceProvider provider)
        {
            FtpService s = (FtpService)provider.GetService(typeof(FtpService));
            if (null == s)
            {
                throw new InvalidOperationException("The FtpService has not been added to the workflow");
            }
            //TODO: Could also check for persistence service by looking for
            // WorkflowPersistenceService.  Omitted for clarity.
            WorkflowQueuingService qservice = (WorkflowQueuingService)provider.GetService(typeof(WorkflowQueuingService));
            if (!qservice.Exists(this.Name))
            {
                // The queue is named after the activity so the service can address it later.
                qservice.CreateWorkflowQueue(this.Name, true);
            }
        }

We can also handle the opposite case, tearing down any resources we created during the Initialize operation.

        protected override void Uninitialize(IServiceProvider provider)
        {
            WorkflowQueuingService qservice = (WorkflowQueuingService)provider.GetService(typeof(WorkflowQueuingService));
            if (qservice.Exists(this.Name))
            {
                qservice.DeleteWorkflowQueue(this.Name);
            }
        }
    }
}

That's our activity in its entirety. Once you break it down, there's not much to it. But how do we actually get work done? The answer lies in the underlying service, which is also simple: we just download a stream and write it to a file.

using System;
using System.IO;
using System.Net;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;

namespace Workflow.Ftp
{
    public partial class FtpService : WorkflowRuntimeService                
    {
        public void BeginFtp(FtpState state)
        {
            state.Request.BeginGetResponse(FtpCallBack, state);
        }


        public void FtpCallBack(IAsyncResult result)
        {
            FtpState state = (FtpState)result.AsyncState;
            FtpWebResponse response = null;
            try
            {
                response = (FtpWebResponse)state.Request.EndGetResponse(result);
                using (Stream responseStream = response.GetResponseStream())
                // FileMode.Create truncates an existing file; OpenOrCreate would
                // leave stale trailing bytes if the old file was longer.
                using (FileStream fileStream = new FileStream(state.LocalFilePath, FileMode.Create))
                {
                    CopyStream(responseStream, fileStream);
                }
                response.Close();
                state.StatusDescription = response.StatusDescription;
            }
            catch (Exception e)
            {
                // Record the failure so the activity can inspect it in ContinueAt.
                state.OperationException = e;
                if (response != null)
                {
                    response.Close();
                }
            }

            // Tell the Activity that we are done, whether the download succeeded or failed.
            // Enqueuing only on success would leave the workflow waiting forever after an error.
            WorkflowInstance instance = this.Runtime.GetWorkflow(state.WorkflowInstanceId);
            instance.EnqueueItem(state.WorkflowQueueName, state, null, null);
        }

        private void CopyStream(Stream source, Stream target)
        {
            byte[] buffer = new byte[1024];
            int size = 1024;
            int count = 0;
            count = source.Read(buffer, 0, size);
            while (count > 0)
            {
                target.Write(buffer, 0, count);
                count = source.Read(buffer, 0, size);
            }
            target.Flush();
        }
    }
}

Our service obtains a reference to a workflow instance and then enqueues an item into the specified queue. This is how our service will notify our custom activity that the asynchronous work has completed and it can resume where it left off... at its ContinueAt method! A trap that I ran into while trying to figure this out: you might be tempted to create a new WorkflowRuntime and then call its GetWorkflow method. There is an easier way: just inherit your service from WorkflowRuntimeService, and you will have access to the WorkflowRuntime through its inherited member "Runtime". Our FtpService doesn't have to derive from WorkflowRuntimeService, but it sure makes things easier by having access to the WorkflowRuntime through the "Runtime" property.

The final bit of plumbing to show is the FtpState class. There is not much to tell here: it's just a data transfer object between the activity and the workflow service. I chose to make most of the properties read-only (omitting the set accessor) because those properties should not be changed once the FTP process has started.

using System;
using System.Threading;
using System.Net;

namespace Workflow.Ftp
{
    public class FtpState                
    {
        private FtpWebRequest _request;
        private Uri _url;
        private string _localFilePath;
        private Exception operationException = null;
        private string _workflowQueueName;
        private Guid _workflowInstanceId;
        private string status;


        public FtpState(FtpWebRequest request,string localFilePath, string workflowQueueName, Guid workflowInstanceId)
        {
            _request = request;
            _url = request.RequestUri;
            _localFilePath = localFilePath;
            _workflowQueueName = workflowQueueName;
            _workflowInstanceId = workflowInstanceId;
        }
        public FtpWebRequest Request
        {
            get { return _request; }            
        }
   

        public Uri Url
        {
            get { return _url; }         
        }
   

        public string LocalFilePath
        {
            get { return _localFilePath; }            
        }
   

        public Exception OperationException
        {
            get { return operationException; }
            set { operationException = value; }
        }
   

        public string StatusDescription
        {
            get { return status; }
            set { status = value; }
        }
   

        public string WorkflowQueueName
        {
            get { return _workflowQueueName; }            
        }
   

        public Guid WorkflowInstanceId
        {
            get { return _workflowInstanceId; }            
        }
    }
}

We will now focus on our workflow host. Like any good demo, we will use a Console app. In the Console app, we spin up a workflow runtime and add services to it, same as usual. Only this time, we also add our custom FtpService to the runtime. The host is pretty simple: it's pretty much the default one that Visual Studio creates for you, with the addition of our custom service. We also add the SqlWorkflowPersistenceService, since that is required for our activity.

#region Using directives

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Workflow.Runtime;
using System.Workflow.Runtime.Hosting;
using Workflow.Ftp;

#endregion

namespace WorkflowConsoleApplication1
{
    class Program
    {
        static AutoResetEvent waitHandle;

        static void Main(string[] args)
        {
            using (WorkflowRuntime workflowRuntime = new WorkflowRuntime())
            {
                // Our custom service: this is the only addition to the default host.
                FtpService s = new FtpService();
                workflowRuntime.AddService(s);

                SqlWorkflowPersistenceService ps = new SqlWorkflowPersistenceService(
                    "Data Source=(local);Initial Catalog=SqlPersistenceService;Integrated Security=True");
                workflowRuntime.AddService(ps);

                waitHandle = new AutoResetEvent(false);
                workflowRuntime.WorkflowCompleted += new EventHandler<WorkflowCompletedEventArgs>(workflowRuntime_WorkflowCompleted);
                workflowRuntime.WorkflowTerminated += delegate(object sender, WorkflowTerminatedEventArgs e)
                {
                    Console.WriteLine(e.Exception.Message);
                    waitHandle.Set();
                };

                WorkflowInstance instance = workflowRuntime.CreateWorkflow(typeof(WorkflowConsoleApplication1.Workflow1));
                instance.Start();

                // Block the console thread until the workflow completes or terminates.
                waitHandle.WaitOne();
            }
        }

        static void workflowRuntime_WorkflowCompleted(object sender, WorkflowCompletedEventArgs e)
        {
            waitHandle.Set();
        }
    }
}

Now that we completed the activity, the service, the asynchronous state object, and our workflow host, let's see how we can use the activity from a workflow. Here's the XOML for the workflow. Pretty uninteresting, other than it completes the story.

<SequentialWorkflowActivity
         x:Class="WorkflowConsoleApplication1.Workflow2"
         x:Name="Workflow2"
         xmlns:ns0="clr-namespace:Workflow.Ftp;Assembly=Workflow.Ftp, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null"
         xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
         xmlns="http://schemas.microsoft.com/winfx/2006/xaml/workflow">
  <ns0:GetFileActivity
         x:Name="Activity1"
         LocalFilePath="c:\temp\testing.txt">
    <ns0:GetFileActivity.Url>
      <ns1:Uri
             xmlns:ns1="clr-namespace:System;Assembly=System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" />
    </ns0:GetFileActivity.Url>
  </ns0:GetFileActivity>
</SequentialWorkflowActivity>

One caveat: now that I completed this and typed out this ultra-long blog post, I found out that the Workflow designer doesn't like having the System.Uri property, tossing an error "CreateInstance failed for type 'System.Uri'. No parameterless constructor defined for this object." You could rectify this by changing the Url property of the activity back to a string and then parsing it with the System.Uri type to ensure that the scheme is FTP. Even though the designer doesn't like it, the workflow still runs like a champ.
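Here is a minimal sketch of that workaround, assuming the activity's Url property is changed to a string. The FtpUrl class and its Parse method are hypothetical names for illustration; they are not part of the attached sample.

```csharp
using System;

// Hypothetical helper, not part of the attached sample.
public static class FtpUrl
{
    // Parses a string URL and rejects anything that is not an absolute ftp:// URI.
    public static Uri Parse(string url)
    {
        Uri uri;
        if (!Uri.TryCreate(url, UriKind.Absolute, out uri))
        {
            throw new ArgumentException("Not a valid absolute URI: " + url);
        }
        // Uri normalizes the scheme to lower case, so "FTP://..." also passes.
        if (uri.Scheme != Uri.UriSchemeFtp)
        {
            throw new ArgumentException("Expected an ftp:// URI but got scheme: " + uri.Scheme);
        }
        return uri;
    }
}
```

With something like this in place, Execute could pass FtpUrl.Parse(this.Url) to WebRequest.Create, and the designer would only ever see a plain string property.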

This may look like a bit of code, but there really isn't much to it. Here's a summary of what you can do to process asynchronous operations using a custom workflow activity:

Create a custom activity
  1. Override the Execute method and return ActivityExecutionStatus.Executing.
  2. Specify a "ContinueAt" method that will execute when a queue message is received.
Create a custom service
  1. Process the work asynchronously
  2. Enqueue a message when the work is completed
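
The steps above can be modelled in plain C# without the WF runtime. This is a toy sketch, not WF code: ToyQueue, ToyActivity, ToyService, and ToyStatus are hypothetical stand-ins for WorkflowQueue, the custom activity, the custom service, and ActivityExecutionStatus. One difference to keep in mind is that here the handler runs on whichever thread enqueues the item, whereas in WF the runtime decides when and where the handler runs.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Toy model only: none of these types exist in WF.
public enum ToyStatus { Executing, Closed }

public class ToyQueue
{
    private readonly Queue<object> items = new Queue<object>();
    public event EventHandler ItemAvailable;

    public void Enqueue(object item)
    {
        lock (items) { items.Enqueue(item); }
        EventHandler handler = ItemAvailable;
        if (handler != null) handler(this, EventArgs.Empty);
    }

    public object Dequeue()
    {
        lock (items) { return items.Dequeue(); }
    }
}

public class ToyService
{
    // Does the work on a thread-pool thread, then enqueues a message when it is done.
    public void BeginWork(string input, ToyQueue queue)
    {
        ThreadPool.QueueUserWorkItem(delegate { queue.Enqueue(input.ToUpperInvariant()); });
    }
}

public class ToyActivity
{
    private readonly ToyQueue queue = new ToyQueue();
    public readonly ManualResetEvent Done = new ManualResetEvent(false);
    public ToyStatus Status = ToyStatus.Executing;
    public object Result;

    // Subscribe, hand the work to the service, and report "not finished yet".
    public ToyStatus Execute(ToyService service)
    {
        queue.ItemAvailable += ContinueAt;
        service.BeginWork("payload", queue);
        return ToyStatus.Executing;
    }

    // The "bookmark": runs when the service enqueues its message.
    private void ContinueAt(object sender, EventArgs e)
    {
        queue.ItemAvailable -= ContinueAt;
        Result = queue.Dequeue();
        Status = ToyStatus.Closed;
        Done.Set();
    }
}
```

A caller would invoke Execute, get ToyStatus.Executing back immediately, and later observe the activity closing itself once the service has enqueued its result.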

This has a lot of applications going forward. For instance, the InvokeWebServiceActivity that ships as part of Windows Workflow Foundation invokes web services synchronously. If the web services are being invoked from ASP.NET pages, then you want to perform that work asynchronously, and the way to achieve that would be to use this pattern. Maybe that's the next article… calling a web service asynchronously. It might take fewer lines of code, but the pattern will be basically the same.

The sample code is available as a download, attached to this blog post. I welcome feedback, so please feel free to leave comments!

Hmm… I wonder… maybe there's a pattern here for generalized activities that support the IAsyncResult pattern?
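
One possible shape for such a generalization, sketched outside WF and under assumptions: AsyncBridge and its Run method are hypothetical names, and the enqueue delegate stands in for the call to WorkflowInstance.EnqueueItem that a real service would make. The idea is that any Begin/End pair can be wrapped the same way, and that the result or the exception always reaches the queue. The Demo class uses MemoryStream.BeginRead only because it is an IAsyncResult API that needs no network.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper, not part of WF or the attached sample.
public static class AsyncBridge
{
    // Starts any Begin/End pair and hands the result (or the exception) to 'enqueue'.
    public static void Run<T>(
        Func<AsyncCallback, object, IAsyncResult> begin,
        Func<IAsyncResult, T> end,
        Action<T, Exception> enqueue)
    {
        begin(delegate(IAsyncResult ar)
        {
            T result = default(T);
            Exception error = null;
            try { result = end(ar); }
            catch (Exception ex) { error = ex; }
            // Always notify, so the waiting side is never left hanging.
            enqueue(result, error);
        }, null);
    }
}

public static class Demo
{
    // Reads from an in-memory stream through the bridge and returns the byte count.
    public static int ReadCount()
    {
        byte[] buffer = new byte[16];
        MemoryStream stream = new MemoryStream(new byte[] { 1, 2, 3 });
        ManualResetEvent done = new ManualResetEvent(false);
        int count = -1;

        AsyncBridge.Run<int>(
            delegate(AsyncCallback cb, object st) { return stream.BeginRead(buffer, 0, buffer.Length, cb, st); },
            stream.EndRead,
            delegate(int n, Exception ex) { count = n; done.Set(); });

        done.WaitOne();
        return count;
    }
}
```

In a workflow service, the FtpWebRequest.BeginGetResponse and EndGetResponse pair would take the place of BeginRead and EndRead, and the enqueue delegate would post the state object to the activity's queue.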

[Update 9/12/2007: After several email requests, I fixed the horrible code formatting to something more readable.]

Workflow.Ftp.zip