How do I buffer a subject such that it emits somewhat like a debounced subject?
I tried using the regular buffer operator that takes a TimeSpan as input, but this has a few issues. I only want the buffer to be emitted if these conditions are satisfied:
- At least one value has been received
- A set amount of time has passed since the last value was received without receiving any more values
Essentially, I want the buffered subject to behave like this:
- If the next value is received when the buffer is empty, add the value to the buffer and start a timeout
- If the next value is received when the buffer is not empty, add the value to the buffer and re-start the timeout
- Only after the timeout has started and finished, emit the current buffer and start a new buffer
Like a debounced subject, this will only emit when a value has been received, but unlike a debounced subject, it collects all of the values into a buffer instead of discarding them.
I assume I would need a scheduler to accomplish this, and I figured I could copy/reuse the debounce operator’s scheduler, but I can’t find it.
Edit: sorry for the title gore/grammar mistakes
The only downside to your answer is that you’re creating two separate subscriptions to MySubject. If it is dynamically producing values, that can cause grief. You should always use a shared subscription.
Here’s how:
MySubject.Publish(ps => ps.Buffer(() => ps.Throttle(TimeSpan.FromSeconds(1.0))));
To see the difference that using Publish makes, try the following code:
var rnd = new Random();
var source =
    Observable
        .Generate(
            0,
            x => x < 100,
            x => x + 1,
            x => rnd.Next(10),
            x => TimeSpan.FromSeconds(rnd.Next(4)));
This generates a series of random numbers between 0 and 9 inclusive, with an interval of between 0 and 3 seconds between successive values.
Then, run this query:
var query =
    source
        .Publish(ps =>
            ps
                .Do(p => Console.WriteLine($"ps1: {p}"))
                .Buffer(() =>
                    ps
                        .Do(p => Console.WriteLine($"ps2: {p}"))
                        .Throttle(TimeSpan.FromSeconds(1.0))));
It outputs the following:
ps1: 1
ps2: 1
ps1: 2
ps2: 2
ps1: 3
ps2: 3
ps1: 3
ps2: 3
ps1: 6
ps1: 1
ps2: 1
ps1: 3
ps2: 3
ps1: 9
ps1: 5
ps2: 5
ps1: 7
ps2: 7
ps1: 9
ps2: 9
ps1: 1
ps2: 1
ps1: 8
ps2: 8
ps1: 5
ps2: 5
ps1: 6
ps2: 6
...
Note that the ps1 and ps2 numbers are paired and in sequence: it’s always ps1 then ps2 before another ps1.
The output of the actual observable was this:
{ 1, 2, 3 }
{ 3 }
{ 6, 1 }
{ 3 }
{ 9, 5, 7, 9, 1, 8 }
{ 5 }
{ 6 }
Now try without the Publish:
var query =
    source
        .Do(p => Console.WriteLine($"ps1: {p}"))
        .Buffer(() =>
            source
                .Do(p => Console.WriteLine($"ps2: {p}"))
                .Throttle(TimeSpan.FromSeconds(1.0)));
Here’s the output:
ps1: 0
ps1: 5
ps2: 3
ps1: 6
ps1: 8
ps2: 9
ps1: 7
ps2: 2
ps2: 3
ps2: 4
ps1: 1
ps2: 7
ps2: 4
ps2: 2
ps1: 0
ps1: 6
ps1: 7
ps1: 9
ps1: 2
ps2: 8
ps1: 0
ps2: 3
ps2: 3
ps1: 1
ps2: 9
ps1: 4
ps2: 8
ps2: 3
ps1: 1
...
The output is no longer paired or in sequence, because each subscription to source generates its own independent series of random values.
The output of the observable was this:
{ 0, 5, 6, 8 }
{ }
{ 7, 1 }
{ }
{ 0, 6 }
{ 7, 9, 2, 0 }
{ }
{ 1, 4 }
{ }
...
Note the empty buffers!
The results without Publish are not at all reliable.
Always use Publish to ensure you’re sharing the same source observable.
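For anyone who wants to check the shared-subscription version without waiting on real timers, here is a minimal sketch that drives the same Publish/Buffer/Throttle query with a virtual clock. It assumes the System.Reactive package; the names DebouncedBufferSketch, BufferUntilQuiet and Run are hypothetical helpers made up for this illustration, not part of Rx, and passing a HistoricalScheduler to Throttle is just one way to make the timing deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DebouncedBufferSketch
{
    // Hypothetical helper (not an Rx operator): collects values into a buffer
    // and emits it once the source has been quiet for `quietPeriod`.
    // Publish shares a single subscription between Buffer and Throttle.
    public static IObservable<IList<T>> BufferUntilQuiet<T>(
        this IObservable<T> source, TimeSpan quietPeriod, IScheduler scheduler)
    {
        return source.Publish(ps =>
            ps.Buffer(() => ps.Throttle(quietPeriod, scheduler)));
    }

    // Pushes three values through a Subject while advancing virtual time,
    // and returns each emitted buffer as a comma-separated string.
    public static List<string> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var subject = new Subject<int>();
        var results = new List<string>();

        using (subject
            .BufferUntilQuiet(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(buffer => results.Add(string.Join(",", buffer))))
        {
            subject.OnNext(1);                                    // t = 0.0s, timeout starts
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500));
            subject.OnNext(2);                                    // t = 0.5s, timeout restarts
            scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1500)); // quiet for 1s: buffer closes
            subject.OnNext(3);                                    // t = 2.0s, new buffer
            scheduler.AdvanceBy(TimeSpan.FromSeconds(2));         // quiet for 1s: buffer closes
        }

        return results; // expected: "1,2" then "3"
    }
}
```

Calling DebouncedBufferSketch.Run() should give two buffers and no empty ones, since a buffer only closes after at least one value has gone through Throttle. In real code you would typically drop the scheduler argument and let Throttle use its default.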
I think I figured out what to do. Maybe it will help someone else.
MySubject
    .Buffer(() => MySubject.Throttle(TimeSpan.FromSeconds(1)))
    .Subscribe(buffer => ... );