Event Aggregator with Reactive Extensions

This is the second part of my post about Event Aggregator. The idea is to show you an easy way to create and use an event aggregator in your application.

The interface of IEventAggregator in Prism looks as follows:

public interface IEventAggregator
{
    TEventType GetEvent<TEventType>() 
        where TEventType : EventBase;
}

Let’s go to see what is EventBase:

///<summary>
/// Defines a base class to publish and subscribe to events.
///</summary>
public abstract class EventBase
{
    private readonly List<IEventSubscription> _subscriptions = new List<IEventSubscription>();


    protected ICollection<IEventSubscription> Subscriptions
    {
        get { return _subscriptions; }
    }

    protected virtual SubscriptionToken InternalSubscribe(IEventSubscription eventSubscription)
    {
        eventSubscription.SubscriptionToken = new SubscriptionToken();
        lock (Subscriptions)
        {
            Subscriptions.Add(eventSubscription);
        }
        return eventSubscription.SubscriptionToken;
    }

    protected virtual void InternalPublish(params object[] arguments)
    {
        List<Action<object[]>> executionStrategies = PruneAndReturnStrategies();
        foreach (var executionStrategy in executionStrategies)
        {
            executionStrategy(arguments);
        }
    }

    public virtual void Unsubscribe(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            if (subscription != null)
            {
                Subscriptions.Remove(subscription);
            }
        }
    }

    public virtual bool Contains(SubscriptionToken token)
    {
        lock (Subscriptions)
        {
            IEventSubscription subscription = Subscriptions.FirstOrDefault(evt => evt.SubscriptionToken == token);
            return subscription != null;
        }
    }

    private List<Action<object[]>> PruneAndReturnStrategies()
    {
        List<Action<object[]>> returnList = new List<Action<object[]>>();

        lock (Subscriptions)
        {
            for (var i = Subscriptions.Count - 1; i >= 0; i--)
            {
                Action<object[]> listItem =
                    _subscriptions[i].GetExecutionStrategy();

                if (listItem == null)
                {
                    // Prune from main list. Log?
                    _subscriptions.RemoveAt(i);
                }
                else
                {
                    returnList.Add(listItem);
                }
            }
        }

        return returnList;
    }
}

Although its look fine, I think we can do less and achieve more with Reactive Extensions framework.

So, for this example my interface will look as follows:

public interface IEventPublisher
{
  void Publish<TEvent>(TEvent sampleEvent);
  IObservable<TEvent> GetEvent<TEvent>();
}

The first method is for publishing a TEvent, and the second method is used to get an IObservable of TEvent. I like this approach because I leverage too many things of the reactive framework. On the other hand, there is no restriction about TEvent, in fact, any class could be an event. Another interesting point, is that IObservable is part of the framework now.

Usage examples

Simple subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.True();

UnSubscribe

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

var subscription = eventPublisher.GetEvent<SampleEvent>()
    .Subscribe(se => eventWasRaised = true);

subscription.Dispose();
eventPublisher.Publish(new SampleEvent());
eventWasRaised.Should().Be.False();

Selective subscription

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Where(se => se.Status == 1)
    .Subscribe(se => eventWasRaised = true);

eventPublisher.Publish(new SampleEvent{Status = 1});
eventWasRaised.Should().Be.True();

Subscribe to projection

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

Observe on dispatcher

No matter what is the thread the event was published, execute the desired handler in the UI thread.

bool eventWasRaised = false;
var eventPublisher = new EventPublisher();

eventPublisher.GetEvent<SampleEvent>()
    .ObserveOnDispatcher()
    .Select(se => se.Status)
    .Subscribe(status => Console.WriteLine(status));

eventPublisher.Publish(new SampleEvent{Status = 1});

These are just examples of what you can do when you combine Reactive Extensions with Event Aggregator.

The implementation

This is the whole implementation:

public class EventPublisher : IEventPublisher
{
    private readonly ConcurrentDictionary<Type, object> subjects
        = new ConcurrentDictionary<Type, object>();

    public IObservable<TEvent> GetEvent<TEvent>()
    {
        var subject = 
            (ISubject<TEvent>) subjects.GetOrAdd(typeof (TEvent), 
                        t => new Subject<TEvent>());
        return subject.AsObservable();
    }

    public void Publish<TEvent>(TEvent sampleEvent)
    {
        object subject;
        if (subjects.TryGetValue(typeof(TEvent), out subject))
        {
            ((ISubject<TEvent>)subject)
                .OnNext(sampleEvent);
        }
    }
}

Finally

Prism is just wrong. Your events should be like POCOs, put the subscription elsewhere.


blog comments powered by Disqus
  • Categories

  • Archives