LINQ Queries as Streams and Thread Safety

A few weeks ago, Torsten Grabs – a colleague on the StreamInsight team and accomplished violist – came to me with a potential bug. He was seeing an ADO.NET SqlClient exception when running a StreamInsight query: “There is already an open DataReader associated with this Command which must be closed first.” I suggested he enable Multiple Active Result Sets (MARS) on his SQL connection, and this appeared to solve the problem. When we dug a little deeper, though, we found a few issues that will affect StreamInsight and Rx queries that use external data sources. Before getting into those, let’s dissect Torsten’s code to understand what’s happening.

static void Main(string[] args)
{
    using (var ctx = new MyObjectContext())
    {
        var source1 = from c in ctx.Table1
                      ...

        var source2 = from c in ctx.Table2
                      ...

        using (var server = Server.Create("Default"))
        {
            var app = server.CreateApplication("ThreadSafety");

            var stream1 = source1.ToPointStream(app, ...);
            
            var stream2 = source2.ToPointStream(app, ...);

            var query = from e1 in stream1
                        from e2 in stream2
                        ...

            var sink = query.ToEnumerable();

            using (var enumerator = sink.GetEnumerator()) // Exception!
            {
                ...
            }
        }
    }
}

(StreamInsight 1.1 adds support for .NET event sequences. Using extension methods such as ToStream and ToPointStream, arbitrary LINQ queries (IEnumerable<>, IQueryable<>, IObservable<>, or IQbservable<>) can be turned into event streams processed by the StreamInsight engine. For a quick introduction to the feature, see my previous post.)

First of all, what’s a SqlClient exception doing in a StreamInsight query? StreamInsight doesn’t use the SqlClient libraries, or SQL Server for that matter. The exception originates in the LINQ to Entities query sources and simply passes through the StreamInsight query: evaluating the StreamInsight query automatically triggers evaluation of the underlying LINQ to Entities queries, and that is where the problem occurs.

Why does this program result in multiple simultaneous data readers? The behavior of ToPointStream is the key. StreamInsight turns an IEnumerable into a stream by pulling on its iterator from a dedicated thread. In the above example, source1 and source2 are both built from the same ObjectContext and therefore share a single SQL connection. Because stream1 and stream2 are processed in parallel by the StreamInsight query, that one connection ends up serving two data readers at the same time.
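The failure mode can be reproduced without StreamInsight, SqlClient, or EF at all. What follows is a minimal sketch under stated assumptions, not how SqlClient is implemented: FakeConnection, Query, ReaderDemo, and CountFailures are hypothetical names, and FakeConnection merely mimics a connection without MARS by refusing to open a second reader while one is still open. Each source is pulled from its own thread, the way StreamInsight pulls an IEnumerable, and a Barrier holds both readers open at the same moment so the overlap doesn't depend on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a SQL connection without MARS:
// it allows at most one open reader at a time.
public sealed class FakeConnection
{
    private int openReaders;

    public void OpenReader()
    {
        // Atomically flip 0 -> 1; if a reader is already open, fail the way SqlClient does.
        if (Interlocked.CompareExchange(ref openReaders, 1, 0) != 0)
            throw new InvalidOperationException(
                "There is already an open DataReader associated with this Command which must be closed first.");
    }

    public void CloseReader() => Interlocked.Exchange(ref openReaders, 0);
}

public static class ReaderDemo
{
    // A lazy query: the reader opens on the first MoveNext and closes
    // when the enumerator is drained or disposed.
    private static IEnumerable<int> Query(FakeConnection connection, int rows)
    {
        connection.OpenReader();
        try
        {
            for (int i = 0; i < rows; i++)
                yield return i;
        }
        finally
        {
            connection.CloseReader();
        }
    }

    // Pulls each source on its own dedicated thread and returns
    // how many of them failed to open their reader.
    public static int CountFailures(bool sharedConnection)
    {
        var first = new FakeConnection();
        var second = sharedConnection ? first : new FakeConnection();
        var sources = new[] { Query(first, 3), Query(second, 3) };

        int failures = 0;
        // The barrier holds both threads until each has attempted to open its
        // reader, so the overlap is guaranteed rather than timing-dependent.
        using var barrier = new Barrier(sources.Length);

        var threads = Array.ConvertAll(sources, source => new Thread(() =>
        {
            using var enumerator = source.GetEnumerator();
            bool hasRows;
            try
            {
                hasRows = enumerator.MoveNext(); // opens the reader
            }
            catch (InvalidOperationException)
            {
                Interlocked.Increment(ref failures);
                hasRows = false;
            }
            barrier.SignalAndWait();
            while (hasRows && enumerator.MoveNext()) { } // drain the remaining rows
        }));

        foreach (var thread in threads) thread.Start();
        foreach (var thread in threads) thread.Join();
        return failures;
    }
}
```

With a shared connection, CountFailures(sharedConnection: true) returns 1: whichever thread opens its reader second gets the exception. Giving each source its own connection, CountFailures(sharedConnection: false) returns 0, which is the same idea as using a separate context per query.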


Enabling MARS (setting MultipleActiveResultSets=true in the SQL connection string) only partially addresses the problem Torsten encountered. Look closely at the thread-safety boilerplate in the MSDN documentation for the Entity Framework. In short, ObjectContext isn’t thread safe. In practice, you may see intermittent failures if you access the same ObjectContext on two threads simultaneously. See the EF FAQ site for more information.
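If you do enable MARS, here is a minimal sketch of adding the setting to an existing SqlClient connection string. MarsSettings and EnableMars are hypothetical helper names; SqlConnectionStringBuilder and its MultipleActiveResultSets property come from System.Data.SqlClient (Microsoft.Data.SqlClient offers the same builder). For an Entity Framework model, the setting belongs in the provider connection string embedded in the EF connection string, and it still doesn't make a shared ObjectContext safe to use from two threads.

```csharp
using System.Data.SqlClient; // Microsoft.Data.SqlClient exposes an equivalent builder

public static class MarsSettings
{
    // Returns a copy of the given SqlClient connection string with MARS enabled.
    public static string EnableMars(string connectionString)
    {
        var builder = new SqlConnectionStringBuilder(connectionString)
        {
            MultipleActiveResultSets = true
        };
        return builder.ConnectionString;
    }
}
```

Going through the builder rather than concatenating strings avoids duplicating the keyword if the original connection string already contains a MultipleActiveResultSets setting.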

A more reliable solution uses separate contexts for each query:

// Each query gets its own ObjectContext, and therefore its own connection.
using (var ctx1 = new MyObjectContext())
using (var ctx2 = new MyObjectContext())
{
    var source1 = from c in ctx1.Table1
                  ...

    var source2 = from c in ctx2.Table2
                  ...

    ...
}

Apart from thread safety concerns, some other advice applies when using LINQ to Entities or LINQ to SQL with StreamInsight:

  1. Use MergeOption.NoTracking (LINQ to Entities) or ObjectTrackingEnabled = false (LINQ to SQL) if you’re merely streaming results. There is no need to track entities in the ObjectContext or DataContext if StreamInsight is the only consumer of query results.
  2. StreamInsight imposes strong restrictions on the shape of event payloads. Ensure that source queries project only supported primitive fields, as outlined in “Payload Field Requirements”.
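  3. To ensure that StreamInsight can efficiently process a query source as a temporal stream, consider ordering the source query by event sync time. Ordered input lets you tell StreamInsight that sync times are monotonic when you create the stream.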
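Why ordering matters can be illustrated with a toy model. The sketch assumes that declaring monotonic sync times amounts to promising that no event starts earlier than one already seen; SyncTimeDemo and CountLateEvents are hypothetical names, and the sketch does not model what StreamInsight actually does with an out-of-order event. It only counts the events that would break that promise.

```csharp
using System;
using System.Collections.Generic;

public static class SyncTimeDemo
{
    // Counts events whose start time is earlier than the latest start time
    // already seen (the "watermark"). Under a promise that start times never
    // decrease, each such event arrives too late to honor that promise.
    public static int CountLateEvents(IEnumerable<DateTime> startTimes)
    {
        var watermark = DateTime.MinValue;
        int late = 0;
        foreach (var start in startTimes)
        {
            if (start < watermark)
                late++;             // behind the watermark: out of order
            else
                watermark = start;  // in order: advance the watermark
        }
        return late;
    }
}
```

For start times of 10:00, 09:00, 11:00 and 10:30, CountLateEvents returns 2 (09:00 and 10:30 each arrive after a later time was already seen); sorting the same times first, as orderby c.Created does in the source query, brings the count to 0.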

The following, more detailed code example puts this advice into practice:

ctx1.Table1.MergeOption = MergeOption.NoTracking;

var source1 = from c in ctx1.Table1
              where c.Created != null
              // order by "Created" time, which is the sync time
              // for point event inputs
              orderby c.Created
              // project supported payload fields
              select new { Created = c.Created.Value, c.ID, c.Text };

var stream1 = source1.ToPointStream(app,
    r => PointEvent.CreateInsert(r.Created, r),
    // indicate that sync times are monotonic
    AdvanceTimeSettings.IncreasingStartTime);