Event-driven HTTP server in C# with Rx and HttpListener

Pretty big name, huh? In this post I will show you an alternative approach to building a simple event-driven HTTP server in C# using the full power of Reactive Extensions.

Introduction

I am not good at explanations, so I’ll quote this very interesting article from Dan York on the Node.js event-driven model:

The “traditional” mode of web servers[1] has always been one of the thread-based model. You launch Apache or any other web server and it starts receiving connections. When it receives a connection, it holds that connection open until it has performed the request for the page or whatever other transaction was sent. If it may take a few microseconds to retrieve a page from disk or write results to a database, the web server is blocking on that input/output operation. (This is referred to as “blocking I/O”.) To scale this type of web server, you need to launch additional copies of the server (referred to as “thread-based” because each copy typically requires another operating system thread).

In contrast, Node.js uses an event-driven model where the web server accepts the request, spins it off to be handled, and then goes on to service the next web request. When the original request is completed, it gets back in the processing queue and when it reaches the front of the queue the results are sent back (or whatever the next action is). This model is highly efficient and scalable because the web server is basically always accepting requests because it’s not waiting for any read or write operations. (This is referred to as “non-blocking I/O” or “event-driven I/O”.)

What is happening in .NET?

There are a lot of things happening around this in the .NET ecosystem.

An alternative approach

Using the HttpListener class and Reactive Extensions we can build something like this:

using System;
using System.Net;
using System.Reactive.Linq;

public class HttpServer : IObservable<RequestContext>, IDisposable
{
    private readonly HttpListener listener;
    private readonly IObservable<RequestContext> stream;

    public HttpServer(string url)
    {
        listener = new HttpListener();
        listener.Prefixes.Add(url);
        listener.Start();
        stream = ObservableHttpContext();
    }

    private IObservable<RequestContext> ObservableHttpContext()
    {
        return Observable.Create<RequestContext>(obs =>
                            // Wrap BeginGetContext/EndGetContext as an observable that yields one context
                            Observable.FromAsyncPattern<HttpListenerContext>(listener.BeginGetContext,
                                                                             listener.EndGetContext)()
                                      .Select(c => new RequestContext(c.Request, c.Response))
                                      .Subscribe(obs))
                         .Repeat()    // once a request has been accepted, wait for the next one
                         .Retry()     // if accepting a request fails, subscribe again
                         .Publish()   // share one underlying subscription between all observers
                         .RefCount(); // stay connected while there is at least one observer
    }

    public void Dispose()
    {
        listener.Stop();
    }

    public IDisposable Subscribe(IObserver<RequestContext> observer)
    {
        return stream.Subscribe(observer);
    }
}

Some notes about this code:

  • FromAsyncPattern is a nice method that comes with Rx. It converts a Begin/End method pair into a function that returns an IObservable.
  • RequestContext is a tiny wrapper around the HttpListener types. I am not going to copy the code here, but you can have a look at the full source code later.
  • Repeat: if you have seen HttpListener used before, you have surely seen the code inside a while loop. This is the same.
  • Retry: if we get an error, retry.
  • Publish/RefCount: this helps us create a “warm” observable from a “cold” one. It behaves somewhat like a “hot” observable. You can read more here and here.
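
To make the Publish/RefCount note a bit more concrete, here is a minimal sketch, assuming the System.Reactive package. The SharingDemo class and its counter are hypothetical names invented for this illustration and are not part of the server code. It counts how many times a cold source gets subscribed, first directly and then through Publish().RefCount().

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SharingDemo
{
    // Returns how many times the source was subscribed in each case.
    public static (int Cold, int Shared) Run()
    {
        int sourceSubscriptions = 0;

        // A cold observable: this delegate runs once for every subscriber.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            sourceSubscriptions++;
            observer.OnNext(1);
            return Disposable.Empty; // never completes, like a listener that stays open
        });

        // Two direct subscriptions start the source twice.
        using (source.Subscribe(_ => { }))
        using (source.Subscribe(_ => { }))
        {
        }
        int cold = sourceSubscriptions; // 2

        // Publish().RefCount() connects on the first subscriber and shares
        // that single connection with every later subscriber.
        sourceSubscriptions = 0;
        IObservable<int> shared = source.Publish().RefCount();
        using (shared.Subscribe(_ => { }))
        using (shared.Subscribe(_ => { }))
        {
        }
        int sharedCount = sourceSubscriptions; // 1

        return (cold, sharedCount);
    }
}
```

For the server this matters because, without sharing, every handler subscribed to the same HttpServer would start its own BeginGetContext call and the handlers would compete for incoming requests instead of all seeing the same stream.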

An example use case

You can build any type of web application based on this concept. The “hello world” would be something like this:

static void Main()
{
    using (var server = new HttpServer("http://*:5555/"))
    {
        // answer every request whose URL ends with /hello
        var handler = server.Where(ctx => ctx.Request.Url.EndsWith("/hello"))
                            .Subscribe(ctx => ctx.Respond(new StringResponse("world")));

        Console.ReadLine();
        handler.Dispose();
    }
}

The recommendation is that anything you do should be asynchronous. For instance, if you access a database, it must be an asynchronous operation and you must chain the callbacks/observables/Tasks etc.
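
As a rough illustration of that chaining, here is a minimal sketch, assuming the System.Reactive package. FindGreetingAsync is a hypothetical stand-in for an asynchronous database query, and a Subject<string> stands in for the server's request stream so that the sample runs without opening a port. None of these names come from the original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class AsyncChainingSketch
{
    // Hypothetical stand-in for a non-blocking database query.
    private static async Task<string> FindGreetingAsync(string name)
    {
        await Task.Delay(10); // simulates I/O latency without holding a thread
        return "hello " + name;
    }

    public static async Task<string> RunAsync()
    {
        // Stand-in for the request stream; each item is a requested name.
        var requests = new Subject<string>();

        // SelectMany chains the asynchronous lookup onto every request,
        // so nothing waits synchronously for the "database".
        IObservable<string> responses = requests
            .SelectMany(name => Observable.FromAsync(() => FindGreetingAsync(name)));

        // Subscribe (through ToTask) before the request is pushed.
        Task<string> first = responses.FirstAsync().ToTask();
        requests.OnNext("world");

        return await first; // "hello world"
    }
}
```

With the real server, the last step would presumably be a Subscribe that calls ctx.Respond with the result instead of returning a string.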

There is one more interesting use case that I would like to share, a technique called long polling:

Long polling is a variation of the traditional polling technique and allows emulation of an information push from a server to a client. With long polling, the client requests information from the server in a similar way to a normal poll. However, if the server does not have any information available for the client, instead of sending an empty response, the server holds the request and waits for some information to be available.

So, here is the simplest example of long polling that I can think of using the aforementioned code:

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;

class Program
{
    static void Main()
    {
        // a stream of messages
        var subject = new Subject<string>();

        using (var server = new HttpServer("http://*:5555/"))
        {
            // the listeners stream and subscription
            var listeners = server
                    .Where(ctx => ctx.Request.HttpMethod == "GET")
                    .Subscribe(ctx => subject.Take(1) // wait for the next message to end the request
                                             .Subscribe(m => ctx.Respond(new StringResponse(m))));

            // the publishing stream and subscription
            var publisher = server
                .Where(ctx => ctx.Request.HttpMethod == "POST")
                .Subscribe(ctx => ctx.Request.InputStream.ReadBytes(ctx.Request.ContentLength)
                                      .Subscribe(bts =>
                                         {
                                           ctx.Respond(new EmptyResponse(201));
                                           // push the posted body to every waiting GET request
                                           subject.OnNext(Encoding.UTF8.GetString(bts));
                                         }));

            Console.ReadLine();

            listeners.Dispose();
            publisher.Dispose();
        }
    }
}

As you can see, we are chaining observables to do the work... So there isn't any blocking operation. Even reading from a stream is an asynchronous operation.
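
The ReadBytes extension used above lives in the full source and is not reproduced in this post. Purely as an illustration of how a stream read can be exposed as an observable, here is a minimal sketch, assuming the System.Reactive package. ReadBytesSketch is a hypothetical name and is not the actual implementation.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class StreamSketchExtensions
{
    // Hypothetical helper: reads up to `count` bytes asynchronously
    // and publishes them as a single array.
    public static IObservable<byte[]> ReadBytesSketch(this Stream stream, int count)
    {
        return Observable.FromAsync<byte[]>(async () =>
        {
            var buffer = new byte[count];
            int total = 0;

            // ReadAsync may return fewer bytes than requested, so keep reading
            // until the buffer is full or the stream ends (ReadAsync returns 0).
            while (total < count)
            {
                int read = await stream.ReadAsync(buffer, total, count - total);
                if (read == 0) break;
                total += read;
            }

            // Trim the result if the stream ended early.
            var result = new byte[total];
            Array.Copy(buffer, result, total);
            return result;
        });
    }
}
```

Because FromAsync only starts the read when someone subscribes, the read is deferred until the handler chains a Subscribe onto it.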

Finally, the code is published here as open source if you want to take this one step further or simply study it.

Special thanks to Gustavo Machado, Silvio Massari and the Nancy framework guys for the advice and for the bits of code that I have stolen from them.


blog comments powered by Disqus
  • Categories

  • Archives