3,456 questions
0
votes
0
answers
47
views
What are the consequences of using Action<T> vs Func<Task<T>> in Subscribe method? [duplicate]
What is the difference between and consequences of using:
IObservable<Trade> bigTrades = ...
bigTrades.Subscribe(async t => await bigTradeStore.LogTradeAsync(t)); //async void
vs.
...
0
votes
0
answers
42
views
#error "Unable to determine type definition of intptr_t" with RX toolchain and PC-lint
I'm working on a Renesas RX project using the MinGW RX toolchain (GCC 8.3.0a) and running PC-lint for static analysis. During linting, I get the following error:
#error "Unable to determine type ...
1
vote
0
answers
51
views
Using IObservable<T> with System.Reactive through the layers of a software
When using IObservable and System.Reactive, I often come into the situation where I need to forward the elements of an observable through multiple layers of the software. For example: NetworkStream -> ...
0
votes
1
answer
59
views
Handling exceptions in IObservable pipeline while allowing pipeline to continue
I am writing a UI application using ReactiveUI for the view model logic. I am trying to respond to a change in state by sending the new state into an async method, and then storing the result in an ...
0
votes
5
answers
112
views
How to pause method execution until at least N seconds have passed since the last change to a directory?
I have a process that needs to intermittently move some files from one directory to another. Before I move the files, I need to ensure that a user isn't currently adding files to the directory - if ...
1
vote
1
answer
86
views
How to clean up groups produced by GroupBy
I made the following observable, which filters only changes for a given ResourceId:
var valueChangesObs = events
.GroupBy(e => e.ResourceId)
.SelectMany(e => e.DistinctUntilChanged(e => e....
0
votes
0
answers
84
views
Observable Event - how to wait for all async subscriptions to finish
Let's say I have a singleton dispatcher with a property OnSubmit that should be consumed by multiple components. The components provide async handlers for the OnSubmit event. The goal is to await the ...
1
vote
1
answer
51
views
RX operator to signal changes on an original observable emitting directly when possible
I'm struggling to write some operator/method generating an observable that would transform an initial observable into another one but with some kind of back pressure control (like Throttle operator) ...
1
vote
1
answer
76
views
Rx cancellation and race condition
I have an async method that receives data from an Rx Observable and writes it to a file. The method is given a cancellationToken, which is cancelled at some random time.
This method is implemented with await ...
3
votes
0
answers
58
views
Is there a way to unsubscribe from a Subject of System.Reactive
I am new to System.Reactive and trying to understand how to use this extension for some of the scenarios.
I have basically three components, one is creating the message and the others are the receiver ...
1
vote
0
answers
56
views
Why does AsObservable used with BroadcastBlock not get all values
I was experimenting with having a BroadcastBlock also broadcast to observers. However I found that it doesn't quite work the way I'd expect.
This is an example which demonstrates the difference ...
0
votes
0
answers
58
views
Rx operator which waits for subscription to finish
I want to combine two observables, say SendMoves and Results to produce SendMovesAndGetResults.
Subscription to SendMoves ultimately causes values to be pushed down Results. Therefore subscription to ...
0
votes
1
answer
97
views
How to merge a hot and cold observable of the same source while avoiding duplicates efficiently?
I have a source of elements. It can be queried and it publishes events when elements are added. In other words, I can make a cold observable from the query results, and a hot observable from the event....
0
votes
1
answer
57
views
How to detect there is no data or no stream. Rx.net
I want to handle the case in Rx.NET where there is no data or no stream. I just want to check whether there is a connection to a UDP service.
I am trying to do it like this:
Observable
.Defer(
() => ...
3
votes
2
answers
131
views
Detecting Click and DoubleClick using Observables
I have a stream of click events and I want to determine from that stream whether a single click or a double click was performed by the user. Currently I'm attempting to do this by buffering the ...
2
votes
0
answers
60
views
FileSystemWatcher error event to Observable
The "Creating Observable Sequences" section of the e-book INTRODUCTION TO RX.NET (by Ian Griffiths and Lee Campbell) describes how to transform file system events generated by ...
0
votes
3
answers
103
views
Use RxExtensions to aggregate stream of data
I have a play project to learn Rx as it is something I have always wanted to learn and could be useful.
I just can't seem to grasp it like I did Linq.
So I have a stream of data and would like to ...
1
vote
1
answer
67
views
Overlapping Buffer with Closing Selector
I have a stream of mouse events that emit whilst the user drags the mouse cursor on the screen whilst the button is clicked. I want to perform a pairwise operation on these events and can do this ...
0
votes
1
answer
54
views
Reactive Extensions operators for pattern matching
I'd like to subscribe to a stream and get a notification if a certain sequence of events is observed.
Something like - in very much pseudo code: .WhenType<MessageHandled>().ThenType<...
0
votes
1
answer
55
views
Reactive operator to set up Timeout behaviour, but only once a certain number of items are returned
I have a scenario where I have an Observable sequence of measurements coming from an instrument, which only fires a measurement event when the value has changed by a certain amount.
The underlying ...
0
votes
1
answer
64
views
System.Reactive, processing azure queue message one at a time
Having a little trouble with this one and hoping you guys can help... I am using C# System.Reactive. I have one observable that is simply an interval that gets azure queue messages from an azure ...
0
votes
1
answer
41
views
C# RX - GroupBy only runs for the first time
I have an Rx pipeline that does the following:
Runs every 10 seconds.
Generates a period using Scan (1-minute increments).
Fetches data for the current time period.
Projects them as stream of objects of type ...
1
vote
1
answer
77
views
Rx.Net - How to Aggregate messages and emit intermediate output when closing sequence gets triggered?
Objective
The problem I am trying to solve is aggregating the sequence (summing values) of messages of type (int Key, int Value) until a closing observable emits a "flush" marker item.
For ...
1
vote
1
answer
45
views
Non-intersecting LeftDurationSelector in Reactive Extensions Join
I want to create an observable that emits when the right observable emits a value and takes the last left observable value with it, i.e., as described in the following marble diagram (taken from ...
1
vote
1
answer
91
views
.NET Rx C# Observable.FromEventPattern does not run OnCompleted
I cannot figure out why the following code never runs OnCompleted. Can anyone please help?
Basically, every time I press a key, I fire an event and convert ...
2
votes
3
answers
108
views
Is there a way to cancel and replace an observable if it is producing too many values too fast?
I have an Observable that produces values at variable speed. In order to not get overwhelmed with values I added a Throttle of three seconds so I only get a value if no further values were published ...
0
votes
0
answers
44
views
Convert from IObservable<byte> to Stream?
I was looking to convert from IObservable<byte> to Stream and came up with this code that uses System.IO.Pipelines.Pipe.
public static Stream ToStream(this IObservable<byte> observable, ...
1
vote
0
answers
149
views
Integrating Observables into Blazor Fluxor Effects like in Angular
I come from an Angular background and am switching over to Blazor. Since I like the redux pattern and have used NGRX with Angular, I'm using Blazor Fluxor for state management, which works well.
Now I ...
1
vote
1
answer
72
views
Rx.Net Window inner observables completing early
Why does
Observable.Interval(TimeSpan.FromSeconds(1)).Window(2, 1).Concat()
yield 0, 1, 2, ...?
My expectation is that .Window(2, 1) would yield a series of pairs of numbers (0, 1, 1, 2, 2, 3, ...) ...
0
votes
1
answer
113
views
Observable from Func delegate
Is there a method or library function that would accept Func<T> and return IObservable<T> by invoking it?
The functionality should probably be equivalent to
public IObservable<T> Create<...
2
votes
2
answers
140
views
How to pull from IObservable
Suppose there is a Subject<T> at endpoint A and an IObservable<T> on endpoint B.
Endpoint A sends exactly one object of T using OnNext() and never calls OnCompleted(). I don't have a ...
1
vote
2
answers
113
views
Rx.NET Buffer with delay problem on quotes aggregation
I am using the reactive library for 1-second bar aggregation based on symbol quotes (foreign exchange and CFD).
So far I have had no success with an IAsyncEnumerable extension, so I decided to replace pull with push ...
0
votes
1
answer
927
views
How to implement .Debounce() / .DebounceDistinct() extension methods for observable streams in Rx.NET (C#)?
Essentially what the subject says. I can't find any ready-made .Debounce() extension-method (similar to how Throttle() is available out-of-the-box).
Below is a rough and probably faulty idea ...
4
votes
1
answer
185
views
How to wait for the subscriber to complete before onNext?
I am very new to reactive programming and I am stuck at one point.
I am trying to implement the following: when my collection reaches a certain number of elements, or a certain time has passed, I need to trigger some method.
...
1
vote
1
answer
64
views
RX operator to scan with prior value, with signal deriving from value itself
Say you have a source observable of type IObservable<uint>. I'm looking for a transformation function
IObservable<uint> Countdown(IObservable<uint> source)
{
// how? Something like ...
-1
votes
1
answer
86
views
What can cause a BehaviorSubject to report unhandled exceptions of another unrelated observable?
Context
I am investigating a complex issue where for some reason a BehaviorSubject emits an error to observers.
That happens even though nobody is actually interacting with that observable.
The only ...
0
votes
1
answer
55
views
Processing batched events with RX Observables in Hosted Service
I have the following scenario:
public class MyHostedService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly IEventSource _eventSource;
private ...
1
vote
2
answers
96
views
Rx - Scheduler that only allows n number of parallel operations at a time
I am trying to write a Scheduler that allows Limited Concurrency. Here is what I have written.
public class ThreeAtATimeScheduler : IScheduler
{
private readonly SemaphoreSlim _semaphore = new ...
0
votes
0
answers
73
views
How to create a Stream out of IObservable<byte>?
I've been thinking how to create a bridge between IObservable<byte> and Stream, but I'm completely lost.
The use case for this is when a function needs you to provide a Stream, but you have your ...
0
votes
2
answers
190
views
Converting Stream to Observable
I've tried with this, but it seems to have concurrency issues.
I don't fully understand what is wrong.
public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
...
1
vote
2
answers
514
views
How can I make async await the Reactive Extensions onNext, onError, onCompleted delegates
I have created the below extension where I was able to make an IObservable.Subscribe awaitable, but wrapping the whole subscription.
A big issue in current async programming in C# and the code below ...
1
vote
0
answers
358
views
Using .net rx in regular REST API - does it make sense?
I recently heard about reactive programming and started to explore the Observer pattern in C# and the Rx library. Despite some tutorials, articles and example use cases I still can't think if there are any ...
2
votes
1
answer
79
views
RX implementation of Save() action that would run immediately if 2s has passed since the previous execution or delay it until 2s has passed
var i = 0;
var saveSource = Observable.Interval(TimeSpan.FromMilliseconds(200)).Select(x => i++);
var throttledClicks = saveSource
.Throttle(TimeSpan.FromMilliseconds(2000)) // Throttle for ...
3
votes
2
answers
385
views
Is RX.net subject thread safe for subscribers?
I'm having a hard time finding information, as Google only yields results about the safety of concurrently pushing data to the subject. That isn't my use case: I have a dedicated thread (running an infinite ...
0
votes
1
answer
99
views
Watching changes in children recursively with DynamicData
I'm creating a fully generic file explorer for AvaloniaUI. It has a view that shows a folder tree that you can navigate by expanding each directory.
My goal is to make this view flexible enough to ...
0
votes
0
answers
79
views
Implementing non-overlapping calls to a method
I'm developing a websocket client wrapper. The third-party server I'm connecting to manages sessions, and it doesn't tolerate concurrent sessions. If the AuthenticateAsync method is called twice in a ...
1
vote
1
answer
45
views
Creating an array of observables based off a condition and then reducing that array to one observable
I have two types of observables, A & B. They return the same item type but are accessible by separate APIs. I have a list of IDs, each of which can be for either A or B.
These observables return data for a ...
2
votes
1
answer
180
views
Executing a synchronous method, but with a timeout, using System.Reactive
What's the correct way to call the Read method with a timeout?
public int Method()
{
return Read();
}
I've tried with this, but my application is shutting down completely when a timeout ...
1
vote
1
answer
208
views
Subscribe to observable get event *and* full history
I'm playing around with System.Reactive and looking to achieve the following:
A timer fires an event every 3 seconds
A FileSystemWatcher fires events in case a file is changed (another app appending ...
2
votes
1
answer
136
views
Rx.NET Buffer emit all items on Cancellation
I am using Rx.NET library and its method Buffer in the most simple manner like:
observable.Buffer(TimeSpan.FromSeconds(5), 10);
It works great except in the case when the cancellation token is cancelled. ...