How do I buffer a subject such that it emits somewhat like a debounced subject?

I tried using the regular buffer operator that takes a TimeSpan as input, but this has a few issues. I only want the buffer to be emitted if these conditions are satisfied:

  1. At least one value has been received
  2. A set amount of time has passed since the last value was received without receiving any more values

Essentially, I want the buffered subject to behave like this:

  1. If the next value is received when the buffer is empty, add the value to the buffer and start a timeout
  2. If the next value is received when the buffer is not empty, add the value to the buffer and re-start the timeout
  3. Only after the timeout has started and finished, emit the current buffer and start a new buffer

Like a debounced subject, this will only emit when a value has been received, but unlike a debounced subject, it collects all of the values into a buffer instead of discarding them.

I assume I would need a scheduler to accomplish this, and I figured I could copy/reused the debounce operator’s scheduler, but I can’t find it.

Edit: sorry for the title gore/grammar mistakes

Add Comment
2 Answer(s)

The only downside to your answer is that you’re creating two separate subscriptions to MySubject. If it is dynamically producing values that can cause grief. You should always use a shared subscription.

Here’s how:

MySubject.Publish(ps => ps.Buffer(() => ps.Throttle(TimeSpan.FromSeconds(1.0)))); 

To see the difference using Publish makes, try the following code:

var rnd = new Random();  var source =     Observable         .Generate(             0, x => x < 100, x => x + 1, x => rnd.Next(10),             x => TimeSpan.FromSeconds(rnd.Next(4))); 

This generates a series of random numbers between 0 and 9 inclusively at an interval of between 0 and 3 seconds between successive values.

Then, run this query:

var query =     source         .Publish(ps =>             ps                 .Do(p => Console.WriteLine($"ps1: {p}"))                 .Buffer(() =>                     ps                         .Do(p => Console.WriteLine($"ps2: {p}"))                         .Throttle(TimeSpan.FromSeconds(1.0)))); 

It outputs the following:

ps1: 1 ps2: 1 ps1: 2 ps2: 2 ps1: 3 ps2: 3 ps1: 3 ps2: 3 ps1: 6 ps1: 1 ps2: 1 ps1: 3 ps2: 3 ps1: 9 ps1: 5 ps2: 5 ps1: 7 ps2: 7 ps1: 9 ps2: 9 ps1: 1 ps2: 1 ps1: 8 ps2: 8 ps1: 5 ps2: 5 ps1: 6 ps2: 6 ... 

Note that the ps1 and ps2 numbers are paired and in sequence – it’s always ps1 then ps2 before another ps1.

The output of the actual observable was this:

{ 1, 2, 3 } { 3 } { 6, 1 } { 3 } { 9, 5, 7, 9, 1, 8 } { 5 } { 6 } 

Now try without the Publish:

var query =     source         .Do(p => Console.WriteLine($"ps1: {p}"))         .Buffer(() =>             source                 .Do(p => Console.WriteLine($"ps2: {p}"))                 .Throttle(TimeSpan.FromSeconds(1.0))); 

Here’s the output:

ps1: 0 ps1: 5 ps2: 3 ps1: 6 ps1: 8 ps2: 9 ps1: 7 ps2: 2 ps2: 3 ps2: 4 ps1: 1 ps2: 7 ps2: 4 ps2: 2 ps1: 0 ps1: 6 ps1: 7 ps1: 9 ps1: 2 ps2: 8 ps1: 0 ps2: 3 ps2: 3 ps1: 1 ps2: 9 ps1: 4 ps2: 8 ps2: 3 ps1: 1 ... 

It’s now no longer paired nor in sequence.

The output of the observable was this:

{ 0, 5, 6, 8 } { } { 7, 1 } { } { 0, 6 } { 7, 9, 2, 0 } { } { 1, 4 } { }   ... 

Note the empty buffers!

The results without are not at all reliable.

Always use Publish to ensure you’re sharing the same source observable.

Add Comment

I think I figured out what to do. Maybe it will help someone else.

MySubject     .Buffer(() => MySubject.Throttle(TimeSpan.FromSeconds(1))     .Subscribe(() => ... ) 
Answered on July 16, 2020.
Add Comment

Your Answer

By posting your answer, you agree to the privacy policy and terms of service.