Custom WCF Transport Channel Part 2

In the previous post, I simply gathered the code for creating a custom WCF transport channel from Nicholas Allen's blog and updated it for .Net 4.0. The sample is a good start but the client and service code is a bit contrived. They are written specifically to avoid asynchronous code paths. You will immediately run into issues when you try to use the transport channel for a service contract. In this post, I show what changes I made to support service contracts with the custom transport channel.

Click here to see all the posts in this series. All the code is now available on github here: https://github.com/dmetzgar/custom-transport-channel

The first step is to create the service contract. Add a library project to the solution called SharedContracts, add a reference to System.ServiceModel, and add the following service contract interface:

 using System.ServiceModel;

namespace SharedContracts
{
    [ServiceContract]
    public interface IReverse
    {
        [OperationContract]
        string ReverseString(string text);
    }
}

This contract is the similar to Nicholas's example. It replies with a backwards version of the request string.

Add the SharedContracts project as a reference to the client and service projects. The best place to start is with the service. Add a new class to the service called Reverse that implements the service contract above.

 using SharedContracts;

namespace CustomTransportChannelService
{
    class Reverse : IReverse
    {
        public string ReverseString(string text)
        {
            return Program.ProcessReflectRequest(text);
        }
    }
}

Next, open up Program.cs in the service project and replace it with the following code:

 using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using CustomTransportChannelLibrary;
using SharedContracts;

namespace CustomTransportChannelService
{
    class Program
    {
        static void Main(string[] args)
        {
            Binding binding = new FileTransportBinding();
            Uri uri = new Uri("my.file://localhost/x");
            using (ServiceHost serviceHost = new ServiceHost(typeof(Reverse)))
            {
                serviceHost.AddServiceEndpoint(typeof(IReverse), binding, uri);
                serviceHost.Open();

                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.ReadLine();
            }
        }

        internal static string ProcessReflectRequest(string request)
        {
            char[] output = new char[request.Length];
            for (int index = 0; index < request.Length; index++)
            {
                output[index] = request[request.Length - index - 1];
            }
            return new string(output);
        }
    }
}

The service code here is definitely much simpler. However, if you try to run the service, you'll notice that there are plenty of NotImplementedExceptions thrown since it uses some async code paths.

The first change is to the channel listener. The call to AcceptChannel is now done with the async pattern. For the file protocol we don't do anything of note in AcceptChannel except create the FileReplyChannel object. First we'll need an IAsyncResult implementation. For the purposes of this, I added a file called AsyncResult.cs to the CustomTransportChannelLibrary. Then I added the code for a dummy AsyncResult that would suit this use.

 using System;
using System.Threading;

namespace CustomTransportChannelLibrary
{
    class DummyAsyncResult : IAsyncResult
    {
        ManualResetEvent waitHandle = new ManualResetEvent(true);

        public TimeSpan Timeout { get; set; }

        public object AsyncState { get; set; }

        public WaitHandle AsyncWaitHandle
        {
            get { return this.waitHandle; }
        }

        public bool CompletedSynchronously
        {
            get { return true; }
        }

        public bool IsCompleted
        {
            get { return true; }
        }
    }
}

Notice that I create a wait handle set to true, set sync completion to true, and set IsComplete to true.

The next step is to change the AcceptChannel methods. I've placed the sync version of AcceptChannel here for comparison.

         protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
        {
            EndpointAddress address = new EndpointAddress(Uri);
            return new FileReplyChannel(this.bufferManager, this.encoderFactory, address, this);
        }

        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            DummyAsyncResult asyncResult = new DummyAsyncResult() { AsyncState = state, Timeout = timeout };
            if (callback != null)
                callback(asyncResult);
            return asyncResult;
        }

        protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
        {
            return this.OnAcceptChannel((result as DummyAsyncResult).Timeout);
        }

Set breakpoints on the begin and end methods and try running just the service application. You should see first the Begin method and then the End method get executed. However, if you keep going you'll notice that WCF will just keep calling Begin and End. Remove the breakpoints and you'll run until getting an OOM exception.

This may seem an odd behavior but WCF is relying on the channel listener to decide how many channels it wants to have open. If you look into the HttpChannelListener in the .Net framework code, you'll eventually end up at a class called InputQueue. For a file transport like ours, we only need one channel because we're only watching one directory. I won't go into extending this transport channel to support multiple directories. Instead, I'll make another AsyncResult to handle this issue.

 using System;
using System.IO;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Threading;

namespace CustomTransportChannelLibrary
{
    class FileReplyChannelListener : ChannelListenerBase<IReplyChannel>
    {
        FileReplyChannel replyChannel;
        AcceptChannelAsyncResult acceptChannelAsyncResult;

        private static ManualResetEvent acceptChannelWaitHandle = new ManualResetEvent(false);

        protected override void OnClose(TimeSpan timeout)
        {
            if (this.acceptChannelAsyncResult != null)
            {
                FileReplyChannelListener.acceptChannelWaitHandle.Set();
                if (this.acceptChannelAsyncResult.Callback != null)
                    this.acceptChannelAsyncResult.Callback(this.acceptChannelAsyncResult);
            }
        }

        protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
        {
            if (this.replyChannel != null)
                return this.acceptChannelAsyncResult = new AcceptChannelAsyncResult() { AsyncState = state, Callback = callback };

            DummyAsyncResult asyncResult = new DummyAsyncResult() { AsyncState = state, Timeout = timeout };
            if (callback != null)
                callback(asyncResult);
            return asyncResult;
        }

        protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
        {
            if (result is DummyAsyncResult)
                return this.OnAcceptChannel((result as DummyAsyncResult).Timeout);
            return null;
        }

        class AcceptChannelAsyncResult : IAsyncResult
        {
            public AsyncCallback Callback { get; set; }

            public object AsyncState { get; set; }

            public System.Threading.WaitHandle AsyncWaitHandle
            {
                get { return FileReplyChannelListener.acceptChannelWaitHandle; }
            }

            public bool CompletedSynchronously
            {
                get { return false; }
            }

            public bool IsCompleted { get; private set; }
        }

    }
}

Essentially all this does is give me a means of blocking the BeginAcceptChannel for the second call until the channel listener is closed. After the listener is closed, it calls the callback to let the EndAcceptChannel go through.

So that takes care of the channel listener for now. The next change is in the FileReplyChannel since async code paths will be executed there. The key method is WaitForRequest since this is the method that will block the thread in the sync case. We could just create a new thread and block that thread and return a custom AsyncResult. However, the FileSystemWatcher can do this without blocking thread, so let's just use that. Here is the original WaitForRequest code:

         public bool WaitForRequest(TimeSpan timeout)
        {
            ThrowIfDisposedOrNotOpen();
            try
            {
                File.Delete(PathToFile(LocalAddress.Uri, "request"));
                using (FileSystemWatcher watcher = new FileSystemWatcher(LocalAddress.Uri.AbsolutePath, "request"))
                {
                    watcher.EnableRaisingEvents = true;
                    WaitForChangedResult result = watcher.WaitForChanged(WatcherChangeTypes.Changed, (int)timeout.TotalMilliseconds);
                    return !result.TimedOut;
                }
            }
            catch (IOException exception)
            {
                throw ConvertException(exception);
            }
        }

This code blocks the thread on the WaitForChanged method. Instead of doing this, let's handle the Changed event and create a custom AsyncResult.

         public IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state)
        {
            ThrowIfDisposedOrNotOpen();
            try
            {
                File.Delete(PathToFile(LocalAddress.Uri, "request"));
                FileSystemWatcher watcher = new FileSystemWatcher(LocalAddress.Uri.AbsolutePath, "request");
                watcher.EnableRaisingEvents = true;
                WaitForRequestAsyncResult asyncResult = new WaitForRequestAsyncResult(watcher, state, timeout);
                watcher.Changed += new FileSystemEventHandler((obj, ea) =>
                {
                    if (ea.ChangeType == WatcherChangeTypes.Changed)
                    {
                        asyncResult.Complete(false);
                        if (callback != null)
                            callback(asyncResult);
                    }
                });
                return asyncResult;
            }
            catch (IOException exception)
            {
                throw ConvertException(exception);
            }
        }

        public bool EndWaitForRequest(IAsyncResult asyncResult)
        {
            bool result = (asyncResult as WaitForRequestAsyncResult).Result;
            (asyncResult as IDisposable).Dispose();
            return result;
        }

        class WaitForRequestAsyncResult : IAsyncResult, IDisposable
        {
            FileSystemWatcher watcher;
            ManualResetEvent waitHandle = new ManualResetEvent(false);

            public WaitForRequestAsyncResult(FileSystemWatcher watcher, object asyncState, TimeSpan timeout)
            {
                this.watcher = watcher;
                this.AsyncState = asyncState;
                if (timeout < TimeSpan.MaxValue)
                {
                    new Timer(new TimerCallback((obj) =>
                    {
                        this.Complete(true);
                    }),
                    null, (long)timeout.TotalMilliseconds, Timeout.Infinite);
                }
            }

            public object AsyncState
            {
                get;
                private set;
            }

            public WaitHandle AsyncWaitHandle
            {
                get { return this.waitHandle; }
            }

            public bool CompletedSynchronously
            {
                get { return false; }
            }

            public bool IsCompleted
            {
                get;
                private set;
            }

            public bool Result
            {
                get;
                private set;
            }

            public void Dispose()
            {
                if (watcher != null)
                    watcher.Dispose();
            }

            public void Complete(bool timedOut)
            {
                this.waitHandle.Set();
                this.IsCompleted = true;
                this.Result = !timedOut;
            }
        }

Note that this AsyncResult class is an inner class in the FileReplyChannel class. Also, the timeout value WCF passes can be TimeSpan.MaxValue and a Timer cannot be created with the value, so you have to check for it.

WCF does not call BeingWaitForRequest directly though. It calls through TryReceiveRequest instead. So we have to update the Begin/End methods for that. Luckily this is pretty simple since we can just use the same AsyncResult from WaitForRequest. I've included the sync version of the method for comparison.

         public bool TryReceiveRequest(TimeSpan timeout, out RequestContext context)
        {
            context = null;
            bool complete = this.WaitForRequest(timeout);
            if (!complete)
                return false;
            context = this.ReceiveRequest(DefaultReceiveTimeout);
            return true;
        }

        public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.BeginWaitForRequest(timeout, callback, state);
        }

        public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
        {
            context = null;
            bool complete = this.EndWaitForRequest(result);
            if (!complete)
                return false;
            context = this.ReceiveRequest(DefaultReceiveTimeout);
            return true;
        }

Now you should be able to run the service without running into any exceptions. It will just wait for the client to send a request, so let's update the client now too. I believe most people will generate a client using a service reference inside Visual Studio or by using the command line tool, however these classes are pretty easy to create by hand. Add a class called ReverseClient to the client project.

 using System.ServiceModel;
using System.ServiceModel.Channels;
using SharedContracts;

namespace CustomTransportChannelClient 
{
    class ReverseClient : ClientBase<IReverse>, IReverse
    {
        public ReverseClient(Binding binding, EndpointAddress remoteAddress) :
            base(binding, remoteAddress)
        {
        }

        public string ReverseString(string text)
        {
            return base.Channel.ReverseString(text);
        }
    }
}

Nothing too complex there. Now update the Program.cs for the client to look like this:

 using System;
using System.ServiceModel;
using System.ServiceModel.Channels;
using CustomTransportChannelLibrary;

namespace CustomTransportChannelClient
{
    class Program
    {
        static void Main(string[] args)
        {
            Binding binding = new FileTransportBinding();
            Uri uri = new Uri("my.file://localhost/x");
            ReverseClient client = new ReverseClient(binding, new EndpointAddress(uri));

            while (true)
            {
                Console.Write("Enter some text (Ctrl-Z to quit): ");
                String text = Console.ReadLine();
                if (text == null)
                    break;
                string response = client.ReverseString(text);
                Console.WriteLine("Reply: {0}", response);
            }

            client.Close();
        }
    }
}

Set the solution to startup both client and service at the same time. You should now have a working transport channel that uses service contracts.

CustomTransportChannel.zip