Building a Custom File Transport, Part 5: Channel Basics

I've pulled the actual mechanics of the file transport out into a separate class for this example. Both the client and server sides of the channel use this code. There's more code in this class than usual, so I've split it into three parts. The first part has the supporting methods, the second part has the methods for buffered transfers, and the third part has the methods for streamed transfers. Streamed transfers in this example are a bit contrived because I didn't want the code for handling IO to overwhelm the code for the channel. I'll talk a little more about that in part three.

using System;
using System.IO;
using System.ServiceModel;
using System.ServiceModel.Channels;

namespace FileTransport
{
   abstract class FileChannelBase : ChannelBase
   {
      const int MaxBufferSize = 64 * 1024;
      const int MaxSizeOfHeaders = 4 * 1024;

      readonly EndpointAddress address;
      readonly BufferManager bufferManager;
      readonly MessageEncoder encoder;
      readonly long maxReceivedMessageSize;
      readonly bool streamed;

      public FileChannelBase(BufferManager bufferManager, MessageEncoderFactory encoderFactory, EndpointAddress address, ChannelManagerBase parent,
         bool streamed, long maxReceivedMessageSize)
         : base(parent)
      {
         this.address = address;
         this.bufferManager = bufferManager;
         this.encoder = encoderFactory.CreateSessionEncoder();
         this.maxReceivedMessageSize = maxReceivedMessageSize;
         this.streamed = streamed;
      }

      protected Message ReadMessage(string path)
      {
         if (this.streamed)
         {
            return StreamedReadMessage(path);
         }
         return BufferedReadMessage(path);
      }

      protected void WriteMessage(string path, Message message)
      {
         if (this.streamed)
         {
            StreamedWriteMessage(path, message);
         }
         else
         {
            BufferedWriteMessage(path, message);
         }
      }

      public EndpointAddress RemoteAddress
      {
         get { return this.address; }
      }

      protected static Exception ConvertException(Exception exception)
      {
         Type exceptionType = exception.GetType();
         if (exceptionType == typeof(System.IO.DirectoryNotFoundException) ||
             exceptionType == typeof(System.IO.FileNotFoundException) ||
             exceptionType == typeof(System.IO.PathTooLongException))
         {
            return new EndpointNotFoundException(exception.Message, exception);
         }
         return new CommunicationException(exception.Message, exception);
      }

      protected static string PathToFile(Uri path, String name)
      {
         UriBuilder address = new UriBuilder(path);
         address.Scheme = "file";
         address.Path = Path.Combine(path.AbsolutePath, name);
         return address.Uri.AbsolutePath;
      }
   }
}

The first half of this block sets up the options for the channel. We'll support a Streamed option for controlling the transfer mode and quotas for MaxReceivedMessageSize and MaxBufferPoolSize. You'd want to use the TransferMode enumeration if you were really doing this kind of streaming control, but a plain Boolean keeps the example as simple as possible.

The second half of this block defines some helper methods. ConvertException exists because it's bad for a channel to throw exceptions that don't descend from CommunicationException. It wraps an IO failure in a CommunicationException, or in the more specific EndpointNotFoundException when the directory or file is missing or the path is too long. The exceptions to this exception rule are design-time exceptions, terminally fatal exceptions, and cases where you're indicating that there's a bug in the calling code.
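As a rough illustration of the TransferMode alternative mentioned above, here is a minimal sketch of how the enumeration could be mapped onto per-direction streaming decisions. The TransferModeHelper class and its method names are hypothetical and are not part of the sample; only the TransferMode enumeration itself comes from WCF.

```csharp
using System;
using System.ServiceModel;

// Hypothetical helper, not part of the file transport sample.
static class TransferModeHelper
{
   // True when request messages (client to server) should be streamed.
   public static bool IsRequestStreamed(TransferMode mode)
   {
      return mode == TransferMode.Streamed || mode == TransferMode.StreamedRequest;
   }

   // True when response messages (server to client) should be streamed.
   public static bool IsResponseStreamed(TransferMode mode)
   {
      return mode == TransferMode.Streamed || mode == TransferMode.StreamedResponse;
   }
}
```

A channel built this way would presumably take a TransferMode in its constructor and pick the streamed or buffered path separately for reads and writes, rather than sharing the single flag used here.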

Message BufferedReadMessage(string path)
{
   byte[] data;
   long bytesTotal;
   try
   {
      using (FileStream stream = new FileStream(path, FileMode.Open))
      {
         bytesTotal = stream.Length;
         if (bytesTotal > int.MaxValue)
         {
            throw new CommunicationException(
               String.Format("Message of size {0} bytes is too large to buffer. Use a streamed transfer instead.", bytesTotal)
            );
         }
         if (bytesTotal > this.maxReceivedMessageSize)
         {
            throw new CommunicationException(String.Format("Message exceeds maximum size: {0} > {1}.", bytesTotal, maxReceivedMessageSize));
         }
         data = this.bufferManager.TakeBuffer((int)bytesTotal);
         int bytesRead = 0;
         while (bytesRead < bytesTotal)
         {
            int count = stream.Read(data, bytesRead, (int)bytesTotal - bytesRead);
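            if (count == 0)
            {
               // Return the buffer before failing so that the pool can reuse it.
               this.bufferManager.ReturnBuffer(data);
               throw new CommunicationException(String.Format("Unexpected end of message after {0} of {1} bytes.", bytesRead, bytesTotal));
            }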
            bytesRead += count;
         }
      }
   }
   catch (IOException exception)
   {
      throw ConvertException(exception);
   }
   ArraySegment<byte> buffer = new ArraySegment<byte>(data, 0, (int)bytesTotal);
   return this.encoder.ReadMessage(buffer, this.bufferManager);
}

void BufferedWriteMessage(string path, Message message)
{
   ArraySegment<byte> buffer;
   using (message)
   {
      this.address.ApplyTo(message);
      buffer = this.encoder.WriteMessage(message, MaxBufferSize, this.bufferManager);
   }
   try
   {
      using (FileStream stream = new FileStream(path, FileMode.Create))
      {
         stream.Write(buffer.Array, buffer.Offset, buffer.Count);
      }
   }
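   catch (IOException exception)
   {
      throw ConvertException(exception);
   }
   finally
   {
      // The encoder took this buffer from the buffer manager; return it so that it can be reused.
      this.bufferManager.ReturnBuffer(buffer.Array);
   }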
}

Buffered transfers show off the use of our BufferManager class to pass data around. The buffer manager takes care of allocating and sharing byte array buffers, which can have noticeable garbage collection costs when there's a lot of traffic. If you reuse buffers, I would recommend using our BufferManager class rather than writing your own.

Manual addressing is ignored for the file transport: the write methods always apply the channel's own address to the outgoing message. It really doesn't make sense to support specifying a custom address because the destination of the message is an inherent part of the channel setup.
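The buffer reuse pattern described above boils down to taking a buffer, using it, and returning it exactly once. Here is a minimal sketch of that pattern on its own. The BufferManagerSketch class, the method name, and the pool sizes in the usage note are illustrative assumptions; the CreateBufferManager, TakeBuffer, and ReturnBuffer calls are the real BufferManager API.

```csharp
using System;
using System.ServiceModel.Channels;

// Hypothetical demonstration class, not part of the file transport sample.
static class BufferManagerSketch
{
   // Copies the payload into a pooled buffer, sums its bytes, and returns the buffer to the pool.
   public static int SumWithPooledBuffer(BufferManager manager, byte[] payload)
   {
      // TakeBuffer may hand back an array larger than requested, so keep using
      // payload.Length as the count of valid bytes rather than buffer.Length.
      byte[] buffer = manager.TakeBuffer(payload.Length);
      try
      {
         Buffer.BlockCopy(payload, 0, buffer, 0, payload.Length);
         int sum = 0;
         for (int i = 0; i < payload.Length; i++)
         {
            sum += buffer[i];
         }
         return sum;
      }
      finally
      {
         // Return the buffer even if the work above throws.
         manager.ReturnBuffer(buffer);
      }
   }
}
```

A caller might create the pool once with something like BufferManager.CreateBufferManager(512 * 1024, 64 * 1024) and share it across calls. The try/finally is the important part: a buffer that is never returned can't be reused, which defeats the point of pooling.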

Message StreamedReadMessage(string path)
{
   try
   {
      Stream stream = File.Open(path, FileMode.Open);
      long bytesTotal = stream.Length;
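      if (bytesTotal > this.maxReceivedMessageSize)
      {
         // Close the file before rejecting the message so that the handle isn't leaked.
         stream.Close();
         throw new CommunicationException(String.Format("Message exceeds maximum size: {0} > {1}.", bytesTotal, this.maxReceivedMessageSize));
      }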
      return this.encoder.ReadMessage(stream, MaxSizeOfHeaders);
   }
   catch (IOException exception)
   {
      throw ConvertException(exception);
   }
}

void StreamedWriteMessage(string path, Message message)
{
   using (message)
   {
      this.address.ApplyTo(message);
      try
      {
         using (Stream stream = File.Open(path, FileMode.Create))
         {
            this.encoder.WriteMessage(message, stream);
         }
      }
      catch (IOException exception)
      {
         throw ConvertException(exception);
      }
   }
}

Streamed transfers are simpler for two reasons in this example. The first reason is that the native file APIs are stream-oriented rather than byte-array-oriented, which makes the coupling with WCF easier. The second reason is that this is an extremely cheesy stream implementation. We're taking advantage of a lot of implicit buffering, especially the buffering of the file by the file system. To make this a real streaming example, here's what we'd need to do:

  1. Add some framing so that we can tell whether the stream is expected to continue or end after we pull each chunk of data.
  2. Create a message class to wrap the file stream so that the reader is pulling directly off of the file. Note that the reader will block if the file stream runs out of data but the framing indicates that more data is expected in the future.
  3. Handle the overlapped reads and writes going on in the file system to keep our view of the stream consistent.

This would have made the simple example much more complicated without really adding anything to the discussion about channels.
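To give a sense of what just the first item on that list involves, here is one possible framing scheme, sketched under my own assumptions. Each chunk is a 4-byte little-endian length followed by that many bytes, and a zero length marks the end of the message. The format and the ChunkFraming class are hypothetical and aren't used anywhere in the sample. Note also that this sketch treats running out of data in the middle of a frame as an error, whereas a real streaming reader would have to wait for the writer to catch up, which is what the second item on the list is about.

```csharp
using System;
using System.IO;

// Hypothetical framing format, not part of the file transport sample.
static class ChunkFraming
{
   // Writes one non-empty chunk: a 4-byte length followed by the data.
   public static void WriteChunk(Stream output, byte[] data, int offset, int count)
   {
      if (count <= 0) throw new ArgumentOutOfRangeException("count");
      WriteLength(output, count);
      output.Write(data, offset, count);
   }

   // Writes the zero-length marker that ends the message.
   public static void WriteEnd(Stream output)
   {
      WriteLength(output, 0);
   }

   // Returns the next chunk, or null once the end marker has been read.
   public static byte[] ReadChunk(Stream input)
   {
      byte[] header = ReadExactly(input, 4);
      int length = header[0] | (header[1] << 8) | (header[2] << 16) | (header[3] << 24);
      if (length < 0) throw new InvalidDataException("Negative chunk length.");
      return length == 0 ? null : ReadExactly(input, length);
   }

   static void WriteLength(Stream output, int length)
   {
      byte[] header = { (byte)length, (byte)(length >> 8), (byte)(length >> 16), (byte)(length >> 24) };
      output.Write(header, 0, header.Length);
   }

   // Loops until exactly count bytes have been read, since Read may return fewer.
   static byte[] ReadExactly(Stream input, int count)
   {
      byte[] data = new byte[count];
      int read = 0;
      while (read < count)
      {
         int n = input.Read(data, read, count - read);
         if (n == 0) throw new EndOfStreamException("Stream ended in the middle of a frame.");
         read += n;
      }
      return data;
   }
}
```

Even this much only tells the reader where chunks begin and end. It does nothing about wrapping the stream in a message or coordinating a reader and writer working on the same file.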

Next time: Networking with NATs and Firewalls