Rx on the server, part 5 of n: Observable ASP.NET

(Reposted due to layout issues)

The previous posts in this series all dealt with the interaction between observables and IO streams. Next, we’re going to take a look at a place where we could use these operations in a real server application: ASP.NET.

IHttpHandler

ASP.NET provides a lot of functionality to make building web pages easy. This functionality can be grouped into different layers. At the bottom layer lies the technology that we’re going to focus on in this post: IHttpHandler.

IHttpHandler is the base interface through which ASP.NET hands requests made to the web server over to your code. It comes in two flavors: IHttpHandler and IHttpAsyncHandler.

The reason for having the second version is exactly as described at the beginning of this series: a web server doesn’t scale when a whole thread is blocked for a single response.
In most web server scenarios, a request will face many delays that are not processor bound, e.g.:

  • Reading a file from (remote) disk
  • Accessing a database

As such, implementing IHttpAsyncHandler is often the best way to go.

Unfortunately, implementations of this interface can get very nasty. The interface itself follows the Begin/End pattern (BeginProcessRequest and EndProcessRequest), and when making multiple asynchronous calls such as those described above, coordinating these calls can become a nightmare.
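To make the coordination problem concrete, here is a minimal sketch of chaining just two asynchronous reads with the Begin/End pattern. It is not taken from this series: ApmChainSketch and ReadTwoChunks are hypothetical names chosen for illustration, and the only framework calls involved are Stream.BeginRead and Stream.EndRead.

```csharp
using System;
using System.IO;

// Hypothetical illustration, not part of the original series.
public static class ApmChainSketch
{
    // Reads up to two chunks from a stream with Begin/End calls and reports the
    // total number of bytes read. Each additional step adds another nested
    // callback, and each callback needs its own exception handling.
    public static void ReadTwoChunks(Stream stream, int chunkSize,
        Action<int> onDone, Action<Exception> onError)
    {
        var first = new byte[chunkSize];
        var second = new byte[chunkSize];

        stream.BeginRead(first, 0, chunkSize, ar1 =>
        {
            int n1;
            try { n1 = stream.EndRead(ar1); }
            catch (Exception e) { onError(e); return; }

            try
            {
                // The second read can only start from inside the first callback.
                stream.BeginRead(second, 0, chunkSize, ar2 =>
                {
                    int n2;
                    try { n2 = stream.EndRead(ar2); }
                    catch (Exception e) { onError(e); return; }
                    onDone(n1 + n2);
                }, null);
            }
            catch (Exception e) { onError(e); }
        }, null);
    }
}
```

Even with only two steps, the error handling is repeated at every level. Reading a file of unknown length this way would need a recursive callback on top of it.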

Let’s see if we can improve on this pattern.

ObservableHttpHandler

At the heart of this interface is the following set of operations:

  • For each request sent to a specified URL of the web server, a method is called that will be responsible for processing this request.
  • This method will receive an HttpContext object. This object provides information on the request and allows the method to send a response.
  • The method can spin up any amount of asynchronous work to process the request.
  • Once all asynchronous work is done and the response is sent, the method needs to signal it is done processing the request.

In the Rx world, this can be modeled as follows:

 

    public IObservable<Unit> ProcessRequestAsObservable(HttpContext context)

 

As ASP.NET doesn’t know about this pattern, we’ll have to convert between the asynchronous pattern it knows about (IHttpAsyncHandler) and the pattern we prefer.

We’ll create an abstract base class that will do this work for us.

First, we’ll implement the IHttpAsyncHandler interface. We’ll do this explicitly so these methods don’t show up in our public API surface.

 

    public abstract class ObservableHttpHandler : IHttpAsyncHandler
    {
        public abstract IObservable<Unit> ProcessRequestAsObservable(HttpContext context);

        IAsyncResult IHttpAsyncHandler.BeginProcessRequest(HttpContext context,
            AsyncCallback cb, object extraData)
        {
            ...
        }

        void IHttpAsyncHandler.EndProcessRequest(IAsyncResult result)
        {
            ...
        }

        bool IHttpHandler.IsReusable
        {
            get { ... }
        }

        void IHttpHandler.ProcessRequest(HttpContext context)
        {
            ...
        }
    }

 

Let’s see how to implement these methods. We’ll start with the methods that IHttpAsyncHandler inherits from IHttpHandler:

IsReusable: return true as we don’t plan to maintain state in this class.

ProcessRequest: the method that is called if the request has to be processed synchronously. We will forward to the asynchronous version and block until the work has completed:

 

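    void IHttpHandler.ProcessRequest(HttpContext context)
    {
        using (var gate = new ManualResetEvent(false))
        {
            AsyncCallback cb = (_ => gate.Set());
            var handler = (IHttpAsyncHandler)this;
            var result = handler.BeginProcessRequest(context, cb, null);
            gate.WaitOne();
            // Rethrows any exception captured while processing the request.
            handler.EndProcessRequest(result);
        }
    }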

AsyncResult

As the BeginProcessRequest method follows the Begin/End pattern, we’ll have to return something that implements the IAsyncResult interface. So let’s start off with a simple implementation of this interface:

 

    private class SimpleAsyncResult : IAsyncResult
    {
        private readonly ManualResetEvent asyncWaitHandler;
        private Exception exception;

        public SimpleAsyncResult(object asyncState)
        {
            AsyncState = asyncState;
            asyncWaitHandler = new ManualResetEvent(false);
        }

        public object AsyncState
        {
            get; private set;
        }

        public WaitHandle AsyncWaitHandle
        {
            get { return asyncWaitHandler; }
        }

        public bool CompletedSynchronously
        {
            get { return false; }
        }

        public bool IsCompleted
        {
            get;
            private set;
        }

        internal void Success()
        {
            Complete();
        }

        internal void Fail(Exception exception)
        {
            this.exception = exception;
            Complete();
        }

        private void Complete()
        {
            IsCompleted = true;
            asyncWaitHandler.Set();
        }

        internal void CheckResult()
        {
            if (exception != null)
                throw exception;
        }
    }

 

SimpleAsyncResult will allow us to communicate to ASP.NET that the request has finished processing as well as pass along any exception that happened along the way.

 

UPDATE: After I published this post, my coworker Stephen Toub from the TPL team pointed out that instead of implementing SimpleAsyncResult, we could have reused Task, as it also implements IAsyncResult.
I’ll leave this as an exercise for the reader.
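For readers who want a starting point for that exercise, the following is one possible sketch, not the author's implementation. It assumes .NET 4.5 or later. TaskAsyncResultSketch, CompletionObserver and ImmediateSource are hypothetical names, and the sketch avoids any dependency on Rx or System.Web by working against the plain IObservable<T> interface.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical sketch: uses Task as the IAsyncResult instead of a
// hand-written SimpleAsyncResult.
public static class TaskAsyncResultSketch
{
    // Would form the body of BeginProcessRequest; 'source' stands for the
    // observable returned by ProcessRequestAsObservable.
    public static IAsyncResult Begin<T>(IObservable<T> source, AsyncCallback cb, object extraData)
    {
        // The state given to the constructor surfaces as IAsyncResult.AsyncState.
        var tcs = new TaskCompletionSource<object>(extraData);

        // Register the callback first; it is invoked once the task completes.
        if (cb != null)
            tcs.Task.ContinueWith(t => cb(t), TaskContinuationOptions.ExecuteSynchronously);

        source.Subscribe(new CompletionObserver<T>(tcs));
        return tcs.Task;
    }

    // Would form the body of EndProcessRequest.
    public static void End(IAsyncResult result)
    {
        // Waits if necessary and rethrows a stored exception unwrapped.
        ((Task)result).GetAwaiter().GetResult();
    }

    private sealed class CompletionObserver<T> : IObserver<T>
    {
        private readonly TaskCompletionSource<object> tcs;
        public CompletionObserver(TaskCompletionSource<object> tcs) { this.tcs = tcs; }
        public void OnNext(T value) { }   // values are ignored, only termination matters
        public void OnError(Exception error) { tcs.TrySetException(error); }
        public void OnCompleted() { tcs.TrySetResult(null); }
    }
}

// Stand-in source for trying the sketch without Rx: terminates on Subscribe.
public sealed class ImmediateSource<T> : IObservable<T>, IDisposable
{
    private readonly Exception error;
    public ImmediateSource(Exception error = null) { this.error = error; }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (error != null) observer.OnError(error); else observer.OnCompleted();
        return this;
    }
    public void Dispose() { }
}
```

Calling TaskAsyncResultSketch.Begin(new ImmediateSource<int>(), null, "state") returns an already completed result whose AsyncState is "state". Passing a source built with an exception makes End rethrow that exception.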

BeginProcessRequest

The first thing we’ll do in BeginProcessRequest is to create an instance of this new class:

 

    var asyncResult = new SimpleAsyncResult(extraData);

 

Next we need to know how to kick off the computation. We’ll call the ProcessRequestAsObservable method and store the observable:

 

    var observable = ProcessRequestAsObservable(context);

 

Now it is time to kick off the computation by subscribing to the observable. As the observable only contains Unit values, we only care about how it terminates: with success or with failure.

 

    observable.Subscribe(
        _ => { },
        e =>
        {
            asyncResult.Fail(e);
            cb(asyncResult);
        },
        () =>
        {
            asyncResult.Success();
            cb(asyncResult);
        });

 

In both cases, we signal completion to the SimpleAsyncResult and invoke the callback that ASP.NET provided. In the case of failure, we also hand the exception that occurred to the SimpleAsyncResult.

Finally, we’ll return the SimpleAsyncResult object to ASP.NET:

 

    return asyncResult;

EndProcessRequest

As the exception is now stuck as instance state in the SimpleAsyncResult instance, we’ll need to provide ASP.NET with a way to get the exception out of there. This is done by implementing the EndProcessRequest method:

 

    ((SimpleAsyncResult) result).CheckResult();
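One detail worth knowing about CheckResult: a plain `throw exception;` on a stored exception replaces its stack trace with one that starts at the rethrow site. If the original trace matters for diagnostics, a possible variant, assuming .NET 4.5 or later, is to rethrow through ExceptionDispatchInfo. RethrowSketch is a hypothetical name used only for this illustration.

```csharp
using System;
using System.Runtime.ExceptionServices;

// Hypothetical variant of SimpleAsyncResult.CheckResult, not the author's code.
public static class RethrowSketch
{
    // Rethrows a stored exception while keeping the stack trace from the
    // point where it was originally thrown. Does nothing when 'stored' is null.
    public static void CheckResult(Exception stored)
    {
        if (stored != null)
            ExceptionDispatchInfo.Capture(stored).Throw();
    }
}
```

The same exception instance reaches the caller, so ASP.NET would still see the original exception type and message.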

Complete Implementation

With that, the complete implementation of ObservableHttpHandler is:

 

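    using System;
    using System.Threading;
    using System.Web;
    // Unit is the Rx type; it lives in the System namespace in the Rx build
    // used here (System.Reactive in later Rx releases).

    public abstract class ObservableHttpHandler : IHttpAsyncHandler
    {
        public abstract IObservable<Unit> ProcessRequestAsObservable(HttpContext context);

        IAsyncResult IHttpAsyncHandler.BeginProcessRequest(HttpContext context,
            AsyncCallback cb, object extraData)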
        {
            var asyncResult = new SimpleAsyncResult(extraData);
            var observable = ProcessRequestAsObservable(context);
            observable.Subscribe(
                _ => { },
                e =>
                {
                    asyncResult.Fail(e);
                    cb(asyncResult);
                },
                () =>
                {
                    asyncResult.Success();
                    cb(asyncResult);
                });
            return asyncResult;
        }

        void IHttpAsyncHandler.EndProcessRequest(IAsyncResult result)
        {
            ((SimpleAsyncResult)result).CheckResult();
        }

        bool IHttpHandler.IsReusable
        {
            get { return true; }
        }

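        void IHttpHandler.ProcessRequest(HttpContext context)
        {
            using (var gate = new ManualResetEvent(false))
            {
                AsyncCallback cb = (_ => gate.Set());
                var handler = (IHttpAsyncHandler)this;
                var result = handler.BeginProcessRequest(context, cb, null);
                gate.WaitOne();
                // Rethrows any exception captured while processing the request.
                handler.EndProcessRequest(result);
            }
        }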

        private class SimpleAsyncResult : IAsyncResult
        {
            private readonly ManualResetEvent asyncWaitHandler;
            private Exception exception;

            public SimpleAsyncResult(object asyncState)
            {
                AsyncState = asyncState;
                asyncWaitHandler = new ManualResetEvent(false);
            }

            public object AsyncState
            {
                get;
                private set;
            }

            public WaitHandle AsyncWaitHandle
            {
                get { return asyncWaitHandler; }
            }

            public bool CompletedSynchronously
            {
                get { return false; }
            }

            public bool IsCompleted
            {
                get;
                private set;
            }

            internal void Success()
            {
                Complete();
            }

            internal void Fail(Exception exception)
            {
                this.exception = exception;
                Complete();
            }

            private void Complete()
            {
                IsCompleted = true;
                asyncWaitHandler.Set();
            }
 
            internal void CheckResult()
            {
                if (exception != null)
                    throw exception;
            }
        }
    }

Usage

Now that the implementation is done, it is time to take a look at how we would use this new base class:

Imagine an HttpHandler that asynchronously reads a big movie file, e.g. in 1 MB chunks, and sends each chunk to the client asynchronously:

 

    public class BigVideoHandler : ObservableHttpHandler
    {
        public override IObservable<System.Unit> ProcessRequestAsObservable(HttpContext context)
        {
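            // 2 << 19 == 1,048,576 bytes, i.e. 1 MB per chunk.
            const int chunkSize = 2 << 19;
            // The final 'true' opens the file for asynchronous I/O.
            var fs = new FileStream(@"d:\videos\video1.wmv", FileMode.Open,
                FileAccess.Read, FileShare.Read, chunkSize, true);
            context.Response.ContentType = "video/x-ms-wmv";
            // AsyncRead and WriteToStream are stream extension methods from the earlier posts in this series.
            return fs.AsyncRead(chunkSize).WriteToStream(context.Response.OutputStream);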
        }
    }

 

NOTE: To use this HTTP Handler, it needs to be registered with the webserver. As this is different per webserver, please read this MSDN article on how to do so.

 

Even though many asynchronous operations are going on here, the code still looks very concise. The code inside ProcessRequestAsObservable can use any IObservable implementation and any of the Rx operators to orchestrate many asynchronous operations.

What’s next

The HttpHandler pattern is very powerful; it can be used to represent nearly any kind of HTTP request. As it sits very low on the stack, though, you’ll have to do a lot of manual work. Next in this series, we’ll take a look at how Rx can help with asynchronous programming in ASP.NET MVC.