Advanced Workflow Services Talk (Demo 3 of 4)

So, in part 1 we saw how to manage context, and in part 2 we saw how to build on that basic knowledge to do duplex messaging. Once we start doing duplex work, some interesting patterns emerge, and the first is one that we like to call "long running work". Why are we interested in this? Well, as you probably know, the execution of a workflow is single threaded (this is a feature, not a bug). We also don't have a mechanism to force the workflow to be "pinned" in memory. This means that things like the asynchronous programming model (APM) can't be used, since there is no guarantee that there will be something to call back when we are done. As a result, the send activity cannot take advantage of the APM to be more thread friendly.

We may want to do things in parallel, like this:

image

If each of these branches takes 3 seconds, the whole workflow will complete in about 9 seconds, because each send blocks the single workflow thread in turn. The general expectation is that in parallel this would take as long as the longest branch, plus some minor delta for overhead. The trouble is, APM programming is tricky, especially relative to the layout above.
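To make the arithmetic concrete, here's a minimal sketch in plain C# (not WF code; `BranchTiming` and its methods are names I made up for illustration) comparing branches that run one after another with the ideal case where they genuinely overlap.

```csharp
using System;
using System.Linq;

// Hypothetical helper for illustration only; not part of WF or WCF.
public static class BranchTiming
{
    // Branches that execute one after another cost the sum of their durations.
    public static TimeSpan Sequential(params TimeSpan[] branches)
    {
        return TimeSpan.FromTicks(branches.Sum(b => b.Ticks));
    }

    // Branches that truly overlap cost roughly the longest branch (overhead ignored).
    public static TimeSpan IdealParallel(params TimeSpan[] branches)
    {
        return branches.Length == 0 ? TimeSpan.Zero : branches.Max();
    }
}
```

For three branches of 3 seconds each, `Sequential` gives 9 seconds and `IdealParallel` gives 3 seconds, which is the gap we are trying to close.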

We want to model APM style service calls while allowing the service operations to be extremely long running, where "extremely" is defined as "long enough that I would want to be able to persist." The approach, then, is to model this as disjoint send and receive activities.

image

One intermediate step is to simply use one-way messaging, but the problem there is that in a lot of cases I'm looking for some information to be sent back to me.

I'll hold off on the code for the above; the fact that we are listening in parallel for the same operation requires us to be a little more clever.

Let's look first at our contract, and then our service implementation:

    using System.Collections.Generic;
    using System.ServiceModel;

    namespace Long_Running_Work
    {
        [ServiceContract]
        public interface ILongRunningWork
        {
            // Request/reply: the caller blocks until the result comes back
            [OperationContract]
            string TakeAWhile(int i);

            // One way: fire and forget, nothing comes back
            [OperationContract(IsOneWay = true)]
            void OneWayTakeAWhile(int i);

            // One way, but the service calls back later using the context token
            [OperationContract(IsOneWay = true)]
            void TakeAWhileAndTellMeLater(IDictionary<string, string> contextToken, int i);
        }

        // Implemented by the workflow so the service can message back
        [ServiceContract]
        public interface IReverseContract
        {
            [OperationContract(IsOneWay = true)]
            void TakeAWhileAndTellMeLaterDone(string s);
        }
    }

And now for the implementation of these:

    1:  namespace Long_Running_Work
    2:  {
    3:     public class Service1 : ILongRunningWork
    4:      {
    5:   
    6:          public Service1()
    7:          {
    8:             
    9:          }
   10:   
   11:          #region ILongRunningWork Members
   12:   
   13:          public string TakeAWhile(int i)
   14:          {
   15:              Console.WriteLine("Starting TakeAWhile");
   16:              System.Threading.Thread.Sleep(new TimeSpan(0, 0, 3));
   17:              return i.ToString();
   18:          }
   19:   
   20:   
   21:   
   22:          public void OneWayTakeAWhile( int i)
   23:          {
   24:              Console.WriteLine("Starting One Way TakeAWhile");
   25:              System.Threading.Thread.Sleep(new TimeSpan(0, 0, 3));
   26:              Console.WriteLine("Ending One Way TakeAWhile");
   27:   
   28:   
   29:          }
   30:   
   31:   
   32:          public void TakeAWhileAndTellMeLater(IDictionary<string, string> context, int i)
   33:          {
   34:              Console.WriteLine("Received the context Token");
   35:              System.Threading.Thread.Sleep(new TimeSpan(0, 0, 3));
   36:              Console.WriteLine("Need to Message Back Now {0}", i.ToString());
   37:              // could investigate a more useful pooling of these if we 
   38:              // really wanted to worry about perf
   39:              IReverseContractClient ircc = new IReverseContractClient(
   40:                  new NetTcpContextBinding(),
   41:                  new EndpointAddress("net.tcp://localhost:10003/ReverseContract")
   42:                  );
   43:              IContextManager icm = ircc.InnerChannel.GetProperty<IContextManager>();
   44:              icm.SetContext(context);
   45:              ircc.TakeAWhileAndTellMeLaterDone(i.ToString());
   46:          }
   47:   
   48:   
   49:   
   50:          #endregion
   51:      } 
   52:   
   53:     public class IReverseContractClient : ClientBase<IReverseContract>, IReverseContract
   54:     {
   55:          public IReverseContractClient() : base(){}
   56:          public IReverseContractClient(System.ServiceModel.Channels.Binding binding, EndpointAddress address) : base(binding, address) { }
   57:   
   58:  #region IReverseContract Members
   59:   
   60:   
   61:   
   62:         public void TakeAWhileAndTellMeLaterDone(string s)
   63:         {
   64:             base.Channel.TakeAWhileAndTellMeLaterDone(s);
   65:         }
   66:   
   67:         #endregion
   68:     }
   69:   
   70:  }

Basically, we sit around and wait. You'll also note that TakeAWhileAndTellMeLater takes in a context token (similar to our previous approach), which we use at the end to new up a client, set the context on it, and call back in. Look at lines 39-44 above. The nice thing about this is that my workflow client above can actually go idle, persist, and react to a message being delivered later on.

One thing to note is that you should not place a delay between any of the Sends and Receives. This could cause the workflow to go idle, which may allow you to miss messages. This is generally considered a bad thing. The reason this occurs is that the WorkflowOperationInvoker will use EnqueueOnIdle, which means that when the workflow goes idle, the message will be enqueued. If the queue hasn't been created by the Receive activity by then, the message will not get delivered.
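Here's a toy model of that failure mode, just to make the ordering problem visible. These are not the WF runtime types; `ToyWorkflowQueues`, `CreateQueue`, and `TryDeliverOnIdle` are hypothetical names standing in for "the Receive activity has created its queue" and "a message is delivered when the workflow goes idle".

```csharp
using System;
using System.Collections.Generic;

// Toy model for illustration only; these are NOT the WF runtime types.
public class ToyWorkflowQueues
{
    private readonly Dictionary<string, Queue<string>> queues =
        new Dictionary<string, Queue<string>>();

    // Stands in for a Receive activity starting to execute and creating its queue.
    public void CreateQueue(string operation)
    {
        queues[operation] = new Queue<string>();
    }

    // Stands in for delivery when the workflow goes idle:
    // the message is only accepted if the queue already exists.
    public bool TryDeliverOnIdle(string operation, string message)
    {
        Queue<string> queue;
        if (!queues.TryGetValue(operation, out queue))
        {
            return false; // no Receive is listening yet, so the message is lost
        }
        queue.Enqueue(message);
        return true;
    }

    // Number of messages waiting for the given operation (0 if no queue exists).
    public int PendingCount(string operation)
    {
        Queue<string> queue;
        return queues.TryGetValue(operation, out queue) ? queue.Count : 0;
    }
}
```

In this model, a delay that lets the workflow idle before `CreateQueue` has run means `TryDeliverOnIdle` returns false, which is the missed message described above.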

For the final workflow above (the TakeAWhileAndTellMeLater workflow), I will need to spin this up in a WorkflowServiceHost (a la the Duplex Sample in part 2).

    using (WorkflowServiceHost wsh = new WorkflowServiceHost(typeof(CallLongRunningComponents.WorkflowWithmessaging)))
    {
        // endpoint the service will call back on
        wsh.AddServiceEndpoint(
            typeof(Long_Running_Work.IReverseContract),
            new NetTcpContextBinding(),
            "net.tcp://localhost:10003/ReverseContract");

        WorkflowRuntime wr = wsh.Description.Behaviors.Find<WorkflowRuntimeBehavior>().WorkflowRuntime;

        // don't forget to open up the wsh
        wsh.Open();

        WorkflowInstance wi = wr.CreateWorkflow(
            typeof(CallLongRunningComponents.WorkflowWithmessaging));

        // waitHandle is the AutoResetEvent declared elsewhere in the host
        wr.WorkflowCompleted += ((o, e) => waitHandle.Set());
        wr.WorkflowIdled += ((o, e) => Console.WriteLine("We're idled"));

        wi.Start();

        waitHandle.WaitOne();
    }

Why do I think this is cool?

Two reasons:

  • If I assume that I can modify the called service to call back to me (or put such a wrapper at a runtime service level), this is easier to model than the APM (that code is included at the end of this post).
  • This gives me a natural way to start exposing more advanced control over a service call.  Rather than just a send and receive, I can use a send and a listen, and in the listen have a receive, a cancel message receive, and a delay in order to expose more fine-grained control points for my workflow, and model the way the process should work very explicitly and declaratively.
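As a rough analogy for that listen (reply, cancel, or delay, whichever comes first), here's a minimal sketch in plain threading code. It is not the WF Listen activity; `ListenOutcome` and `ListenSketch` are hypothetical names, and the only framework call it relies on is `WaitHandle.WaitAny`.

```csharp
using System;
using System.Threading;

// Plain-threading analogy for illustration; not the WF Listen activity.
public enum ListenOutcome { Reply, Cancelled, TimedOut }

public static class ListenSketch
{
    // Blocks until the reply arrives, a cancel is signalled, or the timeout
    // elapses, whichever happens first (like the three branches of a listen).
    public static ListenOutcome WaitForFirst(WaitHandle reply, WaitHandle cancel, TimeSpan timeout)
    {
        int index = WaitHandle.WaitAny(new[] { reply, cancel }, timeout);
        if (index == WaitHandle.WaitTimeout)
        {
            return ListenOutcome.TimedOut;
        }
        return index == 0 ? ListenOutcome.Reply : ListenOutcome.Cancelled;
    }
}
```

The big difference is that this version ties up a thread while it waits and can't be persisted, whereas the workflow version can go idle and be unloaded until one of the three messages shows up.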

image

 

Code for APM approach:

Call some services and wait:

    1:  Console.WriteLine("Press <enter> to execute APM approach");
    2:  Console.ReadLine();
    3:  waitHandle = new AutoResetEvent(false);
    4:  Stopwatch sw = new Stopwatch();
    5:  sw.Start();
    6:  lrwc = new WorkflowHost.ServiceReference1.LongRunningWorkClient();
    7:  lrwc.BeginTakeAWhile(1, HandleClientReturn, "one");
    8:  lrwc.BeginTakeAWhile(2, HandleClientReturn, "two");
    9:  lrwc.BeginTakeAWhile(3, HandleClientReturn, "three");
   10:  lrwc.BeginTakeAWhile(4, HandleClientReturn, "four");
   11:  while (!areDone)
   12:  {
   13:      System.Threading.Thread.Sleep(25);
   14:  }
   15:  Console.WriteLine("APM approach completed in {0} milliseconds", sw.ElapsedMilliseconds);
   16:  Console.WriteLine("All Done, press <enter> to exit");
   17:  Console.ReadLine();

Ignore the busy wait on line 11; I should use a wait handle here but was having trouble getting it to work correctly (this is hard code).

The callback and respective state:

    1:  static ServiceReference1.LongRunningWorkClient lrwc;
    2:  static Int32 countOfFinished = 0;
    3:  static volatile bool areDone = false;
    4:  static void HandleClientReturn(IAsyncResult result)
    5:  {
    6:      string s = (string)result.AsyncState;
    7:      string resultString = lrwc.EndTakeAWhile(result);
    8:      Console.WriteLine("received {0}", resultString);
    9:      if (Interlocked.Increment(ref countOfFinished) == 4)
   10:      {
   11:          areDone = true;
   12:      }
   13:  }

I have had some people say that line 9 should use Interlocked.CompareExchange in order to do this correctly, but the point is that this is tricky code, and that modeling it in WF is pretty nice.  [Ignoring for the moment the work required to realize the assumption that we can make the service message back.]
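For what it's worth, here's a minimal sketch of how the busy wait and the finished counter could be folded into a wait handle. This is not the code I ran for the demo; `CompletionCounter` is a hypothetical helper, and it assumes each callback calls `Signal()` exactly once.

```csharp
using System;
using System.Threading;

// Illustrative helper (hypothetical name): signals once a fixed number of callbacks have run.
public sealed class CompletionCounter : IDisposable
{
    private readonly ManualResetEvent done = new ManualResetEvent(false);
    private int remaining;

    public CompletionCounter(int expected)
    {
        if (expected <= 0) throw new ArgumentOutOfRangeException("expected");
        remaining = expected;
    }

    // Call once from each callback. Interlocked.Decrement returns the new value,
    // so exactly one caller sees zero and sets the event.
    public void Signal()
    {
        if (Interlocked.Decrement(ref remaining) == 0)
        {
            done.Set();
        }
    }

    // Blocks the caller without spinning; returns false if the timeout elapses first.
    public bool Wait(TimeSpan timeout)
    {
        return done.WaitOne(timeout);
    }

    public void Dispose()
    {
        done.Close();
    }
}
```

With something like this, the main thread would create a `CompletionCounter` for the four calls, `HandleClientReturn` would call `Signal()` instead of setting a flag, and the while loop would become a single `Wait`. On .NET 4 and later, `CountdownEvent` covers the same ground. It is still more plumbing than dropping a receive activity on the design surface.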