Building A Custom Message Encoder to Record Throughput, Part 1

Now that we've seen how to write a custom transport, I thought I'd tackle another common request, which is to write a custom message encoder. A message encoder serializes an instance of the Message class into bytes. This is generally a much simpler operation than actually sending those bytes to someone, so we'll be able to get through the message encoder sample much faster than the transport sample.

I didn't actually want to come up with my own encoder since that would take a lot of work without showing you anything reusable. Instead, I decided to build a message encoder class that wraps an existing encoder and counts the number of bytes read and written while encoding. If you know how long your operation took, you can then figure out your throughput regardless of the transport you're using. This isn't necessarily going to be the exact figure of bytes across the network because most transports add framing and protocol information to the byte stream during transmission. However, this does tell you how many message bytes are being transmitted.

To do things in the opposite way as the last example, I'll build this encoder from the inside out. The piece that actually does the work is an instance of MessageEncoder, so let's build one of those.

 using System;
using System.IO;
using System.ServiceModel.Channels;

namespace CountingEncoder
{
   class CountingEncoder : MessageEncoder
   {
      readonly CountingEncoderBindingElement bindingElement;
      readonly MessageEncoder innerEncoder;

      public CountingEncoder(CountingEncoderBindingElement bindingElement, MessageEncoder innerEncoder)
      {
         this.bindingElement = bindingElement;
         this.innerEncoder = innerEncoder;
      }

      public override string ContentType
      {
         get { return this.innerEncoder.ContentType; }
      }

      public override string MediaType
      {
         get { return this.innerEncoder.MediaType; }
      }

      public override MessageVersion MessageVersion
      {
         get { return this.innerEncoder.MessageVersion; }
      }

      public override Message ReadMessage(ArraySegment<byte> buffer, BufferManager bufferManager)
      {
         Message message = this.innerEncoder.ReadMessage(buffer, bufferManager);
         bindingElement.ReadCount++;
         bindingElement.ReadBytes += buffer.Count;
         return message;
      }

      public override Message ReadMessage(Stream stream, int maxSizeOfHeaders)
      {
         return this.innerEncoder.ReadMessage(new CountingStream(this.bindingElement, stream), maxSizeOfHeaders);
      }

      public override ArraySegment<byte> WriteMessage(Message message, int maxMessageSize, BufferManager bufferManager, int messageOffset)
      {
         ArraySegment<byte> buffer = innerEncoder.WriteMessage(message, maxMessageSize, bufferManager, messageOffset);
         bindingElement.WriteCount++;
         bindingElement.WriteBytes += buffer.Count;
         return buffer;
      }

      public override void WriteMessage(Message message, Stream stream)
      {
         innerEncoder.WriteMessage(message, new CountingStream(this.bindingElement, stream));
      }
   }
}

All I've done here is wrap the methods of another message encoder and pass some counts up to the binding element for centralized record keeping. I created a custom stream class so that we can measure exactly how many operations and bytes are used while streaming, instead of just guessing based on the size of the stream.

 using System.IO;

namespace CountingEncoder
{
   class CountingStream : Stream
   {
      readonly CountingEncoderBindingElement bindingElement;
      readonly Stream innerStream;

      public CountingStream(CountingEncoderBindingElement bindingElement, Stream innerStream)
      {
         this.bindingElement = bindingElement;
         this.innerStream = innerStream;
      }

      public override bool CanRead
      {
         get { return this.innerStream.CanRead; }
      }

      public override bool CanSeek
      {
         get { return this.innerStream.CanSeek; }
      }

      public override bool CanWrite
      {
         get { return this.innerStream.CanWrite; }
      }

      public override void Close()
      {
         this.innerStream.Close();
      }

      public override void Flush()
      {
         this.innerStream.Flush();
      }

      public override long Length
      {
         get { return this.innerStream.Length; }
      }

      public override long Position
      {
         get { return this.innerStream.Position; }
         set { this.innerStream.Position = value; }
      }

      public override int Read(byte[] buffer, int offset, int count)
      {
         int bytesRead = this.innerStream.Read(buffer, offset, count);
         this.bindingElement.ReadCount++;
         this.bindingElement.ReadBytes += bytesRead;
         return bytesRead;
      }

      public override long Seek(long offset, SeekOrigin origin)
      {
         return this.innerStream.Seek(offset, origin);
      }

      public override void SetLength(long value)
      {
         this.innerStream.SetLength(value);
      }

      public override void Write(byte[] buffer, int offset, int count)
      {
         this.innerStream.Write(buffer, offset, count);
         this.bindingElement.WriteCount++;
         this.bindingElement.WriteBytes += count;
      }
   }
}

Like the message encoder, the stream simply wraps an inner stream that is doing all the work and passes the counts up to the binding element.

Next time: Building A Custom Message Encoder to Record Throughput, Part 2