Weekend Fun 01: Building Application Request Routing / Reverse Proxy-like Components Using OWIN


Just for the fun of it, let us build an IIS ARR without IIS. (There might be actual requirements for this, for example running in Azure Web Sites, where you don't have the access needed to set up ARR, or hosting in a worker role.)

How can we build that? If you strip out all the fatty details, what we need is the following:

A pipeline that processes the request through the following stages (in order):

1- An HTTP listener.

2- A few components that perform AuthN and AuthZ.

3- A component that looks at the incoming request and decides where it goes. Say requests coming in are for http://domainA.com and should go to http://domainB.com (similar to IIS URL Rewrite).

4- A component that takes the request and executes it.
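To make stage 3 concrete, here is a minimal sketch of the routing decision, assuming the simplest possible rule: swap the host and keep the path and query string. The names `UrlRewriteSketch` and `Rewrite` are hypothetical and are not part of the attached project.

```csharp
using System;

public static class UrlRewriteSketch
{
    // Maps an incoming request URI onto a different backend host,
    // keeping scheme, path and query string unchanged.
    // (Hypothetical helper for illustration only.)
    public static Uri Rewrite(Uri incoming, string targetHost)
    {
        var builder = new UriBuilder(incoming)
        {
            Host = targetHost,
            Port = -1 // -1 tells UriBuilder to use the scheme's default port
        };
        return builder.Uri;
    }

    public static void Main()
    {
        var incoming = new Uri("http://domainA.com/news/today?id=42");
        Uri rewritten = Rewrite(incoming, "domainB.com");

        // The path and query survive; only the host changes.
        Console.WriteLine(rewritten.Host);
        Console.WriteLine(rewritten.PathAndQuery);
    }
}
```

A real rewriter would look the target up in a rule table instead of taking it as a parameter; the point here is only that the path and query travel with the request.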

 

OWIN is a natural fit for this, as it has the pipeline (and an HTTP listener based on the .NET Framework HttpListener).

The code below is hosted in a console app:
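If you have not used OWIN before, the pipeline idea boils down to functions that receive the environment dictionary and then call the next function in line. The sketch below shows that chaining using only the base class library, with no OWIN packages. `PipelineSketch`, `Build`, `Stage` and the `sketch.trace` key are names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
// Same shape as the OWIN application delegate: environment in, Task out.
using AppFunc = System.Func<System.Collections.Generic.IDictionary<string, object>, System.Threading.Tasks.Task>;

public static class PipelineSketch
{
    // Wraps the terminal stage with each middleware, last to first,
    // so that the first middleware in the list runs first.
    public static AppFunc Build(AppFunc terminal, params Func<AppFunc, AppFunc>[] middleware)
    {
        AppFunc app = terminal;
        for (int i = middleware.Length - 1; i >= 0; i--)
            app = middleware[i](app);
        return app;
    }

    // Creates a middleware that records its name, then calls the next stage.
    public static Func<AppFunc, AppFunc> Stage(string name)
    {
        return next => async env =>
        {
            ((List<string>)env["sketch.trace"]).Add(name);
            await next(env);
        };
    }

    public static void Main()
    {
        var env = new Dictionary<string, object> { ["sketch.trace"] = new List<string>() };

        AppFunc app = Build(
            e => Task.FromResult(0),   // terminal stage: does nothing
            Stage("authn"), Stage("rewrite"), Stage("router"));

        app(env).Wait();
        Console.WriteLine(string.Join(" -> ", (List<string>)env["sketch.trace"]));
    }
}
```

Running it prints `authn -> rewrite -> router`, which is the ordering the stages listed earlier rely on.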

 

Program Main

 

    static void Main(string[] args)
    {
        string baseUrl = "http://localhost:5001/";

        // WebApp.Start (Microsoft.Owin.Hosting) self-hosts the pipeline defined in Startup
        using (var server = WebApp.Start<Startup>(new StartOptions(baseUrl)))
        {
            Console.WriteLine("Server Started... Press Enter to quit.");
            Console.ReadLine(); // wait for Enter, as the prompt says
        }
    }

The code above simply starts the server.

The Startup class sets up two components: the URL rewriter and the request router.

 

    public class Startup
    {
        public void Configuration(IAppBuilder app)
        {
            // Order matters: the rewriter must run before the router
            app.UseURLRewrite();
            app.UseRR();
        }
    }

 

Because this is a fun project, I wanted to do more. Let us assume we want to implement two very simple patterns: Broadcast (the request goes to multiple servers and we wait for all of them) or First Responder (the request goes to multiple servers and we return as soon as any of them responds).

So my URL rewriter does the following (in its OWIN Invoke method): it picks a mode, either broadcast or first responder, pseudo-randomly (based on whether the current second is odd or even), then stores the list of target URLs and the selected mode in the OWIN environment dictionary.

    public async Task Invoke(IDictionary<string, object> env)
    {
        var targetUrls = new List<string>();

        // The current code doesn't handle GZIP.
        // BBC is not using it; other sites give an octet-stream response and time out at the proxy.

        if (DateTime.Now.Second % 2 > 0)
        {
            // Odd second: scatter and gather approach, the same request goes to every target
            for (int i = 0; i < 8; i++)
                targetUrls.Add("http://www.bbc.com");

            env["rr.Mode"] = "ALL";
            Debug.WriteLine("routed first case");
        }
        else
        {
            // Even second: first responder
            targetUrls.Add("http://www.bbc.com");

            env["rr.Mode"] = "ANY";
            Debug.WriteLine("routed second case");
        }

        // Hand the routing decision to the next component through the environment dictionary
        env["rr.targetURLs"] = targetUrls;
        await _next.Invoke(env);
    }

 

My request router performs the following:

1- Copies the original request stream.

2- Prepares an HttpRequestMessage/HttpClient instance for each URL.

3- Copies the original request headers.

4- Executes the requests.

5- Waits for the results (either All or Any).

6- Copies the response headers back to the downstream headers.

7- Copies the response body back into the downstream body.

    public async Task Invoke(IDictionary<string, object> env)
    {
        var tasks = new List<Task<HttpResponseMessage>>();
        var clients = new List<HttpClient>();
        var inStream = env["owin.RequestBody"] as Stream;
        var rrMode = env["rr.Mode"] as string;
        var targetUrls = env["rr.targetURLs"] as List<string>;
        var method = env["owin.RequestMethod"] as string;
        var headers = env["owin.RequestHeaders"] as IDictionary<string, string[]>;

        // Buffer the original request body once, so every outgoing request gets its own copy
        // (a single shared stream would be consumed by the first request)
        var buffer = new MemoryStream();
        if (null != inStream)
            await inStream.CopyToAsync(buffer);
        byte[] body = buffer.ToArray();

        var path = (env["owin.RequestPath"] as string) ?? string.Empty;
        var queryString = (env["owin.RequestQueryString"] as string) ?? string.Empty;
        if (string.Empty != queryString)
            queryString = "?" + queryString;

        foreach (var url in targetUrls)
        {
            var client = new HttpClient();
            var request = new HttpRequestMessage(new HttpMethod(method), new Uri(url + path + queryString));

            // Copy headers. TryAddWithoutValidation returns false instead of throwing
            // for headers that are not valid request headers (e.g. Content-Type)
            foreach (var header in headers)
                request.Headers.TryAddWithoutValidation(header.Key, header.Value);

            // Set body - not all methods take a body
            if (!string.Equals("GET", method, StringComparison.OrdinalIgnoreCase))
                request.Content = new ByteArrayContent(body);

            // Add it to our bag. You should track cancellation tokens as well.
            tasks.Add(client.SendAsync(request));
            clients.Add(client);
        }

        HttpResponseMessage selectedResponse = null;
        HttpStatusCode responseCode = HttpStatusCode.InternalServerError;
        HttpResponseHeaders responseHeaders = null;
        HttpContent responseContent = null;
        string responseStatusMessage = "Proxy!"; // obviously we are not following the RFC here - review spec: RFC 2616 section 6.1.1

        // Wait for the response(s)
        try
        {
            if (rrMode == "ALL")
            {
                Debug.WriteLine(string.Format("waiting for - ALL - {0} requests", targetUrls.Count));
                var completedTasks = await Task.WhenAll(tasks);
                Debug.WriteLine(string.Format("{0} requests completed..", targetUrls.Count));

                // You can do fun stuff here, like composing a response, concatenating responses, etc.
                selectedResponse = completedTasks[0];
            }
            else
            {
                Debug.WriteLine(string.Format("waiting for - ANY - {0} requests", targetUrls.Count));

                // This could be "any" or "single" mode
                var completedTask = await Task.WhenAny(tasks);

                // You should keep a list of cancellation tokens and cancel all the others once one has returned.
                // In this example the other requests are never cancelled explicitly.
                Debug.WriteLine("1 request completed..");
                selectedResponse = await completedTask;
            }

            selectedResponse.EnsureSuccessStatusCode();

            responseCode = selectedResponse.StatusCode;
            responseHeaders = selectedResponse.Headers;
            responseContent = selectedResponse.Content;
        }
        catch (AggregateException ae)
        {
            // log ae
            responseStatusMessage = "error executing the request: " + ae.Flatten().Message;
        }
        catch (Exception e) // we are assuming that since we are here we actually executed a request and got a response
        {
            // log e
            responseStatusMessage = "general internal error: " + e.Message;
        }
        finally
        {
            // Release all clients
            clients.ForEach(c => c.Dispose());
        }

        // Copy status, headers and body back to the downstream response
        await setResponse(env, responseCode, responseHeaders, responseContent, responseStatusMessage);

        await _next.Invoke(env);
    }
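The comments in the router say that the first responder mode should track cancellation tokens and cancel the remaining requests once one has returned. A minimal sketch of that idea follows, assuming simulated backends built on `Task.Delay` instead of real HTTP calls. `FirstResponderSketch`, `CallBackend` and the backend names are hypothetical. With a real `HttpClient` the same token could be passed to the `SendAsync(request, cancellationToken)` overload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class FirstResponderSketch
{
    // Simulated backend: waits delayMs, then returns its own name.
    // Task.Delay observes the token, so a cancelled call ends early.
    public static async Task<string> CallBackend(string name, int delayMs, CancellationToken token)
    {
        await Task.Delay(delayMs, token);
        return name;
    }

    // Starts every call, takes the first one to complete, cancels the rest.
    public static async Task<string> FirstResponder(IDictionary<string, int> backends)
    {
        using (var cts = new CancellationTokenSource())
        {
            List<Task<string>> tasks = backends
                .Select(b => CallBackend(b.Key, b.Value, cts.Token))
                .ToList();

            Task<string> first = await Task.WhenAny(tasks);
            cts.Cancel();          // the slower calls are no longer needed
            return await first;    // rethrows here if the first task actually failed
        }
    }

    public static void Main()
    {
        var backends = new Dictionary<string, int> { { "slow", 2000 }, { "fast", 50 } };
        Console.WriteLine(FirstResponder(backends).Result);
    }
}
```

Note that `Task.WhenAny` returns the first task to complete, whether it succeeded or failed, so a production version would probably keep waiting when the first finisher is a failure.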

 

While this is a fun project, there are a lot of applications for it. Typically, high-scale write operations are based on eventual consistency, using platforms such as Event Hubs/Service Bus/CQRS or databases (name your favorite eventual consistency DB here). Read operations, however, especially those across multiple shards/systems, are harder to implement. My favorite implementation for this is a command broker, where the client (imagine an SPA page) sends one request that this pipeline intercepts and expands into multiple sub-commands, each targeting a WebAPI/system.

The code is attached to this post. Hope you had a fun weekend.
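As a rough sketch of the command broker idea, assuming each backend system can be modelled as an async function from a request string to a result string: one incoming request is expanded into one sub-command per backend, the sub-commands run concurrently, and the results are merged by backend name. `CommandBrokerSketch`, `Execute` and the backend names are hypothetical, and the backends are simulated with `Task.Delay`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CommandBrokerSketch
{
    // Expands one client request into one sub-command per backend,
    // runs them concurrently and merges the results by backend name.
    public static async Task<IDictionary<string, string>> Execute(
        string request,
        IDictionary<string, Func<string, Task<string>>> backends)
    {
        // Invoking each delegate starts its sub-command immediately.
        Dictionary<string, Task<string>> pending =
            backends.ToDictionary(b => b.Key, b => b.Value(request));

        await Task.WhenAll(pending.Values);

        // All tasks have completed, so reading Result does not block.
        return pending.ToDictionary(p => p.Key, p => p.Value.Result);
    }

    public static void Main()
    {
        var backends = new Dictionary<string, Func<string, Task<string>>>
        {
            ["orders"] = async id => { await Task.Delay(20); return "orders for " + id; },
            ["profile"] = async id => { await Task.Delay(10); return "profile of " + id; }
        };

        IDictionary<string, string> result = Execute("customer-7", backends).Result;
        foreach (var pair in result)
            Console.WriteLine(pair.Key + ": " + pair.Value);
    }
}
```

In the proxy, the merge step would sit where the router currently just picks the first completed response in broadcast mode.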

Fun.ARR.zip
