Long polling chat with Anna

This is an example chat application built on top of Anna. Anna is an evolution of what I wrote in this post, and it is available on my GitHub (and on NuGet) if you want to have a look.

Today I read this very nice post about Manos de Mono and decided to borrow the JavaScript files and the idea from Tarn Barford (I hope he doesn’t mind).

The HTML page looks like this:

<!DOCTYPE HTML>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Anna Chat</title>
    <script type="text/javascript" 
            src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.2/jquery.min.js">
    </script>
    <script type="text/javascript" src="/app.js"></script>
</head>
<body>
    <h1>Chat</h1>
    <input type="text" id="message" name="message" />
    <input type="submit" id="send-message" name="Send" />
    <ul id="messages">
    </ul>
</body>
</html>

And the JavaScript file "app.js" is as follows:

$(function () {

    var showMessage = function (message) {
        $('#messages').prepend($('<li/>').append(message));
    };

    var getMessages = function () {
        $.ajax({
            type: 'POST',
            url: '/wait',
            data: 'data=none',
            success: function (data) {
                showMessage(data);
            },
            complete: function (q, s) {
                getMessages();
            }
        });
    };

    var sendMessage = function (message) {
        $.ajax({
            type: 'POST',
            url: '/send',
            data: message,
            contentType: "text/plain",
            success: function (data) {
            },
            error: function (d, m, et) {
                alert(m + ": " + et);
            }
        });
    };

    $('#send-message').click(function (e) {
        e.preventDefault();
        var m = $('#message').val();
        sendMessage(m);
    });

    getMessages();

});

The first version of my code is:

using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Threading;
using Anna;
using Anna.Request;
using Anna.Responses;

namespace LongPollingChat
{
    class Program
    {
        static void Main()
        {
            var eventLoop = new EventLoopScheduler();
            var waiting = new Queue<RequestContext>();

            using (var requests = new HttpServer("http://127.0.0.1:987/", eventLoop))
            {
                requests.GET("app.js")
                    .Subscribe(r => r.Respond(new StaticFileResponse("app.js")));

                requests.GET("index.html")
                    .Subscribe(r => r.Respond(new StaticFileResponse("index.html")));

                requests.POST("wait")
                    .Subscribe(waiting.Enqueue);

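                requests.POST("send")
                    .Subscribe(r =>
                    {
                        // Read the posted message body.
                        string message;
                        using (var sr = new StreamReader(r.Request.InputStream))
                        {
                            message = sr.ReadToEnd();
                        }

                        // Acknowledge the sender before pushing to the waiting clients.
                        r.Respond(201);

                        // Answer every pending long-poll request with the new message.
                        while (waiting.Count > 0)
                        {
                            waiting.Dequeue().Respond(new StringResponse(message));
                        }
                    });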

                Console.ReadLine();
            }
        }
    }
}

Notice:

  • Everything happens on a single thread because of the EventLoopScheduler from Rx, the same way node.js and Manos de Mono do.
  • StaticFileResponse is an asynchronous operation: it reads from disk asynchronously and writes to the response stream asynchronously.
  • requests is an observable, and the GET and POST methods are just extension methods that filter it by URI and HTTP method.
  • I respond with a 201 to the POST before pushing the message to all clients; there is no need to make the sender wait.
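
To make the queue handling in the first version concrete, here is a minimal sketch of the same idea in plain JavaScript, with no HTTP involved. It is not Anna's API: createHub, wait, send and pending are hypothetical names, and the callbacks stand in for the parked /wait requests.

```javascript
// In-memory model of the long-polling queue (hypothetical names, not Anna's API).
function createHub() {
  let waiting = []; // callbacks standing in for parked /wait requests

  return {
    // Park a client until the next message arrives.
    wait(respond) {
      waiting.push(respond);
    },
    // Deliver the message to every parked client and empty the queue.
    send(message) {
      const batch = waiting;
      waiting = []; // clients that re-poll during delivery wait for the next message
      batch.forEach(respond => respond(message));
      return batch.length;
    },
    // Number of clients currently parked.
    pending() {
      return waiting.length;
    }
  };
}

const hub = createHub();
const received = [];
hub.wait(m => received.push('a:' + m));
hub.wait(m => received.push('b:' + m));
const delivered = hub.send('hello');
console.log(delivered, received, hub.pending());
```

Swapping the array out before delivering is a design choice of this sketch: a callback that immediately polls again is queued for the next message instead of receiving the current one twice.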

A more functional way

Even though the above code works, I think it is still very imperative. Follow my reasoning: the POST is just a stream of messages, that is, an IObservable<string>, so we can do something like this:

static void Main()
{
    var eventLoop = new EventLoopScheduler();
    
    using (var requests = new HttpServer("http://127.0.0.1:987/", eventLoop))
    {
        requests.GET("app.js")
            .Subscribe(r => r.Respond(new StaticFileResponse("app.js")));

        requests.GET("index.html")
            .Subscribe(r => r.Respond(new StaticFileResponse("index.html")));

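        var messageStream = requests.POST("send")
            .Select(r =>
            {
                // Read the posted message body.
                string message;
                using (var sr = new StreamReader(r.Request.InputStream))
                {
                    message = sr.ReadToEnd();
                }

                // Acknowledge the sender, then pass the message downstream.
                r.Respond(201);
                return message;
            })
            // Share one subscription to the source among all subscribers.
            .Publish().RefCount();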

        requests.POST("wait")
            .Subscribe(subscriber => messageStream
                          .Take(1)
                          .Subscribe(m => subscriber.Respond(new StringResponse(m))));
        
        messageStream.Subscribe(m => Console.WriteLine("New messages published {0}", m));

        Console.ReadLine();
    }
}

This new version doesn’t need a queue. We are only “chaining” observables.
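
One way to picture the Take(1) subscription is Node's EventEmitter, where once registers a listener that is removed after its first event. The sketch below is only an analogy under that assumption, not Rx and not Anna; wait and send are hypothetical helpers.

```javascript
const { EventEmitter } = require('events');

// Hot source of chat messages, playing the role of messageStream.
const messages = new EventEmitter();

// Each long-poll request listens for exactly one message, like Take(1).
function wait(respond) {
  messages.once('message', respond);
}

// Publishing a message reaches whoever is listening at that moment.
function send(text) {
  messages.emit('message', text);
}

const seenByA = [];
const seenByB = [];
wait(m => seenByA.push(m));
wait(m => seenByB.push(m));

send('first');  // both listeners fire once and are removed
send('second'); // nobody is listening any more, so this message is dropped

console.log(seenByA, seenByB, messages.listenerCount('message'));
```

As in the chat, a client only sees later messages if it polls again after each response.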

Can we do it better?

I don’t know if it is any better, but you can start taking advantage of the Rx operators. For instance, the subscription code above could be implemented with SelectMany:

requests.POST("wait")
        .SelectMany(subscriber => messageStream.Take(1),
                   (subscriber, message) => new {subscriber, message})
        .Subscribe(kp => kp.subscriber.Respond(new StringResponse(kp.message)));
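
The two-argument SelectMany shape (a collection selector plus a result selector) may be easier to see on plain arrays. The sketch below models it with Array.prototype.flatMap. selectMany is a hypothetical helper, and arrays are synchronous, so this only shows how each subscriber gets paired with one message, not the timing of real observables.

```javascript
// Array analogue of SelectMany(collectionSelector, resultSelector).
// selectMany is a hypothetical helper, not part of Rx or Anna.
function selectMany(source, collectionSelector, resultSelector) {
  return source.flatMap(item =>
    collectionSelector(item).map(inner => resultSelector(item, inner)));
}

const subscribers = ['alice', 'bob'];    // stand-ins for pending /wait requests
const messageStream = ['hello', 'late']; // stand-in for messages still to come

const pairs = selectMany(
  subscribers,
  () => messageStream.slice(0, 1),                      // like Take(1)
  (subscriber, message) => ({ subscriber, message }));  // pair them up

console.log(pairs);
```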

The code is just an experiment and you can play with it here.

