I'd like to subscribe to a stream and get a notification when a certain sequence of events is observed.

Something like this, in very much pseudocode: .WhenType<MessageHandled>().ThenType<TransactionCommitted>().ThenAnyNumberOfAnyType().ThenType<QueueStopped>().

Like a regex, but filtering on events/types in the stream rather than a string.

What's the best way to accomplish something like that?

1 Answer

I tried using Observable Joins (see Guide to System.Reactive.Joins), but couldn't get a working solution.

I did, however, try a state machine option and got something that works.

I started by creating an enum that represents your event types. I know you're using types, but the enum makes my example clearer.

public enum Value
{
    MessageHandled,
    TransactionCommitted,
    QueueStopped,
    X,
    Y,
}

I then created an observable that emits various combinations of these values, some matching the pattern and some not.

IObservable<Value> source = new[]
{
    /* invalid */ Value.TransactionCommitted, Value.TransactionCommitted, Value.MessageHandled, Value.X,
    /* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.X, Value.Y, Value.QueueStopped,
    /* invalid */ Value.X, Value.X,
    /* valid */ Value.MessageHandled, Value.TransactionCommitted, Value.QueueStopped,
    /* invalid */ Value.Y, Value.MessageHandled, Value.Y, Value.TransactionCommitted, Value.QueueStopped,
}.ToObservable();

I created an enum for the states, and a definition for the initial states of the state machine and the transitions.

public enum State { A, B, Z }

var initial = new (Value value, State state)[]
{
    (Value.MessageHandled, State.A),
};

var transitions = new (State current, Func<Value, bool> rule, State next)[]
{
    (State.A, t => t == Value.TransactionCommitted, State.B),
    (State.B, t => t != Value.QueueStopped, State.B),
    (State.B, t => t == Value.QueueStopped, State.Z),
};
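To make the table concrete, here is a minimal sketch of how a single partial match advances through those transitions, one event at a time. The `Machine` class and its `Next` helper are names made up for this illustration, and the sketch assumes C# 9 or later (top-level statements). It uses plain LINQ only, so it runs without Rx.

```csharp
using System;
using System.Linq;

// Walk one partial match through the table, as if MessageHandled
// had just been seen and the match is sitting in State.A.
State? s = State.A;
foreach (var v in new[] { Value.TransactionCommitted, Value.X, Value.QueueStopped })
{
    s = s.HasValue ? Machine.Next(s.Value, v) : null;
    Console.WriteLine($"{v} -> {s}");
}
// Prints:
// TransactionCommitted -> B
// X -> B
// QueueStopped -> Z

public enum Value { MessageHandled, TransactionCommitted, QueueStopped, X, Y }
public enum State { A, B, Z }

public static class Machine
{
    // Same transition table as in the answer.
    static readonly (State current, Func<Value, bool> rule, State next)[] Transitions =
    {
        (State.A, v => v == Value.TransactionCommitted, State.B),
        (State.B, v => v != Value.QueueStopped, State.B),
        (State.B, v => v == Value.QueueStopped, State.Z),
    };

    // Hypothetical helper: the next state, or null when no rule matches,
    // which is the case where the partial match gets dropped.
    public static State? Next(State current, Value value) =>
        Transitions
            .Where(t => t.current == current && t.rule(value))
            .Select(t => (State?)t.next)
            .FirstOrDefault();
}
```

State.Z has no outgoing rows, so a completed match is dropped on the next event, just as a match in State.A is dropped by anything other than TransactionCommitted.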

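And finally, here's the query that runs the state machine. Scan keeps a list of partial matches; each new value advances every partial match that has a matching transition, drops the ones that don't, and starts a new match if the value is an initial one. SelectMany then emits the values of every match that has reached State.Z: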

IObservable<Value[]> query =
    source
        .Scan(
            new List<(State state, Value[] values)>(),
            (accumulator, value) =>
            (
                Enumerable
                    .Concat(
                        from a in accumulator
                        from t in transitions
                        where a.state == t.current
                        where t.rule(value)
                        select (t.next, a.values.Append(value).ToArray()),
                        from i in initial
                        where i.value == value
                        select (i.state, new[] { value }))
            ).ToList())
        .SelectMany(x => x.Where(y => y.state == State.Z), (x, y) => y.values);

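That produced two arrays, one for each of the sequences marked valid in the source:

MessageHandled, TransactionCommitted, X, Y, QueueStopped
MessageHandled, TransactionCommitted, QueueStopped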

And, based on your description, that's what I think you wanted.
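Since your stream carries event types rather than enum values, the same table can be written with type checks as the rules. The sketch below is one possible adaptation, not something I've run against your real events: the record types are placeholders standing in for your event classes, `Other` is a made-up filler event, and `TypeMachine.Step` is a hypothetical name. It assumes C# 9 or later and is written against plain collections so that it runs without the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Feed a small event sequence through Step and print every completed match.
var events = new object[]
{
    new Other("x"), new MessageHandled(), new TransactionCommitted(),
    new Other("y"), new QueueStopped(), new MessageHandled(), new QueueStopped(),
};

var partial = new List<(State state, object[] events)>();
foreach (var e in events)
{
    partial = TypeMachine.Step(partial, e);
    foreach (var match in partial.Where(p => p.state == State.Z))
        Console.WriteLine(string.Join(", ", match.events.Select(x => x.GetType().Name)));
}
// Prints: MessageHandled, TransactionCommitted, Other, QueueStopped

// Placeholder event types (assumptions, standing in for the real ones).
public record MessageHandled;
public record TransactionCommitted;
public record QueueStopped;
public record Other(string Name);

public enum State { A, B, Z }

public static class TypeMachine
{
    // Same shape as the enum-based table, with "is" type checks as the rules.
    static readonly (State current, Func<object, bool> rule, State next)[] Transitions =
    {
        (State.A, e => e is TransactionCommitted, State.B),
        (State.B, e => e is not QueueStopped, State.B),
        (State.B, e => e is QueueStopped, State.Z),
    };

    // Advance every partial match by one event, then start a new match
    // if the event is a MessageHandled.
    public static List<(State state, object[] events)> Step(
        List<(State state, object[] events)> partial, object e)
    {
        var advanced =
            from p in partial
            from t in Transitions
            where p.state == t.current && t.rule(e)
            select (state: t.next, events: p.events.Append(e).ToArray());

        var started = e is MessageHandled
            ? new[] { (state: State.A, events: new[] { e }) }
            : Array.Empty<(State state, object[] events)>();

        return advanced.Concat(started).ToList();
    }
}
```

`Step` takes the accumulator and the next value and returns the new accumulator, which is the shape Scan expects, so it should be usable as the Scan function in the query above, with the SelectMany filtering on State.Z unchanged.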
