Async Streaming in ASP.NET Web API

ASP.NET Web API supports asynchronous actions using the Task-based programming model introduced in .NET 4. In .NET 4.5 the model has improved further and is supported directly in C# through the new async and await keywords (see “Visual Studio Asynchronous Programming” for background).

What this means is that you can have actions that look like this:

    public Task<string[]> GetContent(string topic)
    {
        ...
    }

as well as

    public Task<HttpResponseMessage> GetContent(string topic)
    {
        ...
    }

and even this

    public Task GetContent(string topic)
    {
        ...
    }

The benefit of writing Task-based actions is that you don’t block a thread on the server while a long-running operation completes, which allows the server to scale better and stay responsive.
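
As a rough illustration of the three shapes, here is a minimal sketch with the bodies filled in using async and await from .NET 4.5. The class name ContentActions, the distinct method names, and the Task.Delay call standing in for real non-blocking I/O are all assumptions made for the example; in a real project the methods would live on a controller deriving from ApiController.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical stand-in for a controller (a real one would derive from ApiController).
// The methods have distinct names because C# cannot overload on return type alone.
public class ContentActions
{
    // Task<T> of plain data: the result is serialized once the task completes.
    public async Task<string[]> GetContentAsync(string topic)
    {
        await Task.Delay(100); // assumption: simulates non-blocking I/O
        return new[] { topic + " item 1", topic + " item 2" };
    }

    // Task<HttpResponseMessage>: the action controls the full response.
    public async Task<HttpResponseMessage> GetResponseAsync(string topic)
    {
        await Task.Delay(100); // no thread is held while waiting
        return new HttpResponseMessage(HttpStatusCode.OK)
        {
            Content = new StringContent("Content for " + topic)
        };
    }

    // Plain Task: nothing to return, only completion is signaled.
    public async Task TouchContentAsync(string topic)
    {
        await Task.Delay(100);
    }
}
```

In each case the calling thread is released at the await and the method resumes when the awaited task finishes.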

In the blog “Push and Pull Streams using HttpClient” we described how to use push and pull streams with HttpContent. Because HttpRequestMessage, HttpResponseMessage, and HttpContent are used on both the client side (as part of HttpClient) and the server side (as part of ASP.NET Web API), the push vs. pull model described in that blog applies equally well to the server side.

In this blog we expand on the topic of reading request content using asynchronous streaming within an action.

Reading Request Body Asynchronously

In the first scenario we implement a POST action that takes a JSON entity body and writes it to another stream. Note that the action returns Task<HttpResponseMessage>, which means that the action completes when the task completes. The important part of this action is the call to CopyToAsync, where we copy the content from the request to another stream (we describe the SummaryStream below):

    public Task<HttpResponseMessage> PostContent()
    {
        // Verify that this is JSON content
        if (!Request.Content.IsJsonContent())
        {
            throw new HttpResponseException(HttpStatusCode.UnsupportedMediaType);
        }

        // Copy content asynchronously to our SummaryStream and return the continuation task
        SummaryStream summaryStream = new SummaryStream();
        return Request.Content.CopyToAsync(summaryStream).ContinueWith(
            (readTask) =>
            {
                summaryStream.Close();

                // A continuation only runs after the antecedent task has finished, so
                // IsCompleted is always true here. Check instead that the copy neither
                // faulted nor was canceled, and generate the response accordingly.
                if (readTask.Status == TaskStatus.RanToCompletion)
                {
                    return new HttpResponseMessage(HttpStatusCode.OK);
                }
                else
                {
                    return new HttpResponseMessage(HttpStatusCode.BadRequest);
                }
            });
    }
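
With the async and await keywords in .NET 4.5, the same copy-then-respond logic can be written without an explicit continuation. The following is only a sketch: RequestCopy and CopyAndRespondAsync are hypothetical names, the destination parameter plays the role of the SummaryStream, and catching every exception simply mirrors the action's choice of mapping any failed copy to 400 Bad Request.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

public static class RequestCopy
{
    // Hypothetical helper: copies the content to the destination stream and
    // reports the outcome as an HTTP response.
    public static async Task<HttpResponseMessage> CopyAndRespondAsync(
        HttpContent content, Stream destination)
    {
        try
        {
            // The using block closes the destination whether or not the copy succeeds.
            using (destination)
            {
                await content.CopyToAsync(destination);
            }

            return new HttpResponseMessage(HttpStatusCode.OK);
        }
        catch (Exception)
        {
            // Assumption: any failure during the copy is reported as a bad request.
            return new HttpResponseMessage(HttpStatusCode.BadRequest);
        }
    }
}
```

Inside an action this could be called as return RequestCopy.CopyAndRespondAsync(Request.Content, new SummaryStream()); after the media type check.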

This is essentially all you have to do; the rest is just filler, but if you want to dive in a little more then keep going! The first thing the action does is check that the content is actually JSON. To do this we use a little extension method that we have added for this purpose:

    /// <summary>
    /// Determines whether the specified content has a JSON media type of either 
    /// <c>application/json</c> or <c>text/json</c>. 
    /// </summary>
    /// <param name="content">The content.</param>
    /// <returns>
    /// <c>true</c> if the specified content is JSON with a media type of either <c>application/json</c> or <c>text/json</c>.
    /// </returns>
    public static bool IsJsonContent(this HttpContent content)
    {
        if (content == null)
        {
            throw new ArgumentNullException("content");
        }

        // Content without a Content-Type header cannot be identified as JSON
        if (content.Headers == null || content.Headers.ContentType == null)
        {
            return false;
        }

        // MediaType excludes parameters such as charset, so a direct comparison is enough
        MediaTypeHeaderValue contentType = content.Headers.ContentType;
        return
            contentType.MediaType.Equals("application/json", StringComparison.OrdinalIgnoreCase) ||
            contentType.MediaType.Equals("text/json", StringComparison.OrdinalIgnoreCase);
    }
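
The check compares the MediaType property rather than the raw header text because a Content-Type header often carries parameters such as charset. Here is a minimal sketch of that behavior, using a hypothetical MediaTypeProbe helper that is not part of the sample:

```csharp
using System.Net.Http;
using System.Net.Http.Headers;

public static class MediaTypeProbe
{
    // Hypothetical helper: returns the bare media type of the content,
    // or null when no Content-Type header is present.
    public static string GetMediaType(HttpContent content)
    {
        MediaTypeHeaderValue contentType = content.Headers.ContentType;
        return contentType == null ? null : contentType.MediaType;
    }
}
```

For example, new StringContent("{}", Encoding.UTF8, "application/json") carries a charset parameter in its header, yet GetMediaType returns just application/json. A ByteArrayContent created without setting any headers has no Content-Type at all, which is the case the null check in IsJsonContent guards against.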

To see what is going on when reading the content, we create a special stream called SummaryStream that simply indicates when it gets called by writing to the console. You don’t have to use this, but if you want to try it out then it looks like this:

    /// <summary>
    /// Sample stream that writes to the console how many bytes were written to it.
    /// </summary>
    internal class SummaryStream : DelegatingStream
    {
        // All data is discarded by the inner Stream.Null; only the calls are logged
        public SummaryStream()
            : base(Stream.Null)
        {
        }

        public override void Flush()
        {
            Console.WriteLine("Flushing stream...");
            _innerStream.Flush();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            Console.WriteLine("Writing {0} bytes SYNChronously...", count);
            _innerStream.Write(buffer, offset, count);
        }

        public override void WriteByte(byte value)
        {
            Console.WriteLine("Writing a single byte SYNChronously...");
            _innerStream.WriteByte(value);
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            Console.WriteLine("Writing {0} bytes ASYNChronously...", count);
            return _innerStream.BeginWrite(buffer, offset, count, callback, state);
        }

        public override void Close()
        {
            Console.WriteLine("Closing stream...");
            base.Close();
        }
    }

The DelegatingStream class, which SummaryStream derives from, does nothing more than delegate every call to the inner stream. It looks like this:

    /// <summary>
    /// Utility <see cref="Stream"/> which delegates everything to an inner <see cref="Stream"/>
    /// </summary>
    internal abstract class DelegatingStream : Stream
    {
        protected Stream _innerStream;

        protected DelegatingStream(Stream innerStream)
        {
            Contract.Assert(innerStream != null);
            _innerStream = innerStream;
        }

        public override bool CanRead
        {
            get { return _innerStream.CanRead; }
        }

        public override bool CanSeek
        {
            get { return _innerStream.CanSeek; }
        }

        public override bool CanWrite
        {
            get { return _innerStream.CanWrite; }
        }

        public override long Length
        {
            get { return _innerStream.Length; }
        }

        public override long Position
        {
            get { return _innerStream.Position; }
            set { _innerStream.Position = value; }
        }

        public override int ReadTimeout
        {
            get { return _innerStream.ReadTimeout; }
            set { _innerStream.ReadTimeout = value; }
        }

        public override bool CanTimeout
        {
            get { return _innerStream.CanTimeout; }
        }

        public override int WriteTimeout
        {
            get { return _innerStream.WriteTimeout; }
            set { _innerStream.WriteTimeout = value; }
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing)
            {
                _innerStream.Dispose();
            }
            base.Dispose(disposing);
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            return _innerStream.Seek(offset, origin);
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return _innerStream.Read(buffer, offset, count);
        }

        public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            return _innerStream.BeginRead(buffer, offset, count, callback, state);
        }

        public override int EndRead(IAsyncResult asyncResult)
        {
            return _innerStream.EndRead(asyncResult);
        }

        public override int ReadByte()
        {
            return _innerStream.ReadByte();
        }

        public override void Flush()
        {
            _innerStream.Flush();
        }

        public override void SetLength(long value)
        {
            _innerStream.SetLength(value);
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            _innerStream.Write(buffer, offset, count);
        }

        public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
        {
            return _innerStream.BeginWrite(buffer, offset, count, callback, state);
        }

        public override void EndWrite(IAsyncResult asyncResult)
        {
            _innerStream.EndWrite(asyncResult);
        }

        public override void WriteByte(byte value)
        {
            _innerStream.WriteByte(value);
        }
    }

Trying It Out

To run the sample we start it in self host as usual.

Note: By default the max message size in self host is 64k, so to accept larger requests we set it to 1M. We also turn on streaming so that we don’t buffer the whole request in memory at any point in time.

    static void Main(string[] args)
    {
        var baseAddress = "http://localhost:8080/";
        HttpSelfHostServer server = null;

        try
        {
            // Create configuration
            var config = new HttpSelfHostConfiguration(baseAddress);

            // Set the max message size to 1M instead of the default size of 64k and also
            // set the transfer mode to 'streamed' so that we don't allocate a 1M buffer but 
            // rather just have a small read buffer.
            config.MaxReceivedMessageSize = 1024 * 1024;
            config.TransferMode = TransferMode.Streamed;

            // Add a route
            config.Routes.MapHttpRoute(
              name: "default",
              routeTemplate: "api/{controller}/{id}",
              defaults: new { controller = "Home", id = RouteParameter.Optional });

            server = new HttpSelfHostServer(config);

            // Block until the server is listening
            server.OpenAsync().Wait();

            Console.WriteLine("Hit ENTER to exit");
            Console.ReadLine();
        }
        finally
        {
            // Shut the server down even if startup failed part way through
            if (server != null)
            {
                server.CloseAsync().Wait();
            }
        }
    }

We then use Fiddler to submit an HTTP POST request with a JSON request body. That is, we open the Composer tab and fill in a POST request with whatever JSON content we want to submit:

[Screenshot: SaveJson]
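
If Fiddler is not at hand, the same request can be sent from code with HttpClient. This is only a sketch: JsonPoster and its methods are hypothetical names, and the address in the usage note assumes the default route resolves to a controller named Home, which may not match the sample's actual controller.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class JsonPoster
{
    // Wraps a JSON string in content with an application/json media type,
    // which is what the action's IsJsonContent check looks for.
    public static StringContent CreateJsonContent(string json)
    {
        return new StringContent(json, Encoding.UTF8, "application/json");
    }

    // Posts the JSON to the given address and returns the response status code.
    public static async Task<HttpStatusCode> PostJsonAsync(Uri address, string json)
    {
        using (var client = new HttpClient())
        using (StringContent content = CreateJsonContent(json))
        using (HttpResponseMessage response = await client.PostAsync(address, content))
        {
            return response.StatusCode;
        }
    }
}
```

A call such as JsonPoster.PostJsonAsync(new Uri("http://localhost:8080/api/home"), "{\"name\":\"value\"}") would then stand in for the Composer step. Adjust the scheme, port, and path to match the base address and controller that the self-hosted server actually uses.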

The resulting output to the console shows that the data is written asynchronously to the SummaryStream:

[Screenshot: SummaryStreamOutput]

Have fun!

Henrik
