Async Streaming in ASP.NET Web API


ASP.NET Web API supports asynchronous actions using the new Task-based programming model introduced in .NET 4. In .NET 4.5 the model has improved further and in addition is supported directly from C# using the new async and await keywords (see “Visual Studio Asynchronous Programming” for background).

What this means is that you can have actions that look like this:

   1: public Task<string[]> GetContent(string topic)
   2: {
   3:     ...
   4: }

as well as

   1: public Task<HttpResponseMessage> GetContent(string topic)
   2: {
   3:     ...
   4: }

and even this

   1: public Task GetContent(string topic)
   2: {
   3:     ...
   4: }

The benefit of writing Task-based actions is that you don’t block a thread on the server even if things take a long time to complete and that allows the server to scale better and be more responsive.

In the blog “Push and Pull Streams using HttpClient” we described how to use push and pull streams with HttpContent. As HttpRequestMessage, HttpResponseMesssage, and HttpContent apply to both client side (as part of HttpClient) as well as server side (as part of ASP.NET Web API) the push vs. pull model described in that blog applies equally well to the server side.

In this blog we expand on the topic of reading request content using asynchronous streaming within an action.

Reading Request Body Asynchronously

In the first scenario we implement a POST action that takes a JSON entity body and writes it to another stream. Note that the action returns Task<HttpResponseMessage> which means that the action completes when the task completes. The important part of this action is line 11 where we copy the content from the request to another stream (we describe the SummaryStream below):

   1: public Task<HttpResponseMessage> PostContent()
   2: {
   3:     // Verify that this is JSON content
   4:     if (!Request.Content.IsJsonContent())
   5:     {
   6:         throw new HttpResponseException(HttpStatusCode.UnsupportedMediaType);
   7:     }
   8:  
   9:     // Copy content asynchronously to our SummaryStream and return the continuation task
  10:     SummaryStream summaryStream = new SummaryStream();
  11:     return Request.Content.CopyToAsync(summaryStream).ContinueWith(
  12:         (readTask) =>
  13:         {
  14:             summaryStream.Close();
  15:  
  16:             // Check whether we completed successfully and generate response
  17:             if (readTask.IsCompleted)
  18:             {
  19:                 return new HttpResponseMessage(HttpStatusCode.OK);
  20:             }
  21:             else
  22:             {
  23:                 return new HttpResponseMessage(HttpStatusCode.BadRequest);
  24:             }
  25:         });
  26: }

This is essentially all you have to do – the rest is just filler but if you want to dive in a little more then keep going! The first thing the action does is checking that the content is actually JSON. To do this we use a little extension method that we have added for this purpose:

   1: /// <summary>
   2: /// Determines whether the specified content has an JSON media type of either 
   3: /// <c>application/json</c> or <c>text/json</c>. 
   4: /// </summary>
   5: /// <param name="content">The content.</param>
   6: /// <returns>
   7: /// <c>true</c> if the specified content is XML with a media type of either <c>application/json</c> or <c>text/json</c>.
   8: /// </returns>
   9: public static bool IsJsonContent(this HttpContent content)
  10: {
  11:     if (content == null)
  12:     {
  13:         throw new ArgumentNullException("content");
  14:     }
  15:  
  16:     if (content.Headers == null || content.Headers.ContentType == null)
  17:     {
  18:         return false;
  19:     }
  20:  
  21:     MediaTypeHeaderValue contentType = content.Headers.ContentType;
  22:     return
  23:         contentType.MediaType.Equals("application/json", StringComparison.OrdinalIgnoreCase) ||
  24:         contentType.MediaType.Equals("text/json", StringComparison.OrdinalIgnoreCase);
  25: }

To  see what is going on when reading the content we create a special stream called SummaryStream that simply indicates when it gets called by writing to the console. You don’t have to use this but if you want to try it out then it looks like this:

   1: /// <summary>
   2: /// Sample stream that write how many characters were written to it.
   3: /// </summary>
   4: internal class SummaryStream : DelegatingStream
   5: {
   6:     public SummaryStream()
   7:         : base(Stream.Null)
   8:     {
   9:     }
  10:  
  11:     public override void Flush()
  12:     {
  13:         Console.WriteLine("Flushing stream...");
  14:         _innerStream.Flush();
  15:     }
  16:  
  17:     public override void Write(byte[] buffer, int offset, int count)
  18:     {
  19:         Console.WriteLine("Writing {0} bytes SYNChronously...", count);
  20:         _innerStream.Write(buffer, offset, count);
  21:     }
  22:  
  23:     public override void WriteByte(byte value)
  24:     {
  25:         Console.WriteLine("Writing a single byte SYNChronously...");
  26:         _innerStream.WriteByte(value);
  27:     }
  28:  
  29:     public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  30:     {
  31:         Console.WriteLine("Writing {0} bytes ASYNChronously...", count);
  32:         return _innerStream.BeginWrite(buffer, offset, count, callback, state);
  33:     }
  34:  
  35:     public override void Close()
  36:     {
  37:         Console.WriteLine("Closing stream...");
  38:         base.Close();
  39:     }
  40: }

The DelegatingStream which SummaryStream derives from is nothing more than delegating every call to the inner stream. It looks like this:

   1: /// <summary>
   2: /// Utility <see cref="Stream"/> which delegates everything to an inner <see cref="Stream"/>
   3: /// </summary>
   4: internal abstract class DelegatingStream : Stream
   5: {
   6:     protected Stream _innerStream;
   7:  
   8:     protected DelegatingStream(Stream innerStream)
   9:     {
  10:         Contract.Assert(innerStream != null);
  11:         _innerStream = innerStream;
  12:     }
  13:  
  14:     public override bool CanRead
  15:     {
  16:         get { return _innerStream.CanRead; }
  17:     }
  18:  
  19:     public override bool CanSeek
  20:     {
  21:         get { return _innerStream.CanSeek; }
  22:     }
  23:  
  24:     public override bool CanWrite
  25:     {
  26:         get { return _innerStream.CanWrite; }
  27:     }
  28:  
  29:     public override long Length
  30:     {
  31:         get { return _innerStream.Length; }
  32:     }
  33:  
  34:     public override long Position
  35:     {
  36:         get { return _innerStream.Position; }
  37:         set { _innerStream.Position = value; }
  38:     }
  39:  
  40:     public override int ReadTimeout
  41:     {
  42:         get { return _innerStream.ReadTimeout; }
  43:         set { _innerStream.ReadTimeout = value; }
  44:     }
  45:  
  46:     public override bool CanTimeout
  47:     {
  48:         get { return _innerStream.CanTimeout; }
  49:     }
  50:  
  51:     public override int WriteTimeout
  52:     {
  53:         get { return _innerStream.WriteTimeout; }
  54:         set { _innerStream.WriteTimeout = value; }
  55:     }
  56:  
  57:     protected override void Dispose(bool disposing)
  58:     {
  59:         if (disposing)
  60:         {
  61:             _innerStream.Dispose();
  62:         }
  63:         base.Dispose(disposing);
  64:     }
  65:  
  66:     public override long Seek(long offset, SeekOrigin origin)
  67:     {
  68:         return _innerStream.Seek(offset, origin);
  69:     }
  70:  
  71:     public override int Read(byte[] buffer, int offset, int count)
  72:     {
  73:         return _innerStream.Read(buffer, offset, count);
  74:     }
  75:  
  76:     public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  77:     {
  78:         return _innerStream.BeginRead(buffer, offset, count, callback, state);
  79:     }
  80:  
  81:     public override int EndRead(IAsyncResult asyncResult)
  82:     {
  83:         return _innerStream.EndRead(asyncResult);
  84:     }
  85:  
  86:     public override int ReadByte()
  87:     {
  88:         return _innerStream.ReadByte();
  89:     }
  90:  
  91:     public override void Flush()
  92:     {
  93:         _innerStream.Flush();
  94:     }
  95:  
  96:     public override void SetLength(long value)
  97:     {
  98:         _innerStream.SetLength(value);
  99:     }
 100:  
 101:     public override void Write(byte[] buffer, int offset, int count)
 102:     {
 103:         _innerStream.Write(buffer, offset, count);
 104:     }
 105:  
 106:     public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
 107:     {
 108:         return _innerStream.BeginWrite(buffer, offset, count, callback, state);
 109:     }
 110:  
 111:     public override void EndWrite(IAsyncResult asyncResult)
 112:     {
 113:         _innerStream.EndWrite(asyncResult);
 114:     }
 115:  
 116:     public override void WriteByte(byte value)
 117:     {
 118:         _innerStream.WriteByte(value);
 119:     }
 120: }

Trying It Out

To run the sample we start it in self host as usual.

Note: By default the max message size in self host is 64k so to accept larger request sizes we set it to 1M and also turn on streaming so that we don’t buffer the whole thing in memory at any point in time.

   1: static void Main(string[] args)
   2: {
   3:     var baseAddress = "http://localhost:8080/";
   4:     HttpSelfHostServer server = null;
   5:  
   6:     try
   7:     {
   8:         // Create configuration
   9:         var config = new HttpSelfHostConfiguration(baseAddress);
  10:  
  11:         // Set the max message size to 1M instead of the default size of 64k and also
  12:         // set the transfer mode to 'streamed' so that don't allocate a 1M buffer but 
  13:         // rather just have a small read buffer.
  14:         config.MaxReceivedMessageSize = 1024 * 1024;
  15:         config.TransferMode = TransferMode.Streamed;
  16:  
  17:         // Add a route
  18:         config.Routes.MapHttpRoute(
  19:           name: "default",
  20:           routeTemplate: "api/{controller}/{id}",
  21:           defaults: new { controller = "Home", id = RouteParameter.Optional });
  22:  
  23:         server = new HttpSelfHostServer(config);
  24:  
  25:         server.OpenAsync().Wait();
  26:  
  27:         Console.WriteLine("Hit ENTER to exit");
  28:         Console.ReadLine();
  29:     }
  30:     finally
  31:     {
  32:         if (server != null)
  33:         {
  34:             server.CloseAsync().Wait();
  35:         }
  36:     }
  37: }

We then use Fiddler to submit an HTTP POST request with an JSON request body. That is, we open the Composer tab and fill in a POST request with whatever JSON content you want to submit:

SaveJson

The resulting output to the console shows that the data is written asynchronously to the SummaryStream:

SummaryStreamOutput

Have fun!

Henrik

del.icio.us Tags: ,,,,
Comments (9)

  1. James Manning says:

    Small typo? I'm assuming you meant HttpClient?

    In the previous blog “Push and Pull Streams using HttpClient” we described how to use push and pull streams with **HttpContent**

  2. Thanks for asking — I did mean HttpContent because it applies to both client and server side. HttpRequestMessage, HttpResponseMessage, HttpContent, etc. sit on both the client side (as part of HttpClient) and on the server side (as part of ASP.NET Web API). That is, the way you can do push on client side works identical on server side.

    Hope this helps,

    Henrik

  3. Radenko Zec says:

    Do you recommend using Tasks for every action in WebApi to improve scalability or just for long running actions ?

  4. A good rule of thumb is whether the action does any I/O (e.g. reading or writing to the network). For example, if an action talks to a backend, performs some form of aggregation of other services, or reads data from the request, then using Tasks is a good idea.

    If this is not the case then as a general rule not using tasks is fine. There are of course exceptions to this rule such as actions doing parallelizable data crunching like ray tracing but that is rare in my experience.

  5. Radenko Zec says:

    What do you mean by talks to backend ? For example what do you recommend if my action just read data from Sql Server database based on some passed parameters (simple select statement with where) and returns JSON result ? Is it smart to use tasks here or not ?

  6. Aliostad says:

    Just reading the headline I assumed this is to do with HTTP chunked encoding and I was surprised that the client side of it is done since AFAIK IIS does not support chunked encoding from the client. Is that statement still true?

    Also I am wondering, how would be define that in a response? Do I need to set a property to chunked encoding?

    Also would these Task<T>s and Tasks use the default scheduler – hence suffocate the ThreadPool threads? And as such, should we set a higher value for Min and Max of ThreadPool threads?

    Sorry this is already three questions? Thanks 🙂

  7. Nice Example, which i am looking for.

  8. Jack Li says:

    Thanks for the explanation !!

    I do have one question, you said :

    " The benefit of writing Task-based actions is that you don’t block a thread on the server "

    But Web Api is already multi-threaded, why do I need to worry about blocking a thread ? Even if my Action is doing I/O to the backend, we still need to dedicate a thread for it for as long as it takes to complete.

    Am I missing anything ?

  9. Eric M. says:

    BadRequest is never the right status code to return when a failure happens on the server. InternalServerError is more appropriate. BadRequest is reserved for incorrect use of an endpoint in the form of invalid formatting or parameters.

Skip to main content