Custom WCF Streaming

Complete source code is available at code.msdn.microsoft.com.

Many distributed business applications work with a huge number of database rows and transfer large record sets between processes running on different machines. Most likely, these large data sets are generated by complex, long-running database queries. To improve data transfer performance for this type of application, we can use custom WCF streaming.

Here is a sample that shows how to implement custom WCF streaming. The sample code streams database records to the client.

The basic idea is this: we use two threads. One thread executes the complex database query, and the other streams database rows to the client. We alter the database query so that it returns only 1000 rows at a time, and modify the WCF service to stream these 1000 rows to the client. While the WCF service is streaming rows to the client, it runs the database query again on a different thread to get the next 1000 rows. This way, as soon as the WCF service finishes streaming one set of rows, the next set is ready to stream.
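The overlap between querying and streaming can be sketched without WCF or a database. The following is only an illustration under stated assumptions: FetchPage is a hypothetical stand-in for the paged query, and a bounded BlockingCollection (not what the sample itself uses) hands finished pages from the query thread to the sending loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Hypothetical stand-in for the paged database query: returns up to
    // pageSize row IDs starting at offset, or an empty list past the end.
    static List<int> FetchPage(int offset, int pageSize, int totalRows)
    {
        int n = Math.Max(0, Math.Min(pageSize, totalRows - offset));
        return Enumerable.Range(offset, n).ToList();
    }

    // Fetches pages on one thread while the calling thread "sends" them.
    // Returns the rows in the order they were sent.
    public static List<int> Run(int totalRows, int pageSize)
    {
        // Capacity 1: one finished page can wait while the next is fetched.
        using (var pages = new BlockingCollection<List<int>>(boundedCapacity: 1))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    for (int offset = 0; ; offset += pageSize)
                    {
                        List<int> page = FetchPage(offset, pageSize, totalRows);
                        if (page.Count == 0) break;
                        pages.Add(page); // blocks while the sender is behind
                    }
                }
                finally
                {
                    pages.CompleteAdding(); // lets the sending loop finish
                }
            });

            var sent = new List<int>();
            foreach (List<int> page in pages.GetConsumingEnumerable())
            {
                sent.AddRange(page); // stands in for streaming to the client
            }

            producer.Wait();
            return sent;
        }
    }
}
```

Calling PipelineSketch.Run(2500, 1000) fetches three pages (1000, 1000, and 500 rows) and returns all 2500 row IDs in order.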

 

[Figures: Regular WCF Architecture (WCFStreaming_2a) and Custom WCF Streaming Architecture (WCFStreaming_1a)]

 

  1. WCF Client calling WCF service
  2. WCF Service executing database query
  3. Database returns dataset to WCF service
  4. WCF Service response
  5. Second database query executed by WCF service
  6. WCF Stream response

Let's start with the WCF service contract.

 [ServiceContract]
 public interface IStockMarket
 {
     [OperationContract]
     Stream GetDBRowStream();
 }
  

Here we have one WCF operation, which returns a Stream object.

Next, let's review the service contract implementation.

 public class StockMarket : IStockMarket
{
    public Stream GetDBRowStream()
    {
        return new DBRowStream();
    }
}
  

The WCF service creates a new custom Stream object and returns it to the client.

Next, let's review the client code.
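The binding is not shown here, but a Stream return value is only sent incrementally when the binding uses a streamed transfer mode. The following self-hosting sketch is an assumption about how that could be set up, not the sample's actual configuration: the base address, the timeout, and the choice of BasicHttpBinding are hypothetical, and it relies on the IStockMarket and StockMarket types shown above being in the same project (with a reference to System.ServiceModel).

```csharp
using System;
using System.ServiceModel;

public static class HostSketch
{
    public static void Main()
    {
        // Assumed binding: stream the response, keep the request buffered.
        var binding = new BasicHttpBinding
        {
            TransferMode = TransferMode.StreamedResponse,
            // Hypothetical timeout, generous enough for a long-running stream.
            SendTimeout = TimeSpan.FromMinutes(10)
        };

        // Hypothetical base address for the self-hosted service.
        var baseAddress = new Uri("http://localhost:8000/StockMarket");

        using (var host = new ServiceHost(typeof(StockMarket), baseAddress))
        {
            host.AddServiceEndpoint(typeof(IStockMarket), binding, "");
            host.Open();

            Console.WriteLine("Service running. Press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

The client-side binding would need the same transfer mode, plus a MaxReceivedMessageSize large enough for the whole response.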

 try
{
    Stream s = proxy.GetDBRowStream();

    IFormatter formatter = new BinaryFormatter();

    OrderModel m;
    while (s.CanRead)
    {
        m = formatter.Deserialize(s) as OrderModel;
        Console.Write(string.Format("order ID is {0}\r\n", m.ID));
    }
}
catch (System.Exception ex)
{
    Console.WriteLine(ex.Message);
}
  

The client calls GetDBRowStream() on the WCF proxy object and keeps reading from the returned stream until the stream's CanRead property returns false.

Here, we read the content from the stream object. Using the BinaryFormatter, we deserialize it into a usable .NET object of type OrderModel.

Let's review the OrderModel class.

 [Serializable]
public class OrderModel
{
    public int ID;
    public int ParameterOne;
    public int ParameterTwo;
    public int ParameterThree;
    public int Results;
}
  

OrderModel is nothing but a database row, so the client code reads one database row at a time.

Now let's look at the DBRowStream object source code.
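To make the row-to-object mapping concrete, here is a small sketch that fills OrderModel objects from any IDataReader. It runs against an in-memory DataTable instead of SQL Server. The column order (ID, ParameterOne, ParameterTwo, ParameterThree, Results) and the sample values are assumptions made for illustration, and OrderMapper is a hypothetical helper name.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

[Serializable]
public class OrderModel
{
    public int ID;
    public int ParameterOne;
    public int ParameterTwo;
    public int ParameterThree;
    public int Results;
}

public static class OrderMapper
{
    // Assumes five non-null INT columns in this order:
    // ID, ParameterOne, ParameterTwo, ParameterThree, Results.
    public static OrderModel FromRecord(IDataRecord record)
    {
        return new OrderModel
        {
            ID = record.GetInt32(0),
            ParameterOne = record.GetInt32(1),
            ParameterTwo = record.GetInt32(2),
            ParameterThree = record.GetInt32(3),
            Results = record.GetInt32(4)
        };
    }

    // Reads every remaining row from the reader, one OrderModel per row.
    public static List<OrderModel> ReadAll(IDataReader reader)
    {
        var orders = new List<OrderModel>();
        while (reader.Read())
        {
            orders.Add(FromRecord(reader));
        }
        return orders;
    }

    // In-memory stand-in for the Orders table (made-up values).
    public static DataTable SampleTable()
    {
        var table = new DataTable("Orders");
        foreach (string name in new[] { "ID", "ParameterOne", "ParameterTwo", "ParameterThree", "Results" })
        {
            table.Columns.Add(name, typeof(int));
        }
        table.Rows.Add(1, 10, 20, 30, 60);
        table.Rows.Add(2, 11, 21, 31, 63);
        return table;
    }
}
```

For example, OrderMapper.ReadAll(OrderMapper.SampleTable().CreateDataReader()) returns two OrderModel objects, one per row of the sample table.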

 public class DBRowStream : Stream
{
    bool bCanRead = true;

    MemoryStream memStream1 = new MemoryStream();
    MemoryStream memStream2 = new MemoryStream();

    IFormatter formatter = new BinaryFormatter();
  
 …..
 …..    

  

Note: This code is a little complex because it returns the stream object to the client while, at the same time, executing the SQL query to get the next data set.

This class has two memory stream objects, so it can write to one of them while reading data from the database and read from the other while streaming data to the WCF client.

In the image below, “a” and “b” are the two memory stream objects. At the beginning, we write to memory stream “a” while reading data from the database. Next, at step #4, we read data from memory stream “a” and write it to the WCF client. At the same time, on another thread (step #5), we run another database query to get the next set of rows and write it to memory stream “b”.

 

[Figure: WCFStreaming_memstream]

Now let's look at how this class fills the two memory stream objects: after writing the first 10 database rows to memory stream “a”, it writes the next 10 database rows to memory stream “b”. While it writes to “b”, the rows in “a” are read and sent to the client.

A BinaryFormatter is used to serialize each database row.
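The handoff between the two memory streams can be tried in isolation. The sketch below is a simplified illustration, not the sample's implementation: it writes plain integers with BinaryWriter instead of serializing rows with BinaryFormatter, it uses arrays of events rather than the four named events in the sample, and the reader knows the number of batches in advance instead of polling a flag.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

public static class DoubleBufferSketch
{
    // Passes totalRows integers from a writer thread to the calling thread
    // through two alternating memory streams ("a" and "b").
    public static List<int> Run(int totalRows, int batchSize)
    {
        var buffers = new[] { new MemoryStream(), new MemoryStream() };
        // Both buffers start out free for writing; neither has data to send.
        var readyToWrite = new[] { new AutoResetEvent(true), new AutoResetEvent(true) };
        var readyToSend = new[] { new AutoResetEvent(false), new AutoResetEvent(false) };
        int batches = (totalRows + batchSize - 1) / batchSize;

        var writer = new Thread(() =>
        {
            int row = 0;
            for (int b = 0; b < batches; b++)
            {
                int i = b % 2;                 // alternate between "a" and "b"
                readyToWrite[i].WaitOne();     // wait until the reader released it
                buffers[i].Position = 0;
                buffers[i].SetLength(0);       // discard the batch already sent
                var w = new BinaryWriter(buffers[i]);
                for (int k = 0; k < batchSize && row < totalRows; k++)
                {
                    w.Write(row++);
                }
                w.Flush();
                buffers[i].Position = 0;       // rewind for the reader
                readyToSend[i].Set();          // mark as "available for read"
            }
        });
        writer.Start();

        var received = new List<int>();
        for (int b = 0; b < batches; b++)
        {
            int i = b % 2;
            readyToSend[i].WaitOne();          // wait for a filled buffer
            var r = new BinaryReader(buffers[i]);
            while (buffers[i].Position < buffers[i].Length)
            {
                received.Add(r.ReadInt32());
            }
            readyToWrite[i].Set();             // mark as "available for write"
        }

        writer.Join();
        return received;
    }
}
```

While the reader drains one buffer, the writer is free to fill the other, so neither side waits unless it gets a full batch ahead.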

 void DBThreadProc(object o)
{
    SqlConnection con = null;
    SqlCommand com = null;
    SqlDataReader sdr = null;
    try
    {
        con = new SqlConnection(
            "Server=.\\SQLEXPRESS;Initial Catalog=OrderDB;Integrated Security=SSPI");

        com = new SqlCommand();
        com.Connection = con;
        com.CommandText = "Select top 1000 * from Orders ";

        con.Open();
        sdr = com.ExecuteReader();

        int count = 0;

        // Start with memory stream 1 once it is free for writing.
        MemoryStream memStream = memStream1;
        memStreamWriteStatus = 1;
        readyToWriteToMemStream1.WaitOne();
        while (sdr.Read())
        {
            OrderModel model = new OrderModel();

            model.ID = sdr.GetInt32(0);
            model.ParameterOne = sdr.GetInt32(1);
            model.ParameterTwo = sdr.GetInt32(2);
            model.ParameterThree = sdr.GetInt32(3);

            formatter.Serialize(memStream, model);

            count++;

            // After 10 rows, hand this stream to Read() and switch to the other one.
            if (count >= 10)
            {
                switch (memStreamWriteStatus)
                {
                    case 1: // done writing to 1
                        {
                            memStream1.Position = 0;
                            readyToSendFromMemStream1.Set();
                            Console.Write("write 1 is done...waiting for 2\r\n");
                            readyToWriteToMemStream2.WaitOne();
                            Console.Write("done waiting for 2\r\n");
                            memStream = memStream2;
                            memStream.Position = 0;
                            memStream.SetLength(0); // drop rows already sent
                            memStreamWriteStatus = 2;

                            break;
                        }
                    case 2: // done writing to 2
                        {
                            memStream2.Position = 0;
                            readyToSendFromMemStream2.Set();
                            Console.Write("write 2 is done...waiting for 1\r\n");
                            readyToWriteToMemStream1.WaitOne();
                            Console.Write("done waiting for 1\r\n");
                            memStream = memStream1;
                            memStream.Position = 0;
                            memStream.SetLength(0); // drop rows already sent
                            memStreamWriteStatus = 1;

                            break;
                        }
                }

                count = 0;
            }
        }

        // Hand over a final partial batch, if there is one.
        if (count > 0)
        {
            memStream.Position = 0;
            if (memStreamWriteStatus == 1)
                readyToSendFromMemStream1.Set();
            else
                readyToSendFromMemStream2.Set();
        }
    }
    catch (System.Exception excep)
    {
        Console.WriteLine(excep.Message);
    }
    finally
    {
        // Always signal the end, so Read() does not wait forever after an error.
        bCanRead = false;
        if (sdr != null) sdr.Dispose();
        if (con != null) con.Dispose();
    }
}

  

Now let's see how this class streams database rows to the WCF client.

This class overrides the Read() method, which the WCF framework calls to stream the data to the client.

When Read() is called, it first checks whether any data is left in the current memory stream object. If so, it reads from that memory stream, streams the data to the client, and returns. When Read() is called again and the current stream is exhausted, it checks which memory stream object, “a” or “b”, is available for reading, then reads data from that one and sends it to the client.

After reading a memory stream object to the end, Read() marks it as “available for write”, and the database function above can write to it again. Once writing to the memory stream object is complete, the database function marks it as “available for read”, and the Read() function below reads from it.

 MemoryStream memStream = null;
 public override int Read(byte[] buffer, int offset, int count)
 {
     if (memStream != null)
     {
         if (memStream.Position != memStream.Length)
         {
             return memStream.Read(buffer, offset, count);
         }
         else
         {
             switch (memStreamReadStatus)
             {
                 case 0:
                     {
                         throw new Exception();
                     }
                 case 1:
                     {
                         Console.Write("READ : done sending from 1\r\n");
                         readyToWriteToMemStream1.Set();
                         break;
                     }
                 case 2:
                     {
                         Console.Write("READ : done sending from 2\r\n");
                         readyToWriteToMemStream2.Set();
                         break;
                     }

             }
         }
     }

     switch (memStreamReadStatus)
     {
         case 0:
             {
                 Console.Write("READ : waiting for 1\r\n");

                 while (!readyToSendFromMemStream1.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 1\r\n");
                 memStream = memStream1;
                 memStreamReadStatus = 1;
                 break;
             }
         case 1:
             {
                 Console.Write("READ : waiting for 2\r\n");
                 //readyToSendFromMemStream2.WaitOne();
                 while (!readyToSendFromMemStream2.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 2\r\n");
                 memStream = memStream2;
                 memStreamReadStatus = 2;
                 break;
             }
         case 2:
             {
                 Console.Write("READ : waiting for 1\r\n");
                 //readyToSendFromMemStream1.WaitOne();
                 while (!readyToSendFromMemStream1.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 1\r\n");
                 memStream = memStream1;
                 memStreamReadStatus = 1;
                 break;
             }
             
     }
     return memStream.Read(buffer, offset, count);
 }
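The contract that Read() has to honor is easier to see in a smaller class: copy at most count bytes into the buffer, return how many were copied, and return 0 only when no more data will ever arrive. The sketch below is a hypothetical ChunkStream, not part of the sample. It pulls byte arrays from a BlockingCollection instead of switching between two memory streams.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

public class ChunkStream : Stream
{
    private readonly BlockingCollection<byte[]> chunks;
    private byte[] current = new byte[0];
    private int position;

    public ChunkStream(BlockingCollection<byte[]> chunks)
    {
        this.chunks = chunks;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // When the current chunk is used up, block until the producer adds
        // another one or calls CompleteAdding().
        while (position == current.Length)
        {
            byte[] next;
            if (!chunks.TryTake(out next, Timeout.Infinite))
            {
                return 0; // producer finished: end of stream
            }
            current = next;
            position = 0;
        }

        int n = Math.Min(count, current.Length - position);
        Buffer.BlockCopy(current, position, buffer, offset, n);
        position += n;
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}
```

A consumer such as Stream.CopyTo() keeps calling Read() until it returns 0, so the producer ends the stream simply by calling CompleteAdding() on the collection.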

  

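Where is the end?

Remember the client code: the client reads from the stream object until its CanRead property returns false. On the server side, bCanRead is set to false once the database thread has processed the last row; after that, Read() returns 0, which signals the end of the stream. The flag is exposed through the property below.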

 public override bool CanRead
{
    get { return bCanRead; }
}

  

 
