Custom WCF Streaming

The complete source code is available at code.msdn.microsoft.com.

Many distributed business applications work with a huge number of database rows and transfer large record sets to multiple processes running on different machines. These large datasets are most likely generated by complex, long-running database queries. To improve data transfer performance for this type of business application, we can use custom WCF streaming.

Here is a sample that shows how to implement custom WCF streaming. The sample code streams database records to the client.

The basic idea is to use two threads: one thread executes the complex database query, and the other streams database rows to the client. We alter the database query so that it returns only 1000 rows at a time, and modify the WCF service to stream those 1000 rows to the client. While the WCF service is streaming rows to the client, it runs the database query again on a different thread to get the next 1000 rows. This way, as soon as the WCF service finishes streaming one set of rows, the next set is ready to be streamed to the client.
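The two-thread idea can be sketched independently of WCF and SQL. The following is a minimal illustration, not the sample's actual code: `FetchBatch` is a hypothetical stand-in for the slow database query, and a bounded `BlockingCollection` is used in place of the article's two memory streams so that the next batch is fetched while the current one is being consumed.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PrefetchPipelineSketch
{
    // Hypothetical stand-in for the slow database query: returns one batch of row IDs.
    static int[] FetchBatch(int batchIndex, int batchSize)
    {
        var rows = new int[batchSize];
        for (int i = 0; i < batchSize; i++)
            rows[i] = batchIndex * batchSize + i;
        return rows;
    }

    // Fetches batches on a background task while the caller consumes them.
    public static List<int> Run(int batchCount, int batchSize)
    {
        var received = new List<int>();
        // Capacity 1: one finished batch may wait in the queue while the next is fetched.
        using (var queue = new BlockingCollection<int[]>(boundedCapacity: 1))
        {
            Task producer = Task.Run(() =>
            {
                try
                {
                    for (int b = 0; b < batchCount; b++)
                        queue.Add(FetchBatch(b, batchSize)); // blocks while the queue is full
                }
                finally
                {
                    queue.CompleteAdding(); // tells the consumer that no more batches follow
                }
            });

            // "Streaming" side: the real service would send these rows to the client.
            foreach (int[] batch in queue.GetConsumingEnumerable())
                received.AddRange(batch);

            producer.Wait();
        }
        return received;
    }

    public static void Main()
    {
        List<int> rows = Run(batchCount: 3, batchSize: 4);
        Console.WriteLine(rows.Count); // 12
    }
}
```

The bounded capacity is what keeps memory use flat: the producer cannot run more than one batch ahead of the consumer.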

 

[Figures: Regular WCF Architecture (WCFStreaming_2a); Custom WCF Streaming Architecture (WCFStreaming_1a)]

 

  1. WCF Client calling WCF service
  2. WCF Service executing database query
  3. Database returns dataset to WCF service
  4. WCF Service response
  5. Second database query executed by WCF service
  6. WCF Stream response

Let's start with the WCF service contract:

 [ServiceContract]
 public interface IStockMarket
 {
     [OperationContract]
     Stream GetDBRowStream();
 }
  

Here we have one WCF operation, which returns a Stream object.

Next, let's review the service contract implementation:

 public class StockMarket : IStockMarket
{
    public Stream GetDBRowStream()
    {
        return new DBRowStream();
    }
}
  

The WCF service creates a new custom Stream object and returns it to the client.
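For a Stream return value to be sent incrementally rather than buffered in full, the binding has to use a streamed transfer mode. The article does not show its hosting code, so the following is only a sketch under assumptions: it targets .NET Framework, self-hosts over `BasicHttpBinding`, and uses a made-up address and size limit. The service body is a placeholder for the article's `DBRowStream`.

```csharp
using System;
using System.IO;
using System.ServiceModel;

[ServiceContract]
public interface IStockMarket
{
    [OperationContract]
    Stream GetDBRowStream();
}

public class StockMarket : IStockMarket
{
    // Placeholder body; the article's service returns new DBRowStream() here.
    public Stream GetDBRowStream()
    {
        return new MemoryStream(new byte[] { 1, 2, 3 });
    }
}

public static class StreamingHostSketch
{
    public static BasicHttpBinding CreateBinding()
    {
        return new BasicHttpBinding
        {
            // Only the response (service to client) is streamed.
            TransferMode = TransferMode.StreamedResponse,
            // Assumed limit; it matters on the receiving side, where the
            // default of 64 KB is too small for a large result set.
            MaxReceivedMessageSize = 1L << 30,
        };
    }

    public static void Main()
    {
        // Hypothetical address; any free port works.
        var address = new Uri("http://localhost:8080/StockMarket");
        using (var host = new ServiceHost(typeof(StockMarket), address))
        {
            host.AddServiceEndpoint(typeof(IStockMarket), CreateBinding(), "");
            host.Open();
            Console.WriteLine("Service running. Press Enter to stop.");
            Console.ReadLine();
        }
    }
}
```

The client would need a binding with the same transfer mode and a matching message size limit.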

Next, let's review the client code:

 try
{
    Stream s = proxy.GetDBRowStream();

    IFormatter formatter = new BinaryFormatter();

    OrderModel m;
    while (s.CanRead)
    {
        m = formatter.Deserialize(s) as OrderModel;
        Console.Write(string.Format("order ID is {0}\r\n", m.ID));
    }
}
catch (System.Exception ex)
{
    Console.WriteLine(ex.Message);
}
  

The client calls GetDBRowStream() on the WCF proxy object and keeps reading from the stream object for as long as its CanRead property returns true.

Here we read the content from the stream object. Using the BinaryFormatter, we deserialize it into a usable .NET object of type OrderModel.

Let's review the OrderModel class:

 [Serializable]
public class OrderModel
{
    public int ID;
    public int ParameterOne;
    public int ParameterTwo;
    public int ParameterThree;
    public int Results;
}
  

OrderModel is nothing but a database row, so the client code reads one database row at a time.
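To illustrate the row-at-a-time pattern, here is a small sketch that serializes several OrderModel objects back to back and reads them again one by one. It assumes .NET Framework (BinaryFormatter is obsolete in current .NET versions) and uses a MemoryStream as a stand-in for the network stream, which is why it can rely on Position and Length; a real network stream does not offer those.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

[Serializable]
public class OrderModel
{
    public int ID;
    public int ParameterOne;
    public int ParameterTwo;
    public int ParameterThree;
    public int Results;
}

public static class RowRoundTripSketch
{
    // Serializes 'rowCount' rows back to back, then reads them again one at a time.
    // Returns the sum of the IDs that were read back.
    public static int RoundTrip(int rowCount)
    {
        IFormatter formatter = new BinaryFormatter();
        using (var stream = new MemoryStream())
        {
            for (int i = 1; i <= rowCount; i++)
                formatter.Serialize(stream, new OrderModel { ID = i, ParameterOne = i * 10 });

            stream.Position = 0; // rewind before reading
            int idSum = 0;
            while (stream.Position < stream.Length)
            {
                // Each call consumes exactly one serialized row.
                var row = (OrderModel)formatter.Deserialize(stream);
                idSum += row.ID;
            }
            return idSum;
        }
    }

    public static void Main()
    {
        Console.WriteLine(RoundTrip(3)); // 1 + 2 + 3 = 6
    }
}
```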

Now let's look at the DBRowStream source code.

 public class DBRowStream : Stream
{
    bool bCanRead = true;

    MemoryStream memStream1 = new MemoryStream();
    MemoryStream memStream2 = new MemoryStream();

    IFormatter formatter = new BinaryFormatter();

    …..
    …..

  

Note: This code is a little complex because it returns the stream object to the client while, at the same time, executing the SQL query to get the next data set.

This class has two memory stream objects, so that it can write to one memory stream while reading data from the database and read from the other memory stream while streaming data to the WCF client.

In the image below, “a” and “b” are the two memory stream objects. At the beginning, we write to memory stream “a” while reading data from the database. Next, at step 4, we read data from memory stream “a” and write it to the WCF client. At the same time, on another thread (step 5), we make another database query to get the next set and write it to memory stream “b”.

 

[Figure: WCFStreaming_memstream]

Now let's look at how this class fills the two memory stream objects: after writing the first batch of database rows to memory stream “a” (the code below switches buffers once count exceeds 10), it writes the next batch to memory stream “b”. While writing to “b”, it reads data from “a” and sends it to the client.
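The alternating fill-and-send handshake can be tried out in isolation. The sketch below is a simplified model, not the article's class: it assumes auto-reset events, uses lists of integers in place of memory streams holding serialized rows, and marks the end of the data with an empty batch, which the article's code does not do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class DoubleBufferSketch
{
    public static List<int> Run(int totalRows, int batchSize)
    {
        var buffers = new[] { new List<int>(), new List<int>() };
        // A buffer starts out writable; it becomes sendable once the writer has filled it.
        var readyToWrite = new[] { new AutoResetEvent(true), new AutoResetEvent(true) };
        var readyToSend = new[] { new AutoResetEvent(false), new AutoResetEvent(false) };

        var writer = new Thread(() =>
        {
            int next = 0, slot = 0;
            while (true)
            {
                readyToWrite[slot].WaitOne();         // wait until the reader released this buffer
                buffers[slot].Clear();
                for (int i = 0; i < batchSize && next < totalRows; i++)
                    buffers[slot].Add(next++);        // stand-in for one serialized database row
                bool last = buffers[slot].Count == 0; // an empty batch marks the end
                readyToSend[slot].Set();              // hand the buffer to the reader
                if (last) break;
                slot = 1 - slot;                      // fill the other buffer next
            }
        });
        writer.Start();

        var received = new List<int>();
        int readSlot = 0;
        while (true)
        {
            readyToSend[readSlot].WaitOne();          // wait for a filled buffer
            if (buffers[readSlot].Count == 0) break;  // end marker reached
            received.AddRange(buffers[readSlot]);     // stand-in for streaming to the client
            readyToWrite[readSlot].Set();             // release the buffer for the next fill
            readSlot = 1 - readSlot;
        }
        writer.Join();
        return received;
    }

    public static void Main()
    {
        List<int> rows = Run(totalRows: 25, batchSize: 10);
        Console.WriteLine(rows.Count); // 25
    }
}
```

Because both sides visit the buffers in the same fixed order, each buffer is only ever touched by one thread at a time.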

The BinaryFormatter is used to serialize each database row.

 void DBThreadProc(object o)
{
    SqlConnection con = null;
    SqlCommand com = null;
    try
    {
        con = new System.Data.SqlClient.SqlConnection(
            "Server=.\\SQLEXPRESS;Initial Catalog=OrderDB;Integrated Security=SSPI");

        com = new SqlCommand();
        com.Connection = con;
        com.CommandText = "Select top 1000 * from Orders ";

        con.Open();
        SqlDataReader sdr = com.ExecuteReader();

        int count = 0;

        MemoryStream memStream = memStream1;
        memStreamWriteStatus = 1;
        readyToWriteToMemStream1.WaitOne();
        while (sdr.Read())
        {
            OrderModel model = new OrderModel();

            model.ID = sdr.GetInt32(0);
            model.ParameterOne = sdr.GetInt32(1);
            model.ParameterTwo = sdr.GetInt32(2);
            model.ParameterThree = sdr.GetInt32(3);

            formatter.Serialize(memStream, model);
            
            count++;

            if (count > 10)
            {
                switch (memStreamWriteStatus)
                {
                    case 1: // done writing to 1
                        {
                            memStream1.Position = 0;
                            readyToSendFromMemStream1.Set();
                            Console.Write("write 1 is done...waiting for 2\r\n");
                            readyToWriteToMemStream2.WaitOne();
                            Console.Write("done waiting for 2\r\n");
                            memStream = memStream2;
                            memStream.Position = 0;
                            memStreamWriteStatus = 2;
                            
                            break;
                        }
                    case 2: // done writing to 2
                        {
                            memStream2.Position = 0;                                    
                            readyToSendFromMemStream2.Set();
                            Console.Write("write 2 is done...waiting for 1\r\n");
                            readyToWriteToMemStream1.WaitOne();
                            Console.Write("done waiting for 1\r\n");
                            memStream = memStream1;
                            memStreamWriteStatus = 1;
                            memStream.Position = 0;
                            
                            break;
                        }
                }
                
                count = 0;
            }

            
        }
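        // Hand the last, partially filled buffer to the reader before signalling the end.
        if (count > 0)
        {
            memStream.SetLength(memStream.Position); // drop stale bytes left from an earlier batch
            memStream.Position = 0;
            if (memStreamWriteStatus == 1)
                readyToSendFromMemStream1.Set();
            else
                readyToSendFromMemStream2.Set();
        }
        bCanRead = false;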
    }
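    catch (System.Exception excep)
    {
        // Report the failure instead of swallowing it, and let the reader terminate.
        Console.WriteLine(excep.Message);
        bCanRead = false;
    }
    finally
    {
        // Release the database connection (this also closes the data reader).
        if (con != null) con.Dispose();
    }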
}

  

Now let's see how this class streams database rows to the WCF client.

This class overrides the Read() method, which the WCF framework calls to stream data to the client.

When Read() is called, it first checks whether any data is left in the current memory stream object. If so, it reads from that memory stream, streams the data to the client, and returns. When Read() is called again and the current memory stream is exhausted, it checks which memory stream object, “a” or “b”, is available for reading, then reads data from it and sends it to the client.

After a memory stream object has been read completely, it is marked as “available for write”, and the database function above can write to it again. Once writing is complete, the memory stream object is marked as “available for read”, and the Read() function below reads from it.

 MemoryStream memStream = null;
 public override int Read(byte[] buffer, int offset, int count)
 {
     if (memStream != null)
     {
         if (memStream.Position != memStream.Length)
         {
             return memStream.Read(buffer, offset, count);
         }
         else
         {
             switch (memStreamReadStatus)
             {
                 case 0:
                     {
                         throw new Exception();
                     }
                 case 1:
                     {
                         Console.Write("READ : done sending from 1\r\n");
                         readyToWriteToMemStream1.Set();
                         break;
                     }
                 case 2:
                     {
                         Console.Write("READ : done sending from 2\r\n");
                         readyToWriteToMemStream2.Set();
                         break;
                     }

             }
         }
     }

     switch (memStreamReadStatus)
     {
         case 0:
             {
                 Console.Write("READ : waiting for 1\r\n");

                 while (!readyToSendFromMemStream1.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 1\r\n");
                 memStream = memStream1;
                 memStreamReadStatus = 1;
                 break;
             }
         case 1:
             {
                 Console.Write("READ : waiting for 2\r\n");
                 //readyToSendFromMemStream2.WaitOne();
                 while (!readyToSendFromMemStream2.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 2\r\n");
                 memStream = memStream2;
                 memStreamReadStatus = 2;
                 break;
             }
         case 2:
             {
                 Console.Write("READ : waiting for 1\r\n");
                 //readyToSendFromMemStream1.WaitOne();
                 while (!readyToSendFromMemStream1.WaitOne(1000))
                 {
                     if (!bCanRead)
                     {
                         buffer[offset] = 0;
                         return 0;
                     }
                 }
                 Console.Write("READ : done waiting for 1\r\n");
                 memStream = memStream1;
                 memStreamReadStatus = 1;
                 break;
             }
             
     }
     return memStream.Read(buffer, offset, count);
 }

  

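Where is the end?

Remember the client code: the client reads from the stream object until its CanRead property returns false. On the server side, this value is exposed through the property below. Note that the client works with its own stream object rather than the server's DBRowStream instance, so what the client actually observes at the end of the data is Read() returning 0 once bCanRead has been set to false on the server.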

 public override bool CanRead
{
    get { return bCanRead; }
}
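Because the client cannot rely on CanRead changing, one possible way to find the end of the data is to frame each record with its length and stop when Read() returns 0 at a record boundary. This is a sketch of that alternative technique, not part of the original sample: the helper names are hypothetical, and the payload would be the serialized bytes of one row. A MemoryStream stands in for the WCF stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class FramedStreamSketch
{
    // Writes one record as a 4-byte length followed by the payload bytes.
    public static void WriteFrame(Stream output, byte[] payload)
    {
        byte[] length = BitConverter.GetBytes(payload.Length);
        output.Write(length, 0, length.Length);
        output.Write(payload, 0, payload.Length);
    }

    // Reads exactly buffer.Length bytes. Returns false on a clean end of stream
    // (no bytes read at all) and throws if the stream ends partway through.
    static bool TryFill(Stream input, byte[] buffer)
    {
        int total = 0;
        while (total < buffer.Length)
        {
            int n = input.Read(buffer, total, buffer.Length - total);
            if (n == 0)
            {
                if (total == 0) return false;
                throw new EndOfStreamException("Stream ended inside a record.");
            }
            total += n;
        }
        return true;
    }

    // Returns every record until Read reports the end of the stream by returning 0.
    public static List<byte[]> ReadAllFrames(Stream input)
    {
        var frames = new List<byte[]>();
        var header = new byte[4];
        while (TryFill(input, header))
        {
            var payload = new byte[BitConverter.ToInt32(header, 0)];
            if (payload.Length > 0 && !TryFill(input, payload))
                throw new EndOfStreamException("Stream ended after a record header.");
            frames.Add(payload);
        }
        return frames;
    }

    public static void Main()
    {
        var stream = new MemoryStream();
        WriteFrame(stream, Encoding.UTF8.GetBytes("row 1"));
        WriteFrame(stream, Encoding.UTF8.GetBytes("row 2"));
        stream.Position = 0;
        Console.WriteLine(ReadAllFrames(stream).Count); // 2
    }
}
```

Reading in a loop until the buffer is full also matters on a network stream, where a single Read() call may return fewer bytes than requested.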

  

 

Complete Source Code

Complete Source code is available at code.msdn.microsoft.com

Comments

  • Anonymous
    October 16, 2012
    Really interesting article. Out of curiosity, each time I run it I get an error on the last element or two stating that the end of the stream was encountered before parsing was complete. I thought I possibly just needed to reposition the stream to the first element, but I'm still receiving the error. Any ideas? Great article! Ryan

  • Anonymous
    October 24, 2012
    Sorry for the delay. Thank you for downloading the sample. I just fixed the bug and uploaded it to the code.msdn.microsoft.com site. Due to this bug, it was not pushing the last set of records to the client.

  • Anonymous
    October 30, 2012
    No problem at all on the delay. I actually found that same issue of missing the last record when writing to the stream and fixed it as well. I've noticed that your code and mine still end up with an exception after the last row. It effectively never recognizes the change to the stream.CanRead property, tries to read another set of bytes, comes up with none, and my serializer throws an exception. For a test I simply set CanRead to false about halfway through my dataset, and the client never sees the property changing. Any ideas why the client would never recognize the CanRead property as being false? By the way, I implemented this with a WPF-based application we have, using a dispatcher and background worker to keep the form responsive, and it works really, really well.

  • Anonymous
    November 09, 2012
    I nearly have everything working, but I run in to this odd deserialization error everytime i run it, and I'm wondering if you've seen it. I basically make it say 100 rows through a 1000 row stream and it throws this consistently: The input stream is not a valid binary format. The starting contents (in bytes) are: 65-6E-74-69-76-65-20-4D-61-69-6E-74-65-6E-61-6E-63 ..."} System.Exception {System.Runtime.Serialization.SerializationException} Do I need to enable a special encoding before I write and read from the stream or something similar to that? Any help would be appreciated, Ryan

  • Anonymous
    November 13, 2012
    It's the memory stream. I implemented a basic thread safe memory stream found on code project, here: www.codeproject.com/.../PipeStream-a-Memory-Efficient-and-Thread-Safe-Stre and the issues went away. I'm going to do some additional research into thread safe memory streams because this implementation is much slower than a straight memory stream, but that seems to be the issue. If you have any suggestions on other stream types that might do the trick let me know. Regards, Ryan

  • Anonymous
    February 04, 2013
    Ran this utility, and with the existing code in it the host service could process the records seamlessly if they are limited to 10-20 thousand. Tried a 0.1 million record scenario and the service ran out of memory. Any suggestions for handling this scenario? Regards, Sanjay

  • Anonymous
    February 04, 2013
    Really helpful article. We tried the given utility and found an issue with processing of huge number of records in our case around half a million records. For few thousands this utility works fine. Any suggestions for handling huge records? Regards, Sanjay

  • Anonymous
    January 06, 2014
    This is a great example but the code in the download and the code in the blog (the two are different) does not complete properly.  The CanRead property NEVER changes on the client regardless of when you set it to false on the host, which makes terminating the stream impossible.  Has anyone been able to get this to work 100% properly?

  • Anonymous
    March 04, 2014
    Thank you for your good article. Download and test your code, I tried. repeated a number of stream 10030, 8000 have been changed, such as when an error occurs. Errors occur after you import the data from the client to the last. What should I do? Error 1 : The input stream is not a valid binary format. The starting contents (in bytes) are Error 2 : End of Stream encountered before parsing was completed.

  • Anonymous
    January 29, 2015
    Hello, were you ever able to resolve the issue with this message: The input stream is not a valid binary format. The starting contents (in bytes) are: 65-6E-74-69-76-65-20-4D-61-69-6E-74-65-6E-61-6E-63 ..."} System.Exception {System.Runtime.Serialization.SerializationException}

  • Anonymous
    October 25, 2017
    So I have noticed with this code that it can only return 3 sets before it starts returning garbage data. You can test this by changing 'if (count > 10)' to 'if (count > 2)'. You will get 6 records back and the rest will be garbage. Not sure what I can check to try and fix this.