WCF Extensibility – Transport Channels – Reply Channel

This post is part of a series about WCF extensibility points. For a list of all previous posts and planned future ones, go to the index page .

And after the little detour on the previous post about the IInteractiveChannelInitializer interface, we’re back on the channel layer, most specifically at the transports. Last time, we covered (extensively) the simplest kind of transport channel, a request channel. This time we’ll go to the other side of the communication and cover the reply transport channel (i.e., one which implements the IReplyChannel interface).

The reply channel is one which can be used in request / reply protocols (such as HTTP): the server receives a request, then sends a response back and the communication episode is finished. That is different from a duplex channel (in which after the original connection the server can send multiple “responses” to a single client request), or one-way channels (where the server receives a client request but doesn’t respond with anything). In WCF a reply channel is implemented using a RequestContext; when a new request arrives at the channel a new context is created and passed to the application; the application then uses the request context to send the response back to the client. That’s a better architecture than a simple “application receives message then returns response to the transport channel”, because with the request context the concurrency is improved (the transport can handle multiple requests to the application before a response is returned).

This post will describe the reply channel interface, and the request context. On the sample section, we’ll continue on the JSON-RPC scenario, this time writing the server part. Even though this is more complex than the request channel itself, since we already have the base channel implemented from the previous post, we’ll be able to reuse it and the reply channel itself won’t be too complex.

Public implementations in WCF

None. As with most channel extensibility points, there are no public implementations in WCF. There are many internal implementations, such as the one used by the HTTP transport.

Interface definition

  1. public interface IReplyChannel : IChannel, ICommunicationObject
  2. {
  3.     EndpointAddress LocalAddress { get; }
  4.  
  5.     IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state);
  6.     IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
  7.     IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state);
  8.     IAsyncResult BeginWaitForRequest(TimeSpan timeout, AsyncCallback callback, object state);
  9.     RequestContext EndReceiveRequest(IAsyncResult result);
  10.     bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context);
  11.     bool EndWaitForRequest(IAsyncResult result);
  12.     RequestContext ReceiveRequest();
  13.     RequestContext ReceiveRequest(TimeSpan timeout);
  14.     bool TryReceiveRequest(TimeSpan timeout, out RequestContext context);
  15.     bool WaitForRequest(TimeSpan timeout);
  16. }

The reply channel interface includes many variations of methods to receive a request and get a request context. The synchronous ReceiveRequest will block the calling thread until a request arrives (or a timeout expires) and then return a request context. The asynchronous BeginReceiveRequest and EndReceiveRequest will not block the request on the begin call; when a new request arrives the callback passed by the user is called, and the user can then call the end method to get the request context. If the timeout is elapsed, a TimeoutException will be thrown in those cases (by ReceiveRequest on the synchronous path, or by EndReceiveRequest on the asynchronous path).

To avoid dealing with exceptions, this interface also exposes the Try… pattern: TryReceiveRequest (synchronous) and BeginTryReceiveRequest / EndTryReceiveRequest (asynchronous) are similar to their try-less siblings, but instead of throwing an exception when the timeout expires and no request arrives, those methods will simply return false – and when a request arrives, the context is returned in an out parameter. Yet another alternative exposed by this interface is the Wait… pattern: WaitForRequest / BeginWaitForRequest / EndWaitForRequest – instead of dealing with exceptions, the method will simply return true or false if a request is available (and after that the user can call the synchronous ReceiveRequest method which won’t block and return the context immediately). When using the WCF service model, the method which will be called are the BeginTryReceiveRequest / EndTryReceiveRequest in a loop (this can be changed using the SynchronousReceiveBehavior).

The last member of the interface is the LocalAddress property, which gets the address where the channel is receiving the messages.

  1. public abstract class RequestContext : IDisposable
  2. {
  3.     public abstract Message RequestMessage { get; }
  4.  
  5.     public abstract void Abort();
  6.     public abstract IAsyncResult BeginReply(Message message, AsyncCallback callback, object state);
  7.     public abstract IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state);
  8.     public abstract void Close();
  9.     public abstract void Close(TimeSpan timeout);
  10.     public abstract void EndReply(IAsyncResult result);
  11.     public abstract void Reply(Message message);
  12.     public abstract void Reply(Message message, TimeSpan timeout);
  13. }

When the caller of IReplyChannel receives a RequestContext for a new request, they can use the RequestMessage property to get the incoming message. When the response is ready to be sent, they can call the Reply method (or the asynchronous BeginReply / EndReply pair). WCF will by default call the synchronous Reply method when returning responses to the client, unless changed by the DispatcherSynchronizationBehavior.

How to add a transport channel (server side)

Like other channels, transport channels are added by using a channel factory (client) / or channel listener (server), which are created by a custom binding element. For reply channels, since they’re server-only, the binding element must override BuildChannelListener<TChannel> to return. The binding element corresponding to a transport channel must be derived from the TransportBindingElement class. Besides the normal properties from the base BindingElement class, the transport binding element also defines a Scheme property, which needs to return the URI scheme for the transport. The sample code below will show an example of defining a reply transport channel.

Real world scenario: a JSON-RPC service

On the original post about transport channels, I mentioned one customer from the forums who wanted to create a service for the JSON-RPC protocol. The original posts dealt with the client part of such service, now we finally get to the server part with the reply channel.

For this sample, I’ll reuse some of the classes from the sample from the request channel post – especially the base channel which is essentially ready for both client and server usage, so if you haven’t already done so, I’d strongly recommend to read those posts.

So, on to the sample. Like in the transport, since we don’t have any service model support for the server side, we need to start with an untyped message contract. The service which implements this contract simply echoes whichever input is passed to it (at this time there’s no JSON-RPC yet, we’ll get there). We’ll then write the transport which can be used for this service using the same framing protocol we used before.

  1. [ServiceContract]
  2. public interface IUntypedTest
  3. {
  4.     [OperationContract(Action = "*", ReplyAction = "*")]
  5.     Message Process(Message input);
  6. }
  7.  
  8. public class Service : IUntypedTest
  9. {
  10.     public Message Process(Message input)
  11.     {
  12.         Console.WriteLine("[service] input = {0}", input);
  13.         byte[] bytes = Formatting.MessageToBytes(input);
  14.         Debugging.PrintBytes(bytes);
  15.  
  16.         return Formatting.BytesToMessage(bytes);
  17.     }
  18. }

Now that what we want to accomplish is defined, let’s go to the transport. But before going further, here goes the usual disclaimer: this is a sample for illustrating the topic of this post, this is not production-ready code. I tested it for a few contracts and it worked, but I cannot guarantee that it will work for all scenarios (please let me know if you find a bug or something missing). I really kept the error checking to a minimum (especially in the “simple service” project and in many other helper functions whose inputs should be sanitized, to make the sample small. Also, I practically didn’t implement any timeouts in the channel (they’re mostly being ignored), otherwise this sample which is rather large would be even more complex.

First, the binding element. This is the same binding element as in the previous post, but I’m now overriding two more functions: CanBuildChannelListener<TChannel> and BuildChannelListener<TChannel>, which are used at the server side. At this point, we’ll support the IReplyChannel interface, and on BuildChannelListener we’ll return a new class, SizedTcpChannelListener.

  1. public override bool CanBuildChannelListener<TChannel>(BindingContext context)
  2. {
  3.     return typeof(TChannel) == typeof(IReplyChannel);
  4. }
  5.  
  6. public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
  7. {
  8.     return (IChannelListener<TChannel>)(object)new SizedTcpChannelListener(this, context);
  9. }

The listener class inherits from ChannelListenerBase<TChannel>, which takes care of some of the boilerplate code for managing the listener life cycle – we only need to override the functions to open, close and accept channel (well, all of its variants). In the listener shown below, the constructor does some basic validation to ensure that the binding has the correct encoder (like in the request channel post, this transport requires the ByteStreamMessageEncodingBindingElement). Opening the listener is done by binding the socket to the local address and starting listening. Since those are fairly fast operations, even the asynchronous path uses the synchronous method in the socket, and returns a CompletedAsyncResult which is completed right away. The same for closing the listener, just close the socket. Then we get to the main operation of the channel listener: AcceptChannel (and its asynchronous counterparts, BeginAcceptChannel and EndAcceptChannel). The synchronous path is simple: the method calls Accept on the socket, then wraps it on a reply channel. For the asynchronous path, we’ll follow the “create an AsyncResult implementation which does the work and return it on Begin” which we used extensively on the post about request channels. The listener base class also has methods which follow the Wait… pattern, but since WCF doesn’t call them, I’ll leave them unimplemented in this sample.

  1. class SizedTcpChannelListener : ChannelListenerBase<IReplyChannel>
  2. {
  3.     BufferManager bufferManager;
  4.     MessageEncoderFactory encoderFactory;
  5.     Socket listenSocket;
  6.     Uri uri;
  7.  
  8.     public SizedTcpChannelListener(SizedTcpTransportBindingElement bindingElement, BindingContext context)
  9.         : base(context.Binding)
  10.     {
  11.         // populate members from binding element
  12.         int maxBufferSize = (int)bindingElement.MaxReceivedMessageSize;
  13.         this.bufferManager = BufferManager.CreateBufferManager(bindingElement.MaxBufferPoolSize, maxBufferSize);
  14.  
  15.         Collection<MessageEncodingBindingElement> messageEncoderBindingElements
  16.             = context.BindingParameters.FindAll<MessageEncodingBindingElement>();
  17.  
  18.         if (messageEncoderBindingElements.Count > 1)
  19.         {
  20.             throw new InvalidOperationException("More than one MessageEncodingBindingElement was found in the BindingParameters of the BindingContext");
  21.         }
  22.         else if (messageEncoderBindingElements.Count == 1)
  23.         {
  24.             if (!(messageEncoderBindingElements[0] is ByteStreamMessageEncodingBindingElement))
  25.             {
  26.                 throw new InvalidOperationException("This transport must be used with the ByteStreamMessageEncodingBindingElement.");
  27.             }
  28.  
  29.             this.encoderFactory = messageEncoderBindingElements[0].CreateMessageEncoderFactory();
  30.         }
  31.         else
  32.         {
  33.             this.encoderFactory = new ByteStreamMessageEncodingBindingElement().CreateMessageEncoderFactory();
  34.         }
  35.  
  36.         this.uri = new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress);
  37.     }
  38.  
  39.     protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
  40.     {
  41.         try
  42.         {
  43.             Socket dataSocket = listenSocket.Accept();
  44.             return new SizedTcpReplyChannel(this.encoderFactory.Encoder, this.bufferManager, this.uri, dataSocket, this);
  45.         }
  46.         catch (ObjectDisposedException)
  47.         {
  48.             // socket closed
  49.             return null;
  50.         }
  51.     }
  52.  
  53.     protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
  54.     {
  55.         return new AcceptChannelAsyncResult(timeout, this, callback, state);
  56.     }
  57.  
  58.     protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
  59.     {
  60.         return AcceptChannelAsyncResult.End(result);
  61.     }
  62.  
  63.     protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
  64.     {
  65.         throw new NotSupportedException("No peeking support");
  66.     }
  67.  
  68.     protected override bool OnEndWaitForChannel(IAsyncResult result)
  69.     {
  70.         throw new NotSupportedException("No peeking support");
  71.     }
  72.  
  73.     protected override bool OnWaitForChannel(TimeSpan timeout)
  74.     {
  75.         throw new NotSupportedException("No peeking support");
  76.     }
  77.  
  78.     public override Uri Uri
  79.     {
  80.         get { return this.uri; }
  81.     }
  82.  
  83.     protected override void OnAbort()
  84.     {
  85.         this.CloseListenSocket(TimeSpan.Zero);
  86.     }
  87.  
  88.     protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  89.     {
  90.         this.CloseListenSocket(timeout);
  91.         return new CompletedAsyncResult(callback, state);
  92.     }
  93.  
  94.     protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  95.     {
  96.         this.OpenListenSocket();
  97.         return new CompletedAsyncResult(callback, state);
  98.     }
  99.  
  100.     protected override void OnClose(TimeSpan timeout)
  101.     {
  102.         this.CloseListenSocket(timeout);
  103.     }
  104.  
  105.     protected override void OnEndClose(IAsyncResult result)
  106.     {
  107.         CompletedAsyncResult.End(result);
  108.     }
  109.  
  110.     protected override void OnEndOpen(IAsyncResult result)
  111.     {
  112.         CompletedAsyncResult.End(result);
  113.     }
  114.  
  115.     protected override void OnOpen(TimeSpan timeout)
  116.     {
  117.         this.OpenListenSocket();
  118.     }
  119.  
  120.     void OpenListenSocket()
  121.     {
  122.         IPEndPoint localEndpoint = new IPEndPoint(IPAddress.Any, uri.Port);
  123.         this.listenSocket = new Socket(localEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  124.         this.listenSocket.Bind(localEndpoint);
  125.         this.listenSocket.Listen(10);
  126.     }
  127.  
  128.     void CloseListenSocket(TimeSpan timeout)
  129.     {
  130.         this.listenSocket.Close((int)timeout.TotalMilliseconds);
  131.     }
  132. }

Compared with other AsyncResult classes, the AcceptChannelAsyncResult class is actually simple. It doesn’t do anything complex, except dealing with ObjectDisposedException which can be raised if the socket is closed (which will happen if the listener is closed or aborted), and not return any channel at that point. As a recap, the constructor first calls the Begin operation on the socket. If the operation completed synchronously, the code will complete itself and store the result in the “channel” field.

  1. class AcceptChannelAsyncResult : AsyncResult
  2. {
  3.     TimeSpan timeout;
  4.     SizedTcpChannelListener listener;
  5.     IReplyChannel channel;
  6.  
  7.     public AcceptChannelAsyncResult(TimeSpan timeout, SizedTcpChannelListener listener, AsyncCallback callback, object state)
  8.         : base(callback, state)
  9.     {
  10.         this.timeout = timeout;
  11.         this.listener = listener;
  12.  
  13.         IAsyncResult acceptResult = listener.listenSocket.BeginAccept(OnAccept, this);
  14.         if (!acceptResult.CompletedSynchronously)
  15.         {
  16.             return;
  17.         }
  18.  
  19.         if (CompleteAccept(acceptResult))
  20.         {
  21.             base.Complete(true);
  22.         }
  23.     }
  24.  
  25.     bool CompleteAccept(IAsyncResult result)
  26.     {
  27.         try
  28.         {
  29.             Socket dataSocket = this.listener.listenSocket.EndAccept(result);
  30.             this.channel = new SizedTcpReplyChannel(
  31.                 this.listener.encoderFactory.Encoder,
  32.                 this.listener.bufferManager,
  33.                 this.listener.uri,
  34.                 dataSocket,
  35.                 this.listener);
  36.         }
  37.         catch (ObjectDisposedException)
  38.         {
  39.             this.channel = null;
  40.         }
  41.  
  42.         return true;
  43.     }
  44.  
  45.     static void OnAccept(IAsyncResult result)
  46.     {
  47.         if (result.CompletedSynchronously)
  48.         {
  49.             return;
  50.         }
  51.  
  52.         AcceptChannelAsyncResult thisPtr = (AcceptChannelAsyncResult)result.AsyncState;
  53.  
  54.         Exception completionException = null;
  55.         bool completeSelf = false;
  56.         try
  57.         {
  58.             completeSelf = thisPtr.CompleteAccept(result);
  59.         }
  60.         catch (Exception e)
  61.         {
  62.             completeSelf = true;
  63.             completionException = e;
  64.         }
  65.  
  66.         if (completeSelf)
  67.         {
  68.             thisPtr.Complete(false, completionException);
  69.         }
  70.     }
  71.  
  72.     public static IReplyChannel End(IAsyncResult result)
  73.     {
  74.         AcceptChannelAsyncResult thisPtr = AsyncResult.End<AcceptChannelAsyncResult>(result);
  75.         return thisPtr.channel;
  76.     }
  77. }

And we’re finally at the transport channel layer. IReplyChannel defines a local property which we can return based on that the listener passes to the constructor. InitializeSocket is defined on the base class, which sets the socket in that class to be used for socket-related operations. But in order to give the information about the remote endpoint to the caller, this class also stores the socket instance in a private member, so when the message is decoded, we can add the RemoteEndpointMessageProperty object to it.

  1. class SizedTcpReplyChannel : SizedTcpBaseChannel, IReplyChannel
  2. {
  3.     Uri localAddress;
  4.     Socket socket;
  5.  
  6.     public SizedTcpReplyChannel(MessageEncoder encoder, BufferManager bufferManager, Uri localAddress, Socket socket, ChannelManagerBase channelManager)
  7.         : base(encoder, bufferManager, channelManager)
  8.     {
  9.         this.localAddress = localAddress;
  10.         this.socket = socket;
  11.         this.InitializeSocket(socket);
  12.     }
  13.  
  14.     public EndpointAddress LocalAddress
  15.     {
  16.         get { return new EndpointAddress(this.localAddress); }
  17.     }
  18.  
  19.     protected override Message DecodeMessage(ArraySegment<byte> data)
  20.     {
  21.         Message result = base.DecodeMessage(data);
  22.         if (result != null)
  23.         {
  24.             result.Headers.To = this.localAddress;
  25.             IPEndPoint remoteEndpoint = (IPEndPoint)this.socket.RemoteEndPoint;
  26.             RemoteEndpointMessageProperty property = new RemoteEndpointMessageProperty(remoteEndpoint.Address.ToString(), remoteEndpoint.Port);
  27.             result.Properties.Add(RemoteEndpointMessageProperty.Name, property);
  28.         }
  29.  
  30.         return result;
  31.     }
  32. }

The synchronous operations are shown below. When a new message arrives (using the ReceiveMessage method from the base class), a new request context (to be shown later) is created and returned to the caller.

  1. public RequestContext ReceiveRequest(TimeSpan timeout)
  2. {
  3.     Message request = this.ReceiveMessage(timeout);
  4.     return new SizedTcpRequestContext(this, request, timeout);
  5. }
  6.  
  7. public RequestContext ReceiveRequest()
  8. {
  9.     return this.ReceiveRequest(this.DefaultReceiveTimeout);
  10. }
  11.  
  12. public bool TryReceiveRequest(TimeSpan timeout, out RequestContext context)
  13. {
  14.     try
  15.     {
  16.         context = this.ReceiveRequest(timeout);
  17.         return true;
  18.     }
  19.     catch (TimeoutException)
  20.     {
  21.         context = null;
  22.         return false;
  23.     }
  24. }

The asynchronous path uses new AsyncResult classes, mostly because we need to pass the timeout parameter from the Begin call to the request context (whch will only be available at the End call). I’ll skip those async result classes from this post, but they can be found at the code in the gallery.

  1. public IAsyncResult BeginReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
  2. {
  3.     return new ReceiveRequestAsyncResult(timeout, this, callback, state);
  4. }
  5.  
  6. public IAsyncResult BeginReceiveRequest(AsyncCallback callback, object state)
  7. {
  8.     return this.BeginReceiveRequest(this.DefaultReceiveTimeout, callback, state);
  9. }
  10.  
  11. public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
  12. {
  13.     return new TryReceiveRequestAsyncResult(timeout, this, callback, state);
  14. }
  15.  
  16. public RequestContext EndReceiveRequest(IAsyncResult result)
  17. {
  18.     return ReceiveRequestAsyncResult.End(result);
  19. }
  20.  
  21. public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
  22. {
  23.     return TryReceiveRequestAsyncResult.End(result, out context);
  24. }

And, to finish the channel, the request context class. The functionality is already implemented in the base channel, so the implementation of the SizedTcpRequestContext is fairly simple, merely delegating the calls to the reply channel, including the asynchronous operations.

  1. class SizedTcpRequestContext : RequestContext
  2. {
  3.     SizedTcpReplyChannel replyChannel;
  4.     Message requestMessage;
  5.     TimeSpan timeout;
  6.  
  7.     public SizedTcpRequestContext(SizedTcpReplyChannel replyChannel, Message requestMessage, TimeSpan timeout)
  8.     {
  9.         this.replyChannel = replyChannel;
  10.         this.requestMessage = requestMessage;
  11.         this.timeout = timeout;
  12.     }
  13.  
  14.     public override void Abort()
  15.     {
  16.         this.replyChannel.Abort();
  17.     }
  18.  
  19.     public override IAsyncResult BeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  20.     {
  21.         return this.replyChannel.BeginSendMessage(message, timeout, callback, state);
  22.     }
  23.  
  24.     public override IAsyncResult BeginReply(Message message, AsyncCallback callback, object state)
  25.     {
  26.         return this.replyChannel.BeginSendMessage(message, callback, state);
  27.     }
  28.  
  29.     public override void Close(TimeSpan timeout)
  30.     {
  31.         this.replyChannel.Close(timeout);
  32.     }
  33.  
  34.     public override void Close()
  35.     {
  36.         this.replyChannel.Close();
  37.     }
  38.  
  39.     public override void EndReply(IAsyncResult result)
  40.     {
  41.         this.replyChannel.EndSendMessage(result);
  42.     }
  43.  
  44.     public override void Reply(Message message, TimeSpan timeout)
  45.     {
  46.         this.replyChannel.SendMessage(message, timeout);
  47.     }
  48.  
  49.     public override void Reply(Message message)
  50.     {
  51.         this.replyChannel.SendMessage(message);
  52.     }
  53.  
  54.     public override Message RequestMessage
  55.     {
  56.         get { return this.requestMessage; }
  57.     }
  58. }

The reply channel is ready, so we’re ready to try it. Let’s use similar requests than the ones which were used by the request channel sample. At this point the service will simply echo those requests back, but we can try it and make sure that the channel is working properly.

  1. static void Main(string[] args)
  2. {
  3.     string baseAddress = "sized.tcp://" + Environment.MachineName + ":8000/";
  4.     ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
  5.     CustomBinding binding = new CustomBinding(
  6.         new ByteStreamMessageEncodingBindingElement(),
  7.         new SizedTcpTransportBindingElement());
  8.     host.AddServiceEndpoint(typeof(IUntypedTest), binding, "");
  9.     host.Open();
  10.     Console.WriteLine("Host opened");
  11.  
  12.     string[] requests = new string[]
  13.     {
  14.         "{\"method\":\"Add\",\"params\":[5, 8],\"id\":1}",
  15.         "{\"method\":\"Multiply\",\"params\":[5, 8],\"id\":2}",
  16.         "{\"method\":\"Divide\",\"params\":[5, 0],\"id\":3}",
  17.     };
  18.  
  19.     foreach (string request in requests)
  20.     {
  21.         Console.WriteLine("Request: {0}", request);
  22.         Socket socket = GetConnectedSocket(8000);
  23.         byte[] data = Encoding.UTF8.GetBytes(request);
  24.         byte[] toSend = new byte[data.Length + 4];
  25.         Formatting.SizeToBytes(data.Length, toSend, 0);
  26.         Array.Copy(data, 0, toSend, 4, data.Length);
  27.         socket.Send(toSend);
  28.         Console.WriteLine("Sent request to the server");
  29.         byte[] recvBuffer = new byte[1000];
  30.         int bytesReceived = socket.Receive(recvBuffer);
  31.         Console.WriteLine("Received {0} bytes", bytesReceived);
  32.         Debugging.PrintBytes(recvBuffer, bytesReceived);
  33.         socket.Close();
  34.     }
  35. }

And that’s it. But if we really want to make it useful, we need some service model support for that, so let’s add them now.

JSON-RPC Server – Service Model

The Message-based server was enough to show that the transport is working, but we really need to add the runtime pieces if we want to be able to make it user-friendly. We want to be able to write a service like the one below, where we’re dealing with “regular” CLR types.

  1. [ServiceContract]
  2. public interface ITypedTest
  3. {
  4.     [OperationContract]
  5.     int Add(int x, int y);
  6.     [OperationContract]
  7.     int Subtract(int x, int y);
  8.     [OperationContract]
  9.     int Multiply(int x, int y);
  10.     [OperationContract]
  11.     int Divide(int x, int y);
  12. }
  13.  
  14. public class Service : ITypedTest
  15. {
  16.     public int Add(int x, int y)
  17.     {
  18.         return x + y;
  19.     }
  20.  
  21.     public int Subtract(int x, int y)
  22.     {
  23.         return x - y;
  24.     }
  25.  
  26.     public int Multiply(int x, int y)
  27.     {
  28.         return x * y;
  29.     }
  30.  
  31.     public int Divide(int x, int y)
  32.     {
  33.         try
  34.         {
  35.             return x / y;
  36.         }
  37.         catch (DivideByZeroException e)
  38.         {
  39.             throw new ArgumentException("Divide by zero", e);
  40.         }
  41.     }
  42. }

Like the client, we first need a message formatter, more specifically an IDispatchMessageFormatter implementation. That’s the component which knows how to convert between the Message object and the typed CLR parameters for the operations at the server side. Like on the client side, we’ll also use the NewtonSoft Json.NET library, and the helper operations we used in that example to convert between message objects and the JObject instance which represents the JSON in the request / reply. For the code of the helpers, check the service model post on the request channel, or the code in the gallery.

  1. class JsonRpcMessageFormatter : IDispatchMessageFormatter
  2. {
  3.     private OperationDescription operation;
  4.  
  5.     public JsonRpcMessageFormatter(OperationDescription operation)
  6.     {
  7.         this.operation = operation;
  8.     }
  9.  
  10.     public void DeserializeRequest(Message message, object[] parameters)
  11.     {
  12.         JObject json = JsonRpcHelpers.DeserializeMessage(message);
  13.         JArray jsonParams = json[JsonRpcConstants.ParamsKey] as JArray;
  14.         foreach (MessagePartDescription part in this.operation.Messages[0].Body.Parts)
  15.         {
  16.             int index = part.Index;
  17.             if (jsonParams[index].Type != JTokenType.Null)
  18.             {
  19.                 parameters[index] = JsonConvert.DeserializeObject(
  20.                     jsonParams[index].ToString(),
  21.                     part.Type);
  22.             }
  23.         }
  24.     }
  25.  
  26.     public Message SerializeReply(MessageVersion messageVersion, object[] parameters, object result)
  27.     {
  28.         JObject json = new JObject();
  29.         json[JsonRpcConstants.ErrorKey] = null;
  30.         json[JsonRpcConstants.ResultKey] = JToken.FromObject(result);
  31.         return JsonRpcHelpers.SerializeMessage(json, null);
  32.     }
  33. }

Next, we need the correlation between the “id” member in the request and the reply. The operation doesn’t need to know any of the JSON-RPC details, so we’ll use a message inspector which gives us the correlation between request and reply. In AfterReceiveRequest we fetch the request id, and return it to be passed to the BeforeSendReply method. On this method we receive the id as the correlation state parameter, add it in the JSON, and encode the message with the id member.

  1. class JsonRpcMessageInspector : IDispatchMessageInspector
  2. {
  3.     public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext)
  4.     {
  5.         JObject json = JsonRpcHelpers.GetJObjectPreservingMessage(ref request);
  6.         int requestId = json[JsonRpcConstants.IdKey].Value<int>();
  7.         return requestId;
  8.     }
  9.  
  10.     public void BeforeSendReply(ref Message reply, object correlationState)
  11.     {
  12.         JObject json = JsonRpcHelpers.GetJObjectPreservingMessage(ref reply);
  13.         json[JsonRpcConstants.IdKey] = (int)correlationState;
  14.         reply = JsonRpcHelpers.SerializeMessage(json, reply);
  15.     }
  16. }

Those classes had their counterpart in the client side, but at the server we need some additional help. First, when a request arrives we need to identify which operation will “serve” that request, and for that we need an operation selector, or more specifically an IDispatchOperationSelector. That’s likely the simplest of the implementations, as we simply retrieve the “method” member in the incoming request, and that is returned so that WCF knows which of the service operations to invoke.

  1. class JsonRpcOperationSelector : IDispatchOperationSelector
  2. {
  3.     public string SelectOperation(ref Message message)
  4.     {
  5.         JObject json = JsonRpcHelpers.GetJObjectPreservingMessage(ref message);
  6.         return json[JsonRpcConstants.MethodKey].Value<string>();
  7.     }
  8. }

That’s pretty much it for the “normal” case. But the JSON-RPC protocol also specifies how errors are returned, so it’s nice to have an error handler (implementing the IErrorHandler interface) which will convert exceptions thrown in the operation to the format expected by the protocol. In this implementation we’re first checking if the exception thrown is JsonRpcException (the custom exception type we introduced in the post about the request channel). That’s a nice way to give the user control over the result. For other exceptions, we map their type and message to the JSON which is returned in the “error” member of the response.

  1. class JsonRpcErrorHandler : IErrorHandler
  2. {
  3.     public bool HandleError(Exception error)
  4.     {
  5.         return true;
  6.     }
  7.  
  8.     public void ProvideFault(Exception error, MessageVersion version, ref Message fault)
  9.     {
  10.         JObject json = new JObject();
  11.         json.Add(JsonRpcConstants.ResultKey, null);
  12.         JsonRpcException jsonException = error as JsonRpcException;
  13.         if (jsonException != null)
  14.         {
  15.             json.Add(JsonRpcConstants.ErrorKey, jsonException.JsonException);
  16.         }
  17.         else
  18.         {
  19.             JObject exceptionJson = new JObject
  20.             {
  21.                 { "type", error.GetType().FullName },
  22.                 { "message", error.Message },
  23.             };
  24.             JObject temp = exceptionJson;
  25.             while (error.InnerException != null)
  26.             {
  27.                 error = error.InnerException;
  28.                 JObject innerJson = new JObject
  29.                 {
  30.                     { "type", error.GetType().FullName },
  31.                     { "message", error.Message },
  32.                 };
  33.                 temp["inner"] = innerJson;
  34.                 temp = innerJson;
  35.             }
  36.  
  37.             json.Add(JsonRpcConstants.ErrorKey, exceptionJson);
  38.         }
  39.  
  40.         fault = JsonRpcHelpers.SerializeMessage(json, fault);
  41.     }
  42. }

And, finally, to glue everything together, we’ll use the JsonRpcEndpointBehavior, implementing the functionality on the ApplyDispatchBehavior method. There’s just one more thing which the behavior does: it changes the contract filter in the endpoint dispatcher to map all the messages to this dispatcher – since we don’t use addressing in this transport, and we’re dealing with it ourselves (with the operation selector), we need to tell that to WCF.

  1. public void ApplyDispatchBehavior(ServiceEndpoint endpoint, EndpointDispatcher endpointDispatcher)
  2. {
  3.     endpointDispatcher.DispatchRuntime.MessageInspectors.Add(new JsonRpcMessageInspector());
  4.     endpointDispatcher.DispatchRuntime.OperationSelector = new JsonRpcOperationSelector();
  5.     endpointDispatcher.ChannelDispatcher.ErrorHandlers.Add(new JsonRpcErrorHandler());
  6.     endpointDispatcher.ContractFilter = new MatchAllMessageFilter();
  7.     foreach (OperationDescription operation in endpoint.Contract.Operations)
  8.     {
  9.         if (!JsonRpcHelpers.IsUntypedMessage(operation))
  10.         {
  11.             DispatchOperation dispatchOperation = endpointDispatcher.DispatchRuntime.Operations[operation.Name];
  12.             dispatchOperation.DeserializeRequest = true;
  13.             dispatchOperation.SerializeReply = true;
  14.             dispatchOperation.Formatter = new JsonRpcMessageFormatter(operation);
  15.         }
  16.     }
  17. }

Good to go now, we just need to update the test program to check that everything is in place.

  1. static void Main(string[] args)
  2. {
  3.     string baseAddress = "sized.tcp://" + Environment.MachineName + ":8000/";
  4.     ServiceHost host = new ServiceHost(typeof(Service), new Uri(baseAddress));
  5.     CustomBinding binding = new CustomBinding(
  6.         new ByteStreamMessageEncodingBindingElement(),
  7.         new SizedTcpTransportBindingElement());
  8.     //host.AddServiceEndpoint(typeof(IUntypedTest), binding, "");
  9.     host.AddServiceEndpoint(typeof(ITypedTest), binding, "").Behaviors.Add(new JsonRpcEndpointBehavior());
  10.     host.Open();
  11.     Console.WriteLine("Host opened");
  12.  
  13.     string[] requests = new string[]
  14.     {
  15.         "{\"method\":\"Add\",\"params\":[5, 8],\"id\":1}",
  16.         "{\"method\":\"Multiply\",\"params\":[5, 8],\"id\":2}",
  17.         "{\"method\":\"Divide\",\"params\":[5, 0],\"id\":3}",
  18.     };
  19.  
  20.     foreach (string request in requests)
  21.     {
  22.         Console.WriteLine("Request: {0}", request);
  23.         Socket socket = GetConnectedSocket(8000);
  24.         byte[] data = Encoding.UTF8.GetBytes(request);
  25.         byte[] toSend = new byte[data.Length + 4];
  26.         Formatting.SizeToBytes(data.Length, toSend, 0);
  27.         Array.Copy(data, 0, toSend, 4, data.Length);
  28.         socket.Send(toSend);
  29.         Console.WriteLine("Sent request to the server");
  30.         byte[] recvBuffer = new byte[1000];
  31.         int bytesReceived = socket.Receive(recvBuffer);
  32.         Console.WriteLine("Received {0} bytes", bytesReceived);
  33.         Debugging.PrintBytes(recvBuffer, bytesReceived);
  34.         socket.Close();
  35.     }
  36. }

And we’re done with the reply channel, including service model support. That means that we can now implement a service with a custom protocol using WCF.

Coming up

I’ll finish the section about transport channels (and the whole channel layer) with a quick post about duplex channels.

[Code in this post]

[Back to the index]