Custom WCF Streaming


Complete Source code is available at code.msdn.microsoft.com

Many distributed business applications work with a huge number of database rows and transfer large record sets between processes running on different machines. Most likely, these large datasets are generated by complex, long-running database queries. To improve data transfer performance for this type of application, we can use custom WCF streaming.

Here is a sample that shows how to implement custom WCF streaming. The sample code streams database records to the client.

The basic idea is to use two threads: one executes the complex database query, and the other streams database rows to the client. We alter the database query so that it returns only 1000 rows at a time, and modify the WCF service to stream these 1000 rows to the client. While the WCF service is streaming rows to the client, it runs the database query again on a different thread to get the next 1000 rows. This way, as soon as the WCF service finishes streaming one set of rows, the next set is ready to stream.

 

Figure: Regular WCF Architecture (WCFStreaming_2a)
Figure: Custom WCF Streaming Architecture (WCFStreaming_1a)

 

  1. WCF Client calling WCF service
  2. WCF Service executing database query
  3. Database returns dataset to WCF service
  4. WCF Service response
  5. Second database query executed by WCF service
  6. WCF Stream response
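The overlap between querying and streaming in the steps above can be sketched in isolation. This is only an illustration, not the sample's implementation: it swaps the sample's two memory streams for a bounded BlockingCollection, and fetchBatch is a hypothetical stand-in for the database query that returns null when no rows remain.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class OverlapSketch
{
    // fetchBatch is a hypothetical stand-in for the paged database query.
    public static List<int> Run(Func<int, int[]> fetchBatch)
    {
        var sent = new List<int>();

        // Capacity 1: at most one finished batch waits while the next is fetched.
        using (var queue = new BlockingCollection<int[]>(1))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    int page = 0;
                    int[] batch;
                    while ((batch = fetchBatch(page++)) != null)
                    {
                        queue.Add(batch); // blocks while the consumer is behind
                    }
                }
                finally
                {
                    queue.CompleteAdding(); // lets the consumer loop finish
                }
            });

            // Stands in for streaming each batch of rows to the client.
            foreach (int[] batch in queue.GetConsumingEnumerable())
            {
                sent.AddRange(batch);
            }

            producer.Wait();
        }

        return sent;
    }
}
```

Because the queue holds a single batch, the producer is never more than one batch ahead of the consumer, which bounds memory use in the same way the two-buffer scheme does.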

Let's start with the WCF service contract:

[ServiceContract]
public interface IStockMarket
{
    [OperationContract]
    Stream GetDBRowStream();
}
  

Here we have one WCF operation, which returns a Stream object.
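For a returned Stream to be sent incrementally rather than buffered as a whole message, the binding has to use a streamed transfer mode. The article does not show its binding configuration, so the following is only a sketch of one possible setup, assuming a BasicHttpBinding; the helper name, size limit, and timeout are illustrative assumptions.

```csharp
using System;
using System.ServiceModel;

static class StreamingBindingSketch
{
    // Hypothetical helper, not part of the original sample.
    public static BasicHttpBinding Create()
    {
        return new BasicHttpBinding
        {
            // Stream the response body instead of buffering the whole message.
            TransferMode = TransferMode.StreamedResponse,

            // The default limit is 64 KB; the value here is an assumption.
            MaxReceivedMessageSize = long.MaxValue,

            // Long-running streams may need a longer timeout (assumed value).
            SendTimeout = TimeSpan.FromMinutes(10)
        };
    }
}
```

The same binding settings would have to be applied on both the service endpoint and the client proxy.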

Next, let's review the service contract implementation:

public class StockMarket : IStockMarket
{
    public Stream GetDBRowStream()
    {
        return new DBRowStream();
    }
}
  

The WCF service creates a new custom Stream object and returns it to the client.

Next, let's review the client code:

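try
{
    // Dispose the stream when done so the underlying channel is released.
    using (Stream s = proxy.GetDBRowStream())
    {
        IFormatter formatter = new BinaryFormatter();

        // Read one serialized OrderModel at a time.
        while (s.CanRead)
        {
            // "as" yields null if the payload is not an OrderModel.
            OrderModel m = formatter.Deserialize(s) as OrderModel;
            if (m == null)
            {
                break;
            }
            Console.WriteLine("order ID is {0}", m.ID);
        }
    }
}
catch (System.Exception ex)
{
    Console.WriteLine(ex.Message);
}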
  

The client calls GetDBRowStream() on the WCF proxy object and keeps reading from the stream object until its CanRead property returns false.

Here, we read the content from the stream object. Using the BinaryFormatter, we deserialize it into a usable .NET object of type OrderModel.

Let's review the OrderModel class:

[Serializable]
public class OrderModel
{
    public int ID;
    public int ParameterOne;
    public int ParameterTwo;
    public int ParameterThree;
    public int Results;
}
  

OrderModel is simply one database row, so the client code reads one database row at a time.

Now let's look at the DBRowStream source code.

public class DBRowStream : Stream
{
    bool bCanRead = true;

    MemoryStream memStream1 = new MemoryStream();
    MemoryStream memStream2 = new MemoryStream();

    IFormatter formatter = new BinaryFormatter();
  
…..
…..    

  

Note: This code is a little complex, because it returns the stream object to the client while at the same time executing the SQL query to get the next DataSet.

This class has two memory stream objects, so that it can write to one memory stream while reading data from the database and read from the other memory stream while streaming data to the WCF client.

In the image below, “a” and “b” are the two memory stream objects. At the beginning, we write to memory stream “a” while reading data from the database. Next, at step 4, we read data from memory stream “a” and write it to the WCF client. At the same time, on another thread (step 5), we make another database query to get the next set and write it to memory stream “b”.

 

Figure: the two memory stream objects “a” and “b” (WCFStreaming_memstream)

Now let's look at how this class fills the two memory stream objects: after writing the first 10 database rows to memory stream “a”, it writes the next 10 database rows to memory stream “b”. While writing to “b”, it reads data from “a” and sends it to the client.

The BinaryFormatter is used to serialize each database row.

void DBThreadProc(object o)
{
    try
    {
        // The connection string must be a single string literal.
        using (SqlConnection con = new SqlConnection(
            "Server=.\\SQLEXPRESS;Initial Catalog=OrderDB;Integrated Security=SSPI"))
        using (SqlCommand com = new SqlCommand("Select top 1000 * from Orders", con))
        {
            con.Open();
            using (SqlDataReader sdr = com.ExecuteReader())
            {
                int count = 0;

                // Start by filling memStream1 once it is free to write.
                MemoryStream memStream = memStream1;
                memStreamWriteStatus = 1;
                readyToWriteToMemStream1.WaitOne();

                while (sdr.Read())
                {
                    OrderModel model = new OrderModel();
                    model.ID = sdr.GetInt32(0);
                    model.ParameterOne = sdr.GetInt32(1);
                    model.ParameterTwo = sdr.GetInt32(2);
                    model.ParameterThree = sdr.GetInt32(3);

                    formatter.Serialize(memStream, model);
                    count++;

                    // A batch of 10 rows is complete: hand it over and switch buffers.
                    if (count >= 10)
                    {
                        switch (memStreamWriteStatus)
                        {
                            case 1: // done writing to 1
                                memStream1.Position = 0;
                                readyToSendFromMemStream1.Set();
                                Console.Write("write 1 is done...waiting for 2\r\n");
                                readyToWriteToMemStream2.WaitOne();
                                Console.Write("done waiting for 2\r\n");
                                memStream = memStream2;
                                memStreamWriteStatus = 2;
                                break;
                            case 2: // done writing to 2
                                memStream2.Position = 0;
                                readyToSendFromMemStream2.Set();
                                Console.Write("write 2 is done...waiting for 1\r\n");
                                readyToWriteToMemStream1.WaitOne();
                                Console.Write("done waiting for 1\r\n");
                                memStream = memStream1;
                                memStreamWriteStatus = 1;
                                break;
                        }

                        // Truncate the buffer so stale bytes from a longer
                        // previous batch are never re-sent, then rewind.
                        memStream.SetLength(0);
                        memStream.Position = 0;
                        count = 0;
                    }
                }

                // Hand over the last, partially filled batch as well.
                if (count > 0)
                {
                    memStream.Position = 0;
                    if (memStreamWriteStatus == 1)
                        readyToSendFromMemStream1.Set();
                    else
                        readyToSendFromMemStream2.Set();
                }
            }
        }
    }
    catch (System.Exception excep)
    {
        // Do not swallow failures silently.
        Console.WriteLine(excep.Message);
    }
    finally
    {
        // Always tell Read() that no more rows will arrive.
        bCanRead = false;
    }
}

  

Now let's see how this class streams database rows to the WCF client.

This class overrides the Read() function, which the WCF framework calls to stream the data to the client.

When Read() is called, it first checks whether any data is left in the current memory stream object. If so, it reads from that memory stream, streams the data to the client, and returns. When Read() is called again, it checks which memory stream object, “a” or “b”, is available for reading, then reads data from that memory stream and sends it to the client.

After reading a memory stream object to the end, Read() marks it as “available for write”, and the database function above can then write to it. Once writing to the memory stream object is complete, the database function marks it as “available for read”, and the Read() function below can then read from it.
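The "available for write" and "available for read" marks can be sketched as a pair of events per buffer. The article's listing does not show how its event fields are declared, so this standalone sketch makes assumptions: AutoResetEvent objects, both buffers initially free to write, and hypothetical names throughout. Each batch here is just a 4-byte integer.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

static class PingPongSketch
{
    public static List<byte[]> Run(int batches)
    {
        var buffers = new[] { new MemoryStream(), new MemoryStream() };
        // Assumed initial state: both buffers are free to write, none is ready to send.
        var readyToWrite = new[] { new AutoResetEvent(true), new AutoResetEvent(true) };
        var readyToSend = new[] { new AutoResetEvent(false), new AutoResetEvent(false) };
        var received = new List<byte[]>();

        var producer = new Thread(() =>
        {
            for (int i = 0; i < batches; i++)
            {
                int slot = i % 2;
                readyToWrite[slot].WaitOne();   // wait until the reader released it
                buffers[slot].SetLength(0);     // drop the previous batch
                byte[] payload = BitConverter.GetBytes(i);
                buffers[slot].Write(payload, 0, payload.Length);
                buffers[slot].Position = 0;     // rewind for the reader
                readyToSend[slot].Set();        // mark "available for read"
            }
        });
        producer.Start();

        for (int i = 0; i < batches; i++)
        {
            int slot = i % 2;
            readyToSend[slot].WaitOne();        // wait until the batch is complete
            received.Add(buffers[slot].ToArray());
            readyToWrite[slot].Set();           // mark "available for write"
        }

        producer.Join();
        return received;
    }
}
```

Because each buffer is touched by only one thread between a pair of signals, the MemoryStream objects themselves need no additional locking in this sketch.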

MemoryStream memStream = null;

public override int Read(byte[] buffer, int offset, int count)
{
    if (memStream != null)
    {
        // Keep draining the current buffer until it is exhausted.
        if (memStream.Position != memStream.Length)
        {
            return memStream.Read(buffer, offset, count);
        }

        // The current buffer is fully sent: hand it back to the database thread.
        switch (memStreamReadStatus)
        {
            case 0:
                throw new InvalidOperationException(
                    "A buffer is selected but the read status is 0.");
            case 1:
                Console.Write("READ : done sending from 1\r\n");
                readyToWriteToMemStream1.Set();
                break;
            case 2:
                Console.Write("READ : done sending from 2\r\n");
                readyToWriteToMemStream2.Set();
                break;
        }
    }

    // Wait for the next buffer to be filled.
    switch (memStreamReadStatus)
    {
        case 0: // nothing read yet: start with buffer 1
        case 2: // just finished buffer 2: go back to buffer 1
            Console.Write("READ : waiting for 1\r\n");
            while (!readyToSendFromMemStream1.WaitOne(1000))
            {
                if (!bCanRead)
                {
                    // No more rows will arrive; 0 signals end of stream.
                    return 0;
                }
            }
            Console.Write("READ : done waiting for 1\r\n");
            memStream = memStream1;
            memStreamReadStatus = 1;
            break;
        case 1: // just finished buffer 1: continue with buffer 2
            Console.Write("READ : waiting for 2\r\n");
            while (!readyToSendFromMemStream2.WaitOne(1000))
            {
                if (!bCanRead)
                {
                    // No more rows will arrive; 0 signals end of stream.
                    return 0;
                }
            }
            Console.Write("READ : done waiting for 2\r\n");
            memStream = memStream2;
            memStreamReadStatus = 2;
            break;
    }

    return memStream.Read(buffer, offset, count);
}

  

Where is the end?

Remember the client code: the client reads from the stream object until the CanRead property returns false. On the server side, we set this value in the property below:

public override bool CanRead
{
    get { return bCanRead; }
}
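A receiver generally learns that a stream has ended when Read() returns 0, and the stream object the client gets from WCF is not the same instance as the server's DBRowStream. One way to make the end explicit on the client is to frame each record with a length prefix. The sketch below is an assumption-laden alternative, not what the sample does: WriteFrame and ReadFrame are hypothetical helpers, and the payload would be the serialized row.

```csharp
using System;
using System.IO;

static class FramingSketch
{
    // Writes one record as a 4-byte length prefix followed by the payload.
    public static void WriteFrame(Stream s, byte[] payload)
    {
        byte[] len = BitConverter.GetBytes(payload.Length);
        s.Write(len, 0, len.Length);
        s.Write(payload, 0, payload.Length);
    }

    // Returns the next payload, or null when the stream ended cleanly.
    public static byte[] ReadFrame(Stream s)
    {
        byte[] len = new byte[4];
        int got = ReadFully(s, len);
        if (got == 0)
        {
            return null; // Read returned 0 at a frame boundary: end of stream
        }
        if (got < len.Length)
        {
            throw new EndOfStreamException("Truncated length prefix.");
        }

        byte[] payload = new byte[BitConverter.ToInt32(len, 0)];
        if (ReadFully(s, payload) < payload.Length)
        {
            throw new EndOfStreamException("Truncated payload.");
        }
        return payload;
    }

    // Stream.Read may return fewer bytes than requested, so loop until full or end.
    static int ReadFully(Stream s, byte[] buf)
    {
        int total = 0;
        while (total < buf.Length)
        {
            int n = s.Read(buf, total, buf.Length - total);
            if (n == 0)
            {
                break;
            }
            total += n;
        }
        return total;
    }
}
```

With this framing, the client loop becomes "read frames until ReadFrame returns null", and a record cut off in the middle is reported as an error instead of being mistaken for the end of the data.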

  

 


Comments (9)

  1. Really interesting article. Out of curiousity, each time I run it I get an error on the last element or two stating that the end of the stream was encountered before parsing was complete. I thought I possibly just needed to resposition the position of the stream to the first element, but I'm still receiving the error. Any ideas?

    Great article!

    Ryan

  2. Sorry for the delay.

    Thank you for downloading the sample. I just fixed the bug and uploaded it to the code.msdn.microsoft.com site. Due this bug, it was not pushing the last set of records to the client.

  3. No problem at all on the delay. I actually found that same issue of missing the last record when writing to the stream and fixed it as well. I've noticed that you're code and mine still ends up with an exception after the last row. It effectively never recognizes the change to the stream.CanRead property, tries to read another set of bytes, comes up with none, and my serializer throws an exception. For a test I simply set canread to false about half way through my dataset, and the client never sees the property changing. Any idea's why the client would never recognize the property CanRead as being false?

    By the way, I implemented this with a wpf based application we have using a dispatcher and background worker to keep the form responsive and it works really really well.

  4. Deserialization Issue says:

    I nearly have everything working, but I run in to this odd deserialization error everytime i run it, and I'm wondering if you've seen it. I basically make it say 100 rows through a 1000 row stream and it throws this consistently:

    The input stream is not a valid binary format. The starting contents (in bytes) are: 65-6E-74-69-76-65-20-4D-61-69-6E-74-65-6E-61-6E-63 …"} System.Exception {System.Runtime.Serialization.SerializationException}

    Do I need to enable a special encoding before I write and read from the stream or something similar to that? Any help would be appreciated,

    Ryan

  5. Issue narrowed down says:

    So I've narrowed down the issue, but unsure why it's happening. I slightly altered your last project to contain a set of basic string values. If you include the strings on line 179 of dbrowstream it fails after 30 rows, if you exclude them it works beautifully. You can have a look at the code I altered here:

    skydrive.live.com/redir.aspx

    Any help would be greatly appreciated.

  6. it looks like something to do with the string length.

    If I replace line #179 with this below code it works.

    model.AMP =  new string('a', 40 );  

    And if replace the line with this below code also it works

    model.AMP = stringCache[i - 1];

                       if (model.AMP.Length != 40)

                       {

                           model.AMP += new string(' ', 40 - model.AMP.Length);

                       }

    Let me some more research and get back to you

  7. Memory Stream Thread Issues says:

    It's the memory stream. I implemented a basic thread safe memory stream found on code project, here:

    http://www.codeproject.com/…/PipeStream-a-Memory-Efficient-and-Thread-Safe-Stre

    and the issues went away. I'm going to do some additional research into thread safe memory streams because this implementation is much slower than a straight memory stream, but that seems to be the issue. If you have any suggestions on other stream types that might do the trick let me know.

    Regards,

    Ryan

  8. Host gets out of memory with a huge set of data records says:

    Ran this utility and with the existing code in it, the host service could process the records seamlessly if they are in limited to 10-20 thousand. Tried for a 0.1 million record scenario and the service ran out of memory.

    Any suggestions for handling this scenario?

    Regards,

    Sanjay

  9. SanjayKabra says:

    Really helpful article. We tried the given utility and found an issue with processing of huge number of records in our case around half a million records. For few thousands this utility works fine. Any suggestions for handling huge records?

    Regards,

    Sanjay
