WF – The Asynchronous Activity/Service Pattern


In my previous blog post (Creating an Asynchronous Workflow Activity), I explained why your custom activities should either be really fast or run asynchronously. But, I didn't give you a real world example of how to do this. In this post I provide an example of the pattern that my team uses when creating a custom activity.

Pattern:

1) Create a service that actually does the work

2) Create a custom activity that uses the service and has a Queue listener event handler to get a message back from the service when the work is done.

Instead of walking you through this one, I just want to present you with the code, commented as much as I could 🙂

    public class DoSomethingAsyncService : WorkflowRuntimeService
    {
        private Random m_random;

        public DoSomethingAsyncService()
        {
            m_random = new Random();
        }

        public void DoSomethingAsync(Guid instanceId, Guid queueId)
        {
            ThreadPool.QueueUserWorkItem(delegate(Object state)
                {
                    // Fake a call to a WebService (let's say it takes 10 seconds)
                    Console.WriteLine("Making webservice call.");
                    Thread.Sleep(10000);

                    // Create the QueueItem
                    QueueItem item = new QueueItem();
                    item.ReturnCode = m_random.Next(0, 10);
                    item.Message = String.Format("The return code is {0}.", item.ReturnCode);

                    try
                    {
                        // Now send this item to the appropriate queue
                        WorkflowInstance instance = Runtime.GetWorkflow(instanceId);
                        instance.EnqueueItem(queueId, item, null, null);
                    }
                    catch (InvalidOperationException)
                    {
                        // Catch any InvalidOperationExceptions that occur due to the 
                        // workflow having already completed, etc.
                    }
                });
        }
    }

    public class QueueItem
    {
        public int ReturnCode;
        public String Message;
    }
    public class DoSomethingAsyncActivity : Activity
    {
        private Guid m_queueId;

        protected override ActivityExecutionStatus Execute(ActivityExecutionContext executionContext)
        {
            // Create my queue
            m_queueId = Guid.NewGuid();
            WorkflowQueuingService queuingService = (WorkflowQueuingService)executionContext.GetService(typeof(WorkflowQueuingService));
            WorkflowQueue queue = queuingService.CreateWorkflowQueue(m_queueId, true);

            // Hook the item available event. This event will be triggered when our
            // service enqueues an item.
            queue.QueueItemAvailable += new EventHandler<QueueEventArgs>(QueueItemAvailable);

            // Call the service to perform the async work.
            DoSomethingAsyncService service = (DoSomethingAsyncService)executionContext.GetService(typeof(DoSomethingAsyncService));
            service.DoSomethingAsync(WorkflowInstanceId, m_queueId);

            // Return Executing to let the workflow know that we are still working.
            return ActivityExecutionStatus.Executing;
        }

        private void QueueItemAvailable(object sender, QueueEventArgs e)
        {
            // The activity execution context is always passed in as the sender.
            ActivityExecutionContext context = sender as ActivityExecutionContext;

            // Checking for a null context just in case something really strange is happening.
            if (context != null)
            {
                // Get the queue service so we can get the Queue object.
                // Note: you cannot salt away the actual queue object, the Workflow runtime does not allow it.
                WorkflowQueuingService queuingService = context.GetService<WorkflowQueuingService>();

                // Make sure the queue exists before going any further.
                if (queuingService.Exists(m_queueId))
                {
                    WorkflowQueue queue = queuingService.GetWorkflowQueue(m_queueId);

                    // Dequeue the item that was queued by the service.
                    QueueItem item = queue.Dequeue() as QueueItem;
                    if (item != null)
                    {
                        // We only expect one thing to ever be put into our Queue;
                        // so, now we remove the queue.
                        queue.QueueItemAvailable -= QueueItemAvailable;
                        queuingService.DeleteWorkflowQueue(m_queueId);

                        // Finally inspect the results and do something with them.
                        Console.WriteLine(item.Message);

                        // Once we have finished our mission, we close this activity
                        context.CloseActivity();
                        return;
                    }
                }
            }

            // We should not get here if everything works as we expect.
            // So, throw an exception so the workflow will be terminated.
            throw new Exception("Something unexpected was placed in the Queue.");
        }
    }

Try creating a workflow that contains a Parallel activity and add this activity to both branches. If you really want to run activities in parallel, this is how it needs to be done.

Don't forget to add the service to the runtime!

Happy flowing!

Comments (4)
  1. (or Designing Your Workflow Activity Class Hierarchy) One of the questions that we’ve had to consider

  2. Mick.Lang says:

    Interesting article.

    I’m just starting off in WF so I just want to verify a few things.

    Would the first option, before taking this step to implement asychronous execution for an Actiivity, be to think about breaking up a long running activity into a CompositeActivity containing shorter components ?

    The example you’ve given where the delay is due to invoking Web or WCF services in code, in reality you would use the standard WF framework activities?  

    Correct me if I’m wrong, but you’re not going to encounter thread blocking using SendActivity, ReceiveActivity, WebServiceInput or WebServiceOutput activities are you?

  3. Hi Mick,

    I think you should try to avoid long running activities, but there are cases where you can’t. In our case we are calling out to run scripts that can take a long time to run or as you pointed out we are calling Web Services. We wanted one solution to all our problems and this is the solution that we came up with. I believe the WF team knows about these problems and are activitly working to fix them.

    The thread blocking question is an interesting one. I don’t think you should have any problems, but I haven’t tried the Send and Receive activities myself.

    Thanks for the comment and good luck with your WF 🙂

Comments are closed.

Skip to main content