I've been doing some work lately with the Reactive Framework and have been absolutely loving it so far. I'm looking at replacing a traditional polling message queue with some filtered IObservables to clean up my server operations. Previously, I dealt with messages coming into the server like so:
// Start spinning the process message loop
Task.Factory.StartNew(() =>
{
    while (true)
    {
        // Take() blocks until a command is available, then removes it from the queue
        Command command = m_CommandQueue.Take();
        ProcessMessage(command);
    }
}, TaskCreationOptions.LongRunning);
This results in a continuously polling thread that passes commands from clients to the ProcessMessage method, where a series of if/else-if statements determines the type of each command and delegates work based on that type.
I am replacing this with an event-driven system using Reactive, for which I've written the following code:
private BlockingCollection<BesiegedMessage> m_MessageQueue = new BlockingCollection<BesiegedMessage>();
private IObservable<BesiegedMessage> m_MessagePublisher;

m_MessagePublisher = m_MessageQueue
    .GetConsumingEnumerable()
    .ToObservable(TaskPoolScheduler.Default);

// All generic Server messages (containing no properties) will be processed here
IDisposable genericServerMessageSubscriber = m_MessagePublisher
    .Where(message => message is GenericServerMessage)
    .Subscribe(message =>
    {
        // do something with the generic server message here
    });
My question: while this works, is it good practice to use a blocking collection as the backing store for an IObservable like this? I don't see where Take() is ever called this way, which makes me think that the messages will pile up on the queue without being removed after they have been processed.
Would it be more efficient to look into Subjects as the backing collection to drive the filtered IObservables that will be picking up these messages? Is there anything else I'm missing here that might benefit the architecture of this system?
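On the concern about Take() never being called: GetConsumingEnumerable() removes each item from the collection as it is enumerated, so messages should not accumulate once they have been handed to a consumer. The following is a minimal sketch that checks this with a plain foreach rather than Rx; the class and method names (ConsumingEnumerableDemo, DrainAndCount) are made up for illustration, and CompleteAdding() is called only so the loop can terminate.

```csharp
using System;
using System.Collections.Concurrent;

public static class ConsumingEnumerableDemo
{
    // Hypothetical helper: enumerates the queue and returns how many items remain afterwards.
    public static int DrainAndCount()
    {
        var queue = new BlockingCollection<string>();
        queue.Add("first");
        queue.Add("second");

        // Without this, the foreach below would block forever waiting for more items.
        queue.CompleteAdding();

        // Each iteration takes (removes) one item, just like an explicit Take() call.
        foreach (string message in queue.GetConsumingEnumerable())
        {
            Console.WriteLine($"Processing {message}");
        }

        return queue.Count;
    }

    public static void Main()
    {
        Console.WriteLine($"Items left: {DrainAndCount()}"); // prints "Items left: 0"
    }
}
```

One caveat worth keeping in mind: because enumeration consumes the items, two independent enumerations of the same consuming enumerable compete for messages instead of each seeing all of them.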
You don't need the BlockingCollection any longer. You need those when you are actively polling for new messages, but if your messages are being delivered to you, there's no need to "block until I get a signal" any longer.
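To make the push-based alternative concrete, here is a minimal sketch that uses a Subject<T> from System.Reactive as the message source in place of the blocking collection. The message classes below are empty stand-ins for the types named in the question (OtherMessage is entirely hypothetical), and SubjectDemo.Run is an illustrative name, not part of any existing code.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Stand-ins for the question's message types; the real classes will differ.
public class BesiegedMessage { }
public class GenericServerMessage : BesiegedMessage { }
public class OtherMessage : BesiegedMessage { } // hypothetical second message type

public static class SubjectDemo
{
    // Publishes three messages and returns how many the filtered subscriber handled.
    public static int Run()
    {
        var handled = 0;

        using (var messages = new Subject<BesiegedMessage>())
        // OfType filters by runtime type and hands the subscriber a typed message.
        using (messages.OfType<GenericServerMessage>().Subscribe(message => handled++))
        {
            // Whatever receives client messages pushes them in; nothing is queued or polled.
            messages.OnNext(new GenericServerMessage());
            messages.OnNext(new OtherMessage());
            messages.OnNext(new GenericServerMessage());
        }

        return handled; // 2: only the GenericServerMessage instances match the filter
    }
}
```

In this sketch each handler runs synchronously on the thread that calls OnNext. Rx expects OnNext calls to be serialized, so if several client threads publish at once, the calls would need to be synchronized (for example with a lock or Subject.Synchronize), and ObserveOn could be added before Subscribe to move processing onto another scheduler.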