Using ADO.NET/WCF Data Services for streaming infinite event result sets

ADO.NET/WCF Data Services (DS for short) provides a fantastic mechanism for easily setting up a REST data service.

In my application I have a data source which is a continuous stream of events.  I have multiple consumers of this event stream, and each consumer wants to see all or part of it depending on their needs.
In addition I wanted to stream different types of objects and have it be extensible.  As I was exploring the design space it became really interesting to consider using DS for this.

The advantages of DS are that:

  • It allows for server side complex filtering
  • It allows for projections (the client can select the properties they want)
  • It publishes the schema without having to distribute libraries
  • It doesn’t require a backing database (it can work against Plain Ol’ CLR Objects)
  • It integrates with Visual Studio and other tools such as LINQPad

That said, DS is really designed around the idea of finite result sets.  The big question was: “Could I get DS to deal with infinite results?”  I am glad to report that with some subtle changes I was able to make this work, and that it works well.

This blog post shows how I tweaked DS so that it works with infinite result sets.  The result is a robust event monitoring system with rich query support that is very easy to implement.

Implementing the Data model
Just like any DS, you need to define the data model by providing a class whose properties are of type IQueryable<>.  For the purposes of this article I will create an arbitrary example class.

Sample Schema

 

Code Snippet

// DataServiceKeyAttribute lives in System.Data.Services.Common
[DataServiceKeyAttribute("ID")]
public class Quote
{
    public long ID { get; set; }
    public string Symbol { get; set; }
    public double Price { get; set; }
    public float Delta { get; set; }
    // navigation property to the related Company entity
    public Company Company { get; set; }
}

[DataServiceKeyAttribute("Symbol")]
public class Company
{
    public string Symbol { get; set; }
    public string Name { get; set; }
    public string[] Keywords { get; set; }
    public string Country { get; set; }
}

 

IEnumerable<> with Yield == infinite result set
To make this model visible I need a class which exposes my result set.  What I’m doing is exposing a data service around an IEnumerable<> implementation, from which I ask for an IQueryable<>.  The IEnumerable/yield combination will generate objects on demand.

In this example I am assuming there is a background mechanism which keeps the queue filled with items from the quote source.  As you can see this enumerable is an infinite result set that will continue to return results as long as the client is connected and asking for them.

Code Snippet

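// named QuoteStream to match the StreamingDataService<QuoteStream> declaration further down
public class QuoteStream
{
    // filled by a background mechanism (not shown) from the quote source
    private Queue<Quote> _quotes = new Queue<Quote>();

    private IEnumerable<Quote> _getQuotes()
    {
        // keep producing quotes for as long as the client stays connected
        while (HttpContext.Current.Response.IsClientConnected)
        {
            Quote quote = null;
            lock (_quotes)
            {
                if (_quotes.Count > 0)
                    quote = _quotes.Dequeue();
            }
            if (quote != null)
                yield return quote;
            else
                Thread.Sleep(100); // nothing queued yet, wait a bit and retry
        }
    }

    public IQueryable<Quote> Quotes
    {
        get
        {
            return _getQuotes().AsQueryable();
        }
    }

    public IQueryable<Company> Companies { get { return new List<Company>().AsQueryable(); } }
}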

To create a top level entity set for it I simply take the IEnumerable<> returned by the iterator and call the LINQ extension method .AsQueryable() on it.
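
To see why an endless iterator is safe to wrap this way, here is a minimal sketch that is not part of the service itself. LazyStreamDemo, Ticks and FirstTicks are names made up for illustration, and an integer counter stands in for the quote queue. It only shows that an iterator behind AsQueryable() is pulled on demand, so a bounded query against it completes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: an endless iterator standing in for the quote stream.
public static class LazyStreamDemo
{
    // Counts how many items the iterator has been asked to produce.
    public static int Produced;

    // An iterator with no natural end; it only runs when a consumer pulls.
    public static IEnumerable<int> Ticks()
    {
        int i = 0;
        while (true)
        {
            Produced++;
            yield return i++;
        }
    }

    // Wraps the iterator in an IQueryable and pulls a bounded number of items.
    public static List<int> FirstTicks(int count)
    {
        Produced = 0;
        return Ticks().AsQueryable().Take(count).ToList();
    }
}
```

Calling LazyStreamDemo.FirstTicks(3) returns the items 0, 1 and 2, and Produced stays small because the iterator is never asked to run ahead of the consumer.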

Complex types
Since I want to return complex types as part of the results they need to be surfaced as real entities.  To do this requires 2 things:

  1. That it has a DataServiceKeyAttribute() attribute on the class
  2. That there is a root level IQueryable<> entity set.

In this example I am returning these objects on the fly without a backing database, so to support the Company object I will simply add an IQueryable<Company> which returns an empty result set.

Code Snippet

public IQueryable<Company> Companies { get { return new List<Company>().AsQueryable(); } }

Supporting Take() by breaking OrderBy

Here is one of the big subtleties I ran into.  If I have an infinite result set like this, a logical thing to want to do is ask for a subset of it by using Take(X).  It turns out that DS assumes that a Take() operation only makes sense against an ordered set, so if there is no order by clause in the expression, it will apply one automatically when $top is passed.

Since my IQueryable is backed by POCOs, it attempts to do an in-memory sort on the IEnumerable, which means it tries to get all of the objects from my infinite result set.  Of course, since the end never occurs, it will never return!
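
The cost of that implicit sort can be measured on a finite source. The sketch below is only an illustration under my own assumptions: OrderByCostDemo, Counted and ItemsPulled are hypothetical names, and a counted range of integers replaces the quote stream. It reports how many source items LINQ to Objects consumes to answer a Take(), with and without an OrderBy in front of it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Illustrative only: measures how much of the source a query consumes.
public static class OrderByCostDemo
{
    // Yields 0..size-1 and reports each pull through the callback.
    private static IEnumerable<int> Counted(int size, Action onPull)
    {
        for (int i = 0; i < size; i++)
        {
            onPull();
            yield return i;
        }
    }

    // Returns how many source items were consumed to answer Take(take),
    // with or without an OrderBy placed in front of it.
    public static int ItemsPulled(int size, int take, bool ordered)
    {
        int pulled = 0;
        IQueryable<int> query = Counted(size, () => pulled++).AsQueryable();
        if (ordered)
            query = query.OrderBy(x => x);
        query.Take(take).ToList();
        return pulled;
    }
}
```

With a source of 1000 items, ItemsPulled(1000, 3, true) consumes all 1000 of them because sorting has to see the whole input, while the unordered call stops after a few. On a source that never ends, the ordered variant has no point at which it can stop.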

To fix this I created a wrapper around the IQueryable which ignores attempts to apply order to the result set.  When the expression is being built up, at some point an OrderBy expression is added to the expression tree.  This is given to the source to evaluate, so my StreamingQueryable<> generic class simply ignores it.

Here is the class:

Code Snippet

/// <summary>
/// StreamingQueryable
/// This is an IQueryable wrapper which drops OrderBy, OrderByDescending, ThenBy, ThenByDescending commands,
/// which are impossible with streaming infinite data sets. This enables Take() to work correctly, since
/// DS applies an implicit OrderBy when $top is used, and that sort would block forever on an endless stream
/// </summary>
/// <typeparam name="EntityT"></typeparam>
public class StreamingQueryable<EntityT> : IQueryable<EntityT>, IEnumerable<EntityT>,
                        IQueryProvider, IOrderedQueryable<EntityT>, IOrderedQueryable
{
    private IQueryable<EntityT> _queryable;
    private IEnumerator<EntityT> _enumerator;

    public StreamingQueryable(IQueryable<EntityT> queryable)
    {
        _queryable = queryable;
    }

    #region IQueryProvider Members
    // TElement is used for the method's type parameter so that it does not
    // shadow the class-level EntityT
    public IQueryable<TElement> CreateQuery<TElement>(Expression expression)
    {
        if (expression.NodeType == ExpressionType.Call)
        {
            MethodCallExpression methodExpression = (MethodCallExpression)expression;
            if ((methodExpression.Method.Name == "OrderBy") ||
                (methodExpression.Method.Name == "ThenBy") ||
                (methodExpression.Method.Name == "OrderByDescending") ||
                (methodExpression.Method.Name == "ThenByDescending"))
            {
                // drop the ordering request and hand back the unordered query
                return (IQueryable<TElement>)this;
            }
        }
        return new StreamingQueryable<TElement>(_queryable.Provider.CreateQuery<TElement>(expression));
    }

    public IQueryable CreateQuery(Expression expression)
    {
        if (expression.NodeType == ExpressionType.Call)
        {
            MethodCallExpression methodExpression = (MethodCallExpression)expression;
            if ((methodExpression.Method.Name == "OrderBy") ||
                (methodExpression.Method.Name == "ThenBy") ||
                (methodExpression.Method.Name == "OrderByDescending") ||
                (methodExpression.Method.Name == "ThenByDescending"))
            {
                return (IQueryable<EntityT>)this;
            }
        }
        return new StreamingQueryable<EntityT>(_queryable.Provider.CreateQuery<EntityT>(expression));
    }

    public TResult Execute<TResult>(Expression expression)
    {
        return _queryable.Provider.Execute<TResult>(expression);
    }

    public object Execute(Expression expression)
    {
        return _queryable.Provider.Execute(expression);
    }
    #endregion

    #region IEnumerable<EntityT> Members
    public IEnumerator<EntityT> GetEnumerator()
    {
        if (_enumerator == null)
            _enumerator = _queryable.GetEnumerator();
        return _enumerator;
    }

    #endregion

    #region IEnumerable Members
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        if (_enumerator == null)
            _enumerator = _queryable.GetEnumerator();
        return _enumerator;
    }
    #endregion

    #region IQueryable Members
    public Type ElementType
    {
        get { return typeof(EntityT); }
    }

    public System.Linq.Expressions.Expression Expression
    {
        get
        {
            return _queryable.Expression;
        }
    }

    public IQueryProvider Provider
    {
        get { return this; }
    }

    #endregion

    #region IDisposable Members
    public void Dispose()
    {
    }
    #endregion
}

To use it I now create an instance around the IQueryable I return in my model.  This changes my Quotes property from above to look like this:

 

Code Snippet

public IQueryable<Quote> Quotes
{
    get
    {
        return new StreamingQueryable<Quote>(_getQuotes().AsQueryable());
    }
}

 

This allows queries with Take() clauses to return instead of waiting forever.

Making the Web Service streaming

OK, so now that the data model is defined we need to do some extra magic to make the data service stream an infinite result set.  The problem is that DS out of the box is hosted on WCF, which assumes that the payload is a closed set that it can hand over as a single message to push onto the transport.

What I wanted on the other hand was a continuous HTTP stream that never ended.  If I tried to run the above service in a normal WCF service it would never return (again). 

There were 2 solutions I could see…

  1. figure out how to set all of the right magic knobs to get WCF to stop doing something that is pretty innate to WCF
  2. simply bypass it entirely by writing an IHttpHandler. 

Since the latter was both simpler (no WCF stack involved) and only about 20 lines of code, I opted for that approach.

HttpHost
It turns out that DS uses an interface IDataServiceHost to be hosted within WCF.  If I simply implement that contract I can take over the details of how to communicate with the client that is requesting data.  I created a class called HttpHost which maps between an HttpContext and the DS, enabling compression for good measure.

Code Snippet

/// <summary>
/// Simple implementation of IDataServiceHost on top of
/// the ASP.NET API
/// </summary>
public class HttpHost : IDataServiceHost, IDataServiceHost2
{
    private Uri serviceUri;
    private Uri absoluteUri;
    private HttpContext context;
    private Stream _stream = null;
    private string responseDataServiceVersion = string.Empty;
    private string responseCacheControl = string.Empty;
    private string responseContentType = string.Empty;
    private string responseLocation = string.Empty;
    private string responseETag = string.Empty;

    public HttpHost(Uri serviceUri, HttpContext context)
    {
        if (serviceUri == null) throw new ArgumentNullException("serviceUri");
        if (context == null) throw new ArgumentNullException("context");
        this.context = context;

        this.serviceUri = serviceUri;
        this.absoluteUri = context.Request.Url;
        if (absoluteUri.AbsoluteUri.Length < serviceUri.AbsoluteUri.Length)
            absoluteUri = new Uri(absoluteUri.AbsoluteUri + "/");
    }

    #region IDataServiceHost Members

    public Uri AbsoluteRequestUri
    {
        get
        {
            return this.absoluteUri;
        }
    }

    public Uri AbsoluteServiceUri
    {
        get { return this.serviceUri; }
    }

    public string GetQueryStringItem(string item)
    {
        return this.context.Request[item];
    }

    public void ProcessException(HandleExceptionArgs args)
    {
        // write the exception details to the response and close the connection
        context.Response.Write(args.Exception.ToString());
        context.Response.Flush();
        context.Response.Close();
    }

    public string RequestAccept
    {
        get { return this.context.Request.Headers["Accept"]; }
    }

    public string RequestAcceptCharSet
    {
        get { return this.context.Request.Headers["Accept-Charset"]; }
    }

    public string RequestContentType
    {
        get { return this.context.Request.Headers["Content-Type"]; }
    }

    public string RequestHttpMethod
    {
        get
        {
            // if life was simpler this would just be context.Request.HttpMethod,
            // but in order to support HTTP method tunneling (required for
            // Silverlight among other cases), we need to do a bit more

            string method = this.context.Request.Headers["X-HTTP-Method"];

            if (string.IsNullOrEmpty(method))
            {
                method = this.context.Request.HttpMethod;
            }
            else if (this.context.Request.HttpMethod.Equals("POST", StringComparison.InvariantCultureIgnoreCase))
            {
                // only PUT, DELETE and MERGE may be tunneled; reject anything else
                if (!method.Equals("PUT", StringComparison.InvariantCultureIgnoreCase) &&
                    !method.Equals("DELETE", StringComparison.InvariantCultureIgnoreCase) &&
                    !method.Equals("MERGE", StringComparison.InvariantCultureIgnoreCase))
                {
                    throw new InvalidOperationException("HTTP method tunneling will only tunnel PUT, DELETE and MERGE requests");
                }
            }
            else
            {
                throw new InvalidOperationException("HTTP method tunneling should be done through POST requests.");
            }

            return method;
        }
    }

    public string RequestIfMatch
    {
        get { return this.context.Request.Headers["if-match"]; }
    }

    public string RequestIfNoneMatch
    {
        get { return this.context.Request.Headers["if-none-match"]; }
    }

    public string RequestMaxVersion
    {
        get { return this.context.Request.Headers["MaxDataServiceVersion"]; }
    }

    public System.IO.Stream RequestStream
    {
        get { return this.context.Request.InputStream; }
    }

    public string RequestVersion
    {
        get { return this.context.Request.Headers["DataServiceVersion"]; }
    }

    public string ResponseCacheControl
    {
        get
        {
            return this.responseCacheControl;
        }
        set
        {
            this.context.Response.AddHeader("cache-control", value);
            this.responseCacheControl = value;
        }
    }

    public string ResponseContentType
    {
        get
        {
            return this.responseContentType;
        }
        set
        {
            if (String.IsNullOrEmpty(this.responseContentType))
            {
                this.context.Response.AddHeader("Content-Type", value);
                this.responseContentType = value;
            }
        }
    }

    public string ResponseETag
    {
        get
        {
            return this.responseETag;
        }
        set
        {
            this.context.Response.AddHeader("ETag", value);
            this.responseETag = value;
        }
    }

    public string ResponseLocation
    {
        get
        {
            return this.responseLocation;
        }
        set
        {
            this.context.Response.AddHeader("location", value);
            this.responseLocation = value;
        }
    }

    public int ResponseStatusCode
    {
        get
        {
            return this.context.Response.StatusCode;
        }
        set
        {
            this.context.Response.StatusCode = value;
        }
    }

    public System.IO.Stream ResponseStream
    {
        get
        {
            // create the (optionally compressing) stream once and reuse it, so that
            // Flush() and Close() act on the same stream the data service wrote to
            if (_stream == null)
            {
                string encoding = context.Response.Headers["Content-Encoding"];
                if (encoding == "deflate")
                    _stream = new DeflateStream(this.context.Response.OutputStream, CompressionMode.Compress);
                else if (encoding == "gzip")
                    _stream = new GZipStream(this.context.Response.OutputStream, CompressionMode.Compress);
                else
                    _stream = this.context.Response.OutputStream;
            }
            return _stream;
        }
    }

    public string ResponseVersion
    {
        get
        {
            return this.responseDataServiceVersion;
        }
        set
        {
            this.context.Response.AddHeader("DataServiceVersion", value);
            this.responseDataServiceVersion = value;
        }
    }

    #endregion

    #region IDataServiceHost2 Members

    public System.Net.WebHeaderCollection RequestHeaders
    {
        get
        {
            var whc = new WebHeaderCollection();
            whc.Add(context.Request.Headers);
            return whc;
        }
    }

    public System.Net.WebHeaderCollection ResponseHeaders
    {
        get
        {
            var whc = new WebHeaderCollection();
            whc.Add(context.Response.Headers);
            return whc;
        }
    }

    #endregion

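    public void Flush()
    {
        // _stream is only set once the data service has asked for ResponseStream
        if (_stream != null)
            _stream.Flush();
    }

    public void Close()
    {
        if (_stream != null)
            _stream.Close();
    }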
}

 

StreamingDataService
Now that I have HttpHost in hand, I can go about implementing my IHttpHandler. 

I ended up encapsulating the IHttpHandler into a single class I called StreamingDataService<>.  This class derives from DataService<T> and implements IHttpHandler around an HttpHost to do the binding.

This class is pretty straightforward.  It handles an HTTP request by implementing ProcessRequest(HttpContext).  In there it creates an HttpHost, attaches it to the data service with AttachHost() and calls the parameterless DataService<T>.ProcessRequest().

Code Snippet

    /// <summary>
    /// A generic IHttpHandler which provides a streaming DataService around a DataModel T
    /// </summary>
    public class StreamingDataService<DataModelT> : DataService<DataModelT>, IHttpHandler where DataModelT : class
    {
        public void ProcessRequest(HttpContext context)
        {
            // enable compression
            string acceptEncoding = context.Request.Headers["Accept-Encoding"];
            if (!String.IsNullOrEmpty(acceptEncoding))
            {
                if (acceptEncoding.ToLower().Contains("deflate"))
                {
                    context.Response.AppendHeader("Content-Encoding", "deflate");
                }
                else if (acceptEncoding.ToLower().Contains("gzip"))
                {
                    context.Response.AppendHeader("Content-Encoding", "gzip");
                }
            }

            // turn off buffering so that it is streamed in real time
            context.Response.BufferOutput = false;

            // figure out the service URI
            string serviceUri = context.Request.Url.AbsoluteUri.Substring(0, context.Request.Url.AbsoluteUri.ToLower().IndexOf(".ashx") + 5);
            if (!serviceUri.EndsWith("/"))
                serviceUri += "/";

            // create a HttpHost which will tell the DataService how to communicate to our client
            HttpHost httpHost = new HttpHost(new Uri(serviceUri), context);

            // but host it in our IIS host instead of WCF, this bypasses the blocking that WCF does
            AttachHost(httpHost);

            // process the request. This will send results until the client connection is dropped
            ProcessRequest();

            httpHost.Flush();
            httpHost.Close();

            context.Response.Flush();
            context.Response.End();
        }

        public bool IsReusable
        {
            get
            {
                return false;
            }
        }
    }

Define our data service and data service host

Now that we have all of these nice helper classes, I can simply define the data service and data service host. 

To do that:

  1. I create a new Generic Handler (.ASHX) file
  2. I replace the IHttpHandler with my StreamingDataService<>

Code Snippet

/// <summary>
/// Define our data service host as a generic handler
/// </summary>
[WebService(Namespace = "https://tempuri.org/")]
[WebServiceBinding(ConformsTo = WsiProfiles.BasicProfile1_1)]
[ServiceBehavior(IncludeExceptionDetailInFaults = true)]
public class QuoteStreamService : StreamingDataService<QuoteStream>
{
    // This method is called only once to initialize service-wide policies.
    public static void InitializeService(DataServiceConfiguration config)
    {
        // set everything to readonly
        config.SetEntitySetAccessRule("*", EntitySetRights.AllRead);
        config.SetServiceOperationAccessRule("*", ServiceOperationRights.AllRead);

        // set service behavior configuration options
        config.DataServiceBehavior.AcceptCountRequests = false;
        config.DataServiceBehavior.AcceptProjectionRequests = true;
        config.DataServiceBehavior.MaxProtocolVersion = System.Data.Services.Common.DataServiceProtocolVersion.V2;
    }
}

Voila! I now have a streaming data service over an infinite result set! Hooray for me!

(BTW: An excellent tool for exploring a new DS such as this is the beta version of LINQPad ( https://linqpad.com ) which supports querying DS services. Just make sure you include a .Take() clause!)

Consuming the DS from the Client Code

Add Service Reference

Now I can simply go to VS and add a Service Reference to my service.  VS will automagically generate proxy client code around the metadata published by my DS.  Life is good.

Querying

To query I can now use LINQ and (as long as I provide a Take() clause) I can access the data from the data service using the strongly typed objects generated by adding the service reference.  Look at how cool this is:

Code Snippet

QuoteStream quoteStream = new QuoteStream(new Uri("https://yourservice/QuoteStream.ashx"));
var results = ( from quote in quoteStream.Quotes.Expand("Company")
                where quote.Symbol == "IBM"
                select quote).Take(5);
foreach (Quote quote in results)
    Console.WriteLine(quote.Symbol);

This code will take the first 5 quotes for IBM it finds in the continuous result stream. 

As long as all I want to do is take a snapshot of the live stream like this, I’m done.  Of course, the whole point of this blog post was that I wanted continuous streaming results, so I can’t stop here, I must press on. 

Processing Continuous Results

I wanted to be able to continuously monitor the live stream that I’m getting.  The out of box DataServiceQuery<> object from DS doesn’t work because, like so many other components, it wants to get a finite result set.

After much poking around I discovered that I could leverage DS to build the Url I want, but bypass the client logic for connecting to the data service and processing results.

The key is that I can get the Url by simply calling ToString() on the query. Cool! 
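
For readers who have not looked at such a Url, it is the service address followed by the entity set and the standard query options ($filter, $expand, $top). The helper below is a hypothetical sketch of how a Url of that shape is composed. QueryUrlSketch and Build are my own names, not part of the DS client, and the string that ToString() actually produces may differ in detail.

```csharp
using System;
using System.Collections.Generic;

// Illustrative only: composes a query Url of the shape the DS client generates.
public static class QueryUrlSketch
{
    // serviceRoot, entitySet, filter, expand and top are all caller-supplied;
    // a top of 0 or less means "no $top option".
    public static string Build(string serviceRoot, string entitySet,
                               string filter, string expand, int top)
    {
        var options = new List<string>();
        if (!string.IsNullOrEmpty(filter))
            options.Add("$filter=" + Uri.EscapeDataString(filter));
        if (!string.IsNullOrEmpty(expand))
            options.Add("$expand=" + Uri.EscapeDataString(expand));
        if (top > 0)
            options.Add("$top=" + top);

        string url = serviceRoot.TrimEnd('/') + "/" + entitySet;
        return options.Count == 0 ? url : url + "?" + string.Join("&", options);
    }
}
```

For the IBM query above, QueryUrlSketch.Build("https://yourservice/QuoteStream.ashx", "Quotes", "Symbol eq 'IBM'", "Company", 5) gives a Url that carries the filter, the expand and the $top=5 limit in its query string. Leaving $top off is what turns the same request into a continuous stream.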

With the Url now in hand I can simply issue my own HTTP GET of the Url and get back my data as a stream, in either JSON or ATOM format, from HttpWebResponse.GetResponseStream(). For ATOM I can then simply create an XmlTextReader() around that stream and parse my data on the fly.

Code Snippet

// use the URI which is generated by the query to get the results we want
HttpWebRequest request = WebRequest.Create(query.ToString()) as HttpWebRequest;
request.Method = "GET";
if (json)
    request.Accept = "application/json";
if (UseCompression)
    request.Headers.Add(HttpRequestHeader.AcceptEncoding, "gzip,deflate");

// submit request and get the response
HttpWebResponse response = request.GetResponse() as HttpWebResponse;

Stream stream = response.GetResponseStream();
if (response.ContentEncoding.ToLower().Contains("deflate"))
    stream = new DeflateStream(stream, CompressionMode.Decompress);
else if (response.ContentEncoding.ToLower().Contains("gzip"))
    stream = new GZipStream(stream, CompressionMode.Decompress);
XmlTextReader reader = new XmlTextReader(stream);
while(true)
{
    //...pull your data from the stream
}

Ultimately, I ended up writing a class I called StreamingDataServiceQuery<T> which implements IEnumerable<T> and uses a JSON deserializer I wrote to materialize the objects from a JSON stream. 

I did this for a number of reasons:

  1. JSON is about 1/3 the size of XML serialized
  2. Complex objects are stored inline in JSON which is easier to parse than ATOM, where the inner collections come as a later chunk of data that needs to be stitched up
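
The general shape of such a class can be sketched in a few lines. This is not my actual StreamingDataServiceQuery<T>; StreamingResults<T> and its readNext delegate are hypothetical names, and the delegate stands in for whatever materializer reads one object at a time from the response stream and returns null when the stream ends.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;

// Illustrative only: exposes a response stream as a lazy IEnumerable<T>.
public class StreamingResults<T> : IEnumerable<T> where T : class
{
    private readonly TextReader _reader;
    private readonly Func<TextReader, T> _readNext;

    // readNext materializes the next object from the reader, or returns
    // null once there is nothing more to read.
    public StreamingResults(TextReader reader, Func<TextReader, T> readNext)
    {
        _reader = reader;
        _readNext = readNext;
    }

    // Each MoveNext() reads exactly one more object from the stream, so a
    // foreach over this class keeps running for as long as data arrives.
    public IEnumerator<T> GetEnumerator()
    {
        T item;
        while ((item = _readNext(_reader)) != null)
            yield return item;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
```

As a trivial usage example, new StreamingResults<string>(new StringReader("a\nb\n"), r => r.ReadLine()) enumerates the two lines "a" and "b". With a network stream and a real materializer in place of ReadLine(), the same loop hands each object to the caller as soon as it has been read.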

Here is my hacked up JSONMaterializer:

Code Snippet

    /// <summary>
    /// JSON Materializer 
    /// Given a stream it will instantiate objects from it that match a given set of entity types passed in
    /// </summary>
    public class JSONMaterializer
    {
        // record for propertyinfos for a given type
        private class TypeMap
        {
            public Type Type;
            public Dictionary<string, PropertyInfo> Properties = new Dictionary<string, PropertyInfo>();
            // just the end name, not the namespace. This allows us to map server side names to client side names
            public string Name;

            public TypeMap(Type type)
            {
                Type = type;
                // remember prop infos
                foreach (PropertyInfo propInfo in type.GetProperties())
                {
                    Properties.Add(propInfo.Name, propInfo);
                }

                Name = type.Name.Substring(type.Name.LastIndexOf(".") + 1);
            }
        }

        // metadata record for object definitions
        private class __metadata
        {
            public string uri { get; set; }
            public string type { get; set; }
        }

        // cache of type maps, keyed by type name
        private static Dictionary<string, TypeMap> g_typeMaps = null;
        private TypeMap _typeMap;

        /// <summary>
        /// Root deserializer for all types
        /// </summary>
        /// <param name="types"></param>
        public JSONMaterializer(Type[] types)
        {
            if (g_typeMaps == null)
            {
                g_typeMaps = new Dictionary<string, TypeMap>();
                foreach (Type type in types)
                {
                    TypeMap typeMap = new TypeMap(type);
                    g_typeMaps.Add(typeMap.Name, typeMap);
                }
            }
        }

        /// <summary>
        /// Deserializer for a type
        /// </summary>
        /// <param name="type">The type to deserialize</param>
        public JSONMaterializer(Type type)
        {
            _typeMap = _lookupTypeMapByType(type);
        }

        /// <summary>
        /// Deserializer which auto detects type based on __metadata information
        /// </summary>
        public JSONMaterializer()
        {
            _typeMap = null;
        }

        /// <summary>
        /// Read an object from the stream
        /// </summary>
        /// <param name="reader"></param>
        /// <returns>object</returns>
        public object GetObject(BufferedStreamReader reader)
        {
            object targetObj = null;
            char c = ' ';
            while (!reader.EndOfStream)
            {
                string name = null;
                string value = null;
                switch (c = SkipWhitespace(reader))
                {
                    case '{':
                        Debug.Assert(targetObj == null);
                        name = ReadName(reader);
                        value = null;

                        switch (name)
                        {
                            case "d":
                                c = SkipWhitespace(reader);
                                Debug.Assert(c == ':');
                                c = ReadChar(reader);
                                c = SkipWhitespace(reader);
                                c = ReadChar(reader);
                                if (c == '[')
                                    return GetObject(reader);
                                break;

                            case "__deferred":
                                // just skip these
                                __metadata deferred = ReadMetadata(reader);
                                break;

                            case "__metadata": // it's a type block
                                __metadata metadata = ReadMetadata(reader);
                                if (_typeMap == null)
                                    _typeMap = _lookupTypeMapByName(metadata.type);

                                // create the target object
                                targetObj = _typeMap.Type.GetConstructor(System.Type.EmptyTypes).Invoke(null);
                                break;
                        }
                        break;

                    case ',':
                        ReadChar(reader); // drop ,
                        break;

                    case '\"':
                        {
                            name = ReadName(reader);
                            value = null;
                            SkipWhitespace(reader);
                            if (PeekChar(reader) == ':')
                                ReadChar(reader);
                            switch (SkipWhitespace(reader))
                            {
                                case '{': // start of a subrecord
                                    {
                                        JSONMaterializer materializer = new JSONMaterializer();
                                        object obj = materializer.GetObject(reader);
                                        SetObjectProperty(targetObj, name, obj);
                                    }
                                    break;

                                case '[':
                                    {
                                        // skip [
                                        ReadChar(reader);
                                        if (SkipWhitespace(reader) != ']')
                                        {
                                            // this is an array, create a materializer for the type of array
                                            JSONMaterializer materializer = new JSONMaterializer();
                                            IList list = null;
                                            do
                                            {
                                                object o = materializer.GetObject(reader);
                                                if (o != null)
                                                {
                                                    // and a Collection<TYPE> to store it
                                                    if (list == null)
                                                        list = CreateGeneric(typeof(System.Collections.ObjectModel.Collection<>), o.GetType()) as IList;
                                                    list.Add(o);
                                                }
                                            } while ((c = SkipWhitespace(reader)) != ']');

                                            ReadChar(reader); // skip ']'

                                            // assign list to property
                                            SetObjectProperty(targetObj, name, list);
                                        }
                                        else
                                            ReadChar(reader); // eat the ']'
                                    }
                                    break;

                                default:
                                    // this is a name/value pair
                                    value = ReadValue(reader);

                                    // set the property value on the target object
                                    SetObjectProperty(targetObj, name, value);
                                    break;

                            }
                        }
                        break;

                    case '}':
                        // end of block
                        ReadChar(reader); // pull } off
                        // and return materialized object
                        return targetObj;

                    case ']':
                        ReadChar(reader); // this should be the end of the stream in cases where there are constraints
                        break;

                    default:
                        {
#if DEBUG
                            Debug.Write(_charBuffer.ToString());
#endif
                            StringBuilder sb = new StringBuilder();
                            int nLines = 100;
                            try
                            {
                                string line;
                                while (((line = reader.ReadLine()) != null) && (nLines-- > 0))
                                {
                                    sb.AppendLine(line);
                                }
                            }
                            catch (Exception)
                            {
                            }
                            Debug.WriteLine(sb.ToString());
                            throw new InvalidDataException(sb.ToString());
                        }
                }
            }
            return null;// all done!
        }

        /// <summary>
        /// Read metadata header for object entry from the stream
        /// </summary>
        /// <param name="reader"></param>
        /// <returns>metadata</returns>
        private __metadata ReadMetadata(BufferedStreamReader reader)
        {
            string key;
            string value = "";
            __metadata md = new __metadata();
            while (SkipWhitespace(reader) != '}')
            {
                key = ReadName(reader);
                if (PeekChar(reader) == ':')
                    ReadChar(reader);
                value = ReadValue(reader);
                switch (key)
                {
                    case "uri":
                        md.uri = value;
                        break;
                    case "type":
                        md.type = value.Substring(value.LastIndexOf('.') + 1);
                        break;

                    default:
                        // unknown property, just ignore it
                        break;
                }
            }
            ReadChar(reader); // eat }
            return md;
        }

        private static long InitialJavaScriptDateTicks = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks;
        /// <summary>
        /// Assign a strongly typed property value on an object
        /// </summary>
        /// <param name="targetObj"></param>
        /// <param name="key"></param>
        /// <param name="value"></param>
        private void SetObjectProperty(object targetObj, string key, object value)
        {
            PropertyInfo propInfo = null;
            if (_typeMap.Properties.TryGetValue(key, out propInfo))
            {
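                // ToLowerInvariant avoids culture-specific casing (e.g. the Turkish dotless i in "Int32")
                switch (propInfo.PropertyType.Name.ToLowerInvariant())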
                {
                    case "boolean":
                        {
                            bool val;
                            if (bool.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
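                    case "double":
                        {
                            // JSON numbers always use '.' as the decimal separator, so parse culture-independently
                            Double val;
                            if (Double.TryParse((string)value, System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands, System.Globalization.CultureInfo.InvariantCulture, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "single":
                        {
                            Single val;
                            if (Single.TryParse((string)value, System.Globalization.NumberStyles.Float | System.Globalization.NumberStyles.AllowThousands, System.Globalization.CultureInfo.InvariantCulture, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;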

                    case "int16":
                        {
                            Int16 val;
                            if (Int16.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "int32":
                        {
                            Int32 val;
                            if (Int32.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "int64":
                        {
                            Int64 val;
                            if (Int64.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "uint16":
                        {
                            UInt16 val;
                            if (UInt16.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "uint32":
                        {
                            UInt32 val;
                            if (UInt32.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "uint64":
                        {
                            UInt64 val;
                            if (UInt64.TryParse((string)value, out val))
                                propInfo.SetValue(targetObj, val, null);
                        }
                        break;
                    case "datetime":
                        {
                            // value looks like "/Date(1277942400000)/": milliseconds since the Unix epoch
                            string sdate = ((string)value).Substring(6, ((string)value).Length - 8);
                            long ms = 0;
                            if (long.TryParse(sdate, out ms))
                            {
                                // 1 millisecond = 10,000 ticks
                                DateTime date = new DateTime((ms * 10000) + InitialJavaScriptDateTicks, DateTimeKind.Utc);
                                propInfo.SetValue(targetObj, date, null);
                            }
                        }
                        break;
                    case "string":
                        propInfo.SetValue(targetObj, value, null);
                        break;

                    default:
                        propInfo.SetValue(targetObj, value, null);
                        break;
                }
            }
        }

        private TypeMap _lookupTypeMapByType(Type type)
        {
            return _lookupTypeMapByName(type.Name.Substring(type.Name.LastIndexOf(".") + 1));
        }

        private TypeMap _lookupTypeMapByName(String name)
        {
            int i = name.LastIndexOf(".");
            if (i > 0)
                name = name.Substring(i + 1);

            return g_typeMaps[name];
        }

        #region stream utils
        /// <summary>
        /// Skip whitespace returning current non-whitespace char PEEKed
        /// </summary>
        /// <param name="reader"></param>
        /// <returns>the peek of the non-whitespace char</returns>
        public char SkipWhitespace(BufferedStreamReader reader)
        {
            char c = ' ';
            // skip whitespace
            while (Char.IsWhiteSpace((c = PeekChar(reader))))
                ReadChar(reader);
            return c;
        }

        /// <summary>
        /// Read a "quoted \"ignoring\" escaped string"
        /// </summary>
        /// <param name="reader"></param>
        /// <returns></returns>
        public string ReadQuotedValue(BufferedStreamReader reader)
        {
            char c = SkipWhitespace(reader);

            // read up to start quote
            while (PeekChar(reader) != '\"')
                ReadChar(reader);
            // read past start quote
            ReadChar(reader);

            // go to end of field
            string value = "";
            while ((c = ReadChar(reader)) != '\"')
            {
                if (c == '\\')
                {
                    char peek = PeekChar(reader);
                    switch (peek)
                    {
                        case '\\':
                        case '/':
                        case '\"':
                        case '\'':
                            // then drop the backslash
                            value += ReadChar(reader); // add on ", it's not the end
                            break;
                        case 'b':
                            value += "\b";
                            ReadChar(reader);
                            break;
                        case 'f':
                            value += "\f";
                            ReadChar(reader);
                            break;
                        case 'n':
                            value += "\n";
                            ReadChar(reader);
                            break;
                        case 'r':
                            value += "\r";
                            ReadChar(reader);
                            break;
                        case 't':
                            value += "\t";
                            ReadChar(reader);
                            break;
                        case 'u':
                            ReadChar(reader); // skip u
                            string hex = String.Format("{0}{1}{2}{3}", ReadChar(reader), ReadChar(reader), ReadChar(reader), ReadChar(reader));
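                            // parse as unsigned so code points above 0x7FFF do not become negative
                            UInt16 r;
                            if (UInt16.TryParse(hex, System.Globalization.NumberStyles.AllowHexSpecifier, System.Globalization.CultureInfo.InvariantCulture, out r))
                                value += (char)r;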
                            break;

                        default:
                            value += c;
                            break; //
                    }
                }
                else
                    value += c;
            }

            // skip end quote
            if (PeekChar(reader) == '\"')
                ReadChar(reader);

            return value;
        }

#if DEBUG
        private StringBuilder _charBuffer = new StringBuilder();
#endif
        private char[] _char = new char[1];
        /// <summary>
        ///  read a single char
        /// </summary>
        /// <param name="reader"></param>
        /// <returns></returns>
        public char ReadChar(BufferedStreamReader reader)
        {
            char c = reader.ReadChar();
#if DEBUG
            if (_charBuffer.Length > 80)
            {
                Debug.Write(_charBuffer);
                _charBuffer.Length = 0;
            }
            _charBuffer.Append(c);
#endif

            return c;
        }

        /// <summary>
        /// peek a single char, not removing it from stream
        /// </summary>
        /// <param name="reader"></param>
        /// <returns></returns>
        public char PeekChar(BufferedStreamReader reader)
        {
            return reader.PeekChar();
        }

        /// <summary>
        /// Read quoted name string
        /// </summary>
        /// <param name="reader"></param>
        /// <returns></returns>
        public string ReadName(BufferedStreamReader reader)
        {
            return ReadQuotedValue(reader);
        }

        /// <summary>
        /// Read value
        /// </summary>
        /// <param name="reader"></param>
        /// <returns>value as a string</returns>
        public string ReadValue(BufferedStreamReader reader)
        {
            char c;
            string value = "";
            c = SkipWhitespace(reader);
            if (c == '\"')
                return ReadQuotedValue(reader);

            while (true)
            {
                c = PeekChar(reader);
                if (!Char.IsWhiteSpace(c) &&
                    c != ',' && c != '}')
                {
                    value += ReadChar(reader);
                }
                else
                    break;
            }
            return value;
        }
        #endregion

        /// <summary>
        /// helper to create a generic class instance
        /// </summary>
        /// <param name="generic"></param>
        /// <param name="innerType"></param>
        /// <param name="args"></param>
        /// <returns></returns>
        private static object CreateGeneric(Type generic, Type innerType, params object[] args)
        {
            Type specificType = generic.MakeGenericType(new System.Type[] { innerType });
            return Activator.CreateInstance(specificType, args);
        }

    }

    public class BufferedStreamReader
    {
        private StreamReader _reader;
        private string _currentLine = null;
        private int _pos = 0;

        public BufferedStreamReader(StreamReader streamReader)
        {
            _reader = streamReader;
            _currentLine = null;
            _pos = 0;
        }

        public char ReadChar()
        {
            FillBuffer();
            return _currentLine[_pos++];
        }

        public char PeekChar()
        {
            FillBuffer();
            return _currentLine[_pos];
        }

        /// <summary>
        /// Make sure at least one unread char is buffered, skipping empty lines
        /// </summary>
        private void FillBuffer()
        {
            while ((_currentLine == null) || (_pos >= _currentLine.Length))
            {
                _currentLine = _reader.ReadLine();
                _pos = 0;
                // ReadLine returns null when the server closes the stream
                if (_currentLine == null)
                    throw new EndOfStreamException("Unexpected end of stream");
            }
        }

        public string ReadLine()
        {
            if ((_currentLine != null) && (_pos < _currentLine.Length))
            {
                string result = _currentLine.Substring(_pos);
                _currentLine = null;
                return result;
            }
            else
            {
                _currentLine = null;
                _pos = 0;
                return _reader.ReadLine();
            }
        }

        public bool EndOfStream { get { return _reader.EndOfStream; } }
    }
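
A side note on the datetime case in SetObjectProperty above: the JSON format emitted by Data Services encodes dates as "\/Date(milliseconds)\/", counted from January 1, 1970 UTC. The sketch below isolates that conversion with a format check added. JsonDate and TryParse are hypothetical names for illustration and are not part of the original solution; it assumes the string has already been unescaped and carries no time zone offset suffix.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the original download.
public static class JsonDate
{
    private static readonly DateTime Epoch =
        new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

    // Parses an unescaped value such as "/Date(1277942400000)/" into a UTC DateTime.
    public static bool TryParse(string value, out DateTime result)
    {
        result = default(DateTime);
        const string prefix = "/Date(";
        const string suffix = ")/";

        // reject anything that does not have the expected wrapper
        if (value == null ||
            !value.StartsWith(prefix, StringComparison.Ordinal) ||
            !value.EndsWith(suffix, StringComparison.Ordinal))
            return false;

        // the digits between the wrapper are milliseconds since the epoch
        string digits = value.Substring(prefix.Length,
            value.Length - prefix.Length - suffix.Length);

        long ms;
        if (!long.TryParse(digits, NumberStyles.AllowLeadingSign,
                CultureInfo.InvariantCulture, out ms))
            return false;

        result = Epoch.AddMilliseconds(ms);
        return true;
    }
}
```

Unlike the inline Substring(6, Length - 8) call, this version returns false for malformed input instead of throwing.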

And my StreamingDataServiceQuery<> generic class:

Code Snippet

/// <summary>
/// Creates a non-blocking client Enumerator for continuously streaming results
/// </summary>
/// <typeparam name="T"></typeparam>
public class StreamingDataServiceQuery<T> : IEnumerable<T>
{
    private IQueryable<T> _query;
    private List<Type> _types;

    #region Constructor
    public StreamingDataServiceQuery(IQueryable<T> query, string appid) :
        this(query, appid, typeof(T).Assembly.GetTypes().ToArray(), true)
    {
    }

    public StreamingDataServiceQuery(IQueryable<T> query, string appid, Type[] types) :
        this(query, appid, types, true)
    {
    }

    /// <summary>
    /// CTOR which takes a collection of types which might be returned in the entity stream
    /// </summary>
    /// <param name="query"></param>
    /// <param name="types"></param>
    public StreamingDataServiceQuery(IQueryable<T> query, string appID, Type[] types, bool useCompression)
    {
        AppID = appID;
        UseCompression = useCompression;
        _query = query;
        _initTypes(types);
    }

    private void _initTypes(Type[] types)
    {
        _types = new List<Type>();
        foreach (Type t in types)
        {
            foreach (Type type in t.Assembly.GetTypes())
            {
                // BaseType is null for interfaces and System.Object
                if (type.BaseType == null || type.BaseType.Name != "DataServiceContext")
                {
                    if (!_types.Contains(type))
                        _types.Add(type);
                }
            }
        }
    }
    #endregion

    #region UseCompression
    /// <summary>
    /// UseCompression, if it is set to true (default) it will use a compressed stream
    /// </summary>
    public bool UseCompression
    {
        get;
        set;
    }
    #endregion

    #region AppID
    public string AppID
    {
        get;
        set;
    }
    #endregion

    #region IEnumerable<T> Members
    /// <summary>
    /// strongly typed enumerator of results
    /// </summary>
    /// <returns></returns>
    public IEnumerator<T> GetEnumerator()
    {
        IEnumerator items = ((IEnumerable)this).GetEnumerator();
        while (items.MoveNext())
            yield return (T)items.Current;
    }

    #endregion

    #region IEnumerable Members
    /// <summary>
    /// Untyped enumerate of results
    /// </summary>
    /// <returns></returns>
    IEnumerator IEnumerable.GetEnumerator()
    {
        BufferedStreamReader reader = new BufferedStreamReader(GetJSONStreamReader());

        // create a JSON materializer around the stream
        Object obj = null;
        JSONMaterializer materializer = new JSONMaterializer(_types.ToArray<Type>());

        // enumerate results
        while ((obj = materializer.GetObject(reader)) != null)
            yield return obj;
    }
    #endregion

    #region GetXXXStreamReader()
    /// <summary>
    /// Execute the query and return a streamreader in JSON format
    /// </summary>
    /// <returns></returns>
    public StreamReader GetJSONStreamReader()
    {
        return _getResultsStream(true);
    }

    /// <summary>
    /// Execute the query and return a streamreader in ATOM format
    /// </summary>
    /// <returns></returns>
    public StreamReader GetATOMStreamReader()
    {
        return _getResultsStream(false);
    }
    #endregion

    #region _getResultsStream()
    private StreamReader _getResultsStream(bool json)
    {
        int count = 0;
        while (count++ < 3)
        {
            // use the URI generated by the query to request the results we want;
            // an HttpWebRequest is single-use, so build a new one for every attempt
            HttpWebRequest request = WebRequest.Create(_query.ToString()) as HttpWebRequest;
            request.Method = "GET";
            if (json)
                request.Accept = "application/json";

            if (UseCompression)
                request.Headers.Add(HttpRequestHeader.AcceptEncoding, "gzip,deflate");

            request.Timeout = (int)TimeSpan.FromMinutes(3).TotalMilliseconds;
            try
            {
                // submit request and get the response
                HttpWebResponse response = request.GetResponse() as HttpWebResponse;

                // get a streamreader around response stream
                Stream stream = null;
                string encoding = response.ContentEncoding.ToLowerInvariant();
                if (encoding.Contains("gzip"))
                    stream = new GZipStream(response.GetResponseStream(), CompressionMode.Decompress);
                else if (encoding.Contains("deflate"))
                    stream = new DeflateStream(response.GetResponseStream(), CompressionMode.Decompress);
                else
                    stream = response.GetResponseStream();

                StreamReader reader = new StreamReader(stream, Encoding.UTF8);
                return reader;
            }
            catch (Exception err)
            {
                if (count >= 3)
                    throw; // rethrow without resetting the stack trace
                Trace.TraceError(err.ToString());
            }
            Thread.Sleep(5000);
        }
        throw new System.Exception(String.Format("Couldn't connect to {0}", _query.ToString()));
    }
    #endregion
}

This ultimately allowed me to write client code like this:

Code Snippet

QuoteStream quoteStream = new QuoteStream(new Uri("https://yourservice/QuoteStream.ashx"));
var results = (from quote in quoteStream.Quotes.Expand("Company")
               where quote.Symbol == "IBM"
               select quote);
// "MyAppID" is a placeholder; the constructors above require an application ID
foreach (Quote quote in new StreamingDataServiceQuery<Quote>(results, "MyAppID"))
    Console.WriteLine(quote.Symbol);

The foreach loop never ends (it’s infinite, after all), but this lets me create a server-side filter on a live stream of data with clear, concise, efficient code.
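
Because the loop never returns, a real client will usually want to run it off the main thread. Here is a minimal sketch of one way to do that; StreamPump and its Start method are hypothetical names of my own and are not part of the downloadable solution. It works against any IEnumerable<T>, so it makes no assumptions about the query class beyond what is shown above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper, not part of the original download.
public static class StreamPump
{
    // Enumerates 'source' on a background thread and hands each item
    // to 'onItem' until 'stop' is signalled.
    public static Thread Start<T>(IEnumerable<T> source, Action<T> onItem, ManualResetEvent stop)
    {
        Thread worker = new Thread(() =>
        {
            foreach (T item in source)
            {
                // the stop signal is only checked when an item arrives
                if (stop.WaitOne(0))
                    break;
                onItem(item);
            }
        });
        worker.IsBackground = true; // do not keep the process alive on exit
        worker.Start();
        return worker;
    }
}
```

Passing the StreamingDataServiceQuery<Quote> from the client code above as source, with a callback that writes quote.Symbol, gives the same output without blocking the caller. One caveat: if the stream goes quiet, the worker stays blocked in the network read until the next event arrives or the request times out, so the stop signal is not immediate.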

If you want a complete working solution with client code, including Silverlight, download it here:

https://cid-8dc85127f8ce2f12.skydrive.live.com/self.aspx/Dropbox/StreamingDataServices.zip

I want to give a ginormous shout-out to Pablo Castro, Architect on the ADO.NET/WCF Data Services team, who provided a ton of insight and help as I went down this rabbit hole. He is a fantastic person to put up with my pestering questions, and I couldn’t have figured all of this out without his help.

Comments

  • Anonymous
    July 01, 2010
    Did you ever do any scalability tests? How many connected clients can one server handle with this approach? Thanks!

  • Anonymous
    August 28, 2010
    Tom, Thanks for a fantastic post.  The download link no longer works, could you upload it again?

  • Anonymous
    September 08, 2010
    #1 I updated the download link to its new location. #2 Yes, it scales. We use this architecture for our backend Twitter processing pipeline in bing.com, where we process many hundreds of entities a second. NOTE: From a scaling standpoint, if you have a lot of clients you will eventually hit a bottleneck because you are serializing the same entity over and over again. The solution is to "remember" a given projection's serialization so you can simply write it out to any client that needs it, bypassing the serialization cost. 99% of projects won't reach the scale that requires this change, but with it we can handle a very large amount of streaming data.

  • Anonymous
    May 07, 2012
    When I try to self-host the service, the server doesn't seem to support JSON output and I receive a (415) Unsupported Media Type exception. So: 1. How do I enable JSON support in a self-hosted scenario? 2. How do I materialize an Atom feed instead of a JSON feed?