FixedDemandPacer is a simple pacer with a fixed demand and a fixed delay.
You can create more elaborate pacers by implementing the DemandPacer interface.
To do so, you provide an initial request and a function that computes the next request from the previous request and the number of items emitted since that request.
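The shape of such a pacer can be sketched in plain Java. This is a stand-alone model, not Mutiny's own types: `PacerSketch`, `Request`, `Pacer`, and `DoublingPacer` are hypothetical names that only mirror the description above (an initial request, then a function of the previous request and the observed item count), and the doubling policy is purely illustrative.

```java
import java.time.Duration;

// Stand-alone model of a demand pacer; all names here are hypothetical.
final class PacerSketch {

    private PacerSketch() {
    }

    // A request pairs a demand (number of items) with a delay before the next request.
    static final class Request {
        private final long demand;
        private final Duration delay;

        Request(long demand, Duration delay) {
            this.demand = demand;
            this.delay = delay;
        }

        long demand() {
            return demand;
        }

        Duration delay() {
            return delay;
        }
    }

    interface Pacer {
        // The first request to issue.
        Request initial();

        // The next request, derived from the previous one and the items seen since.
        Request apply(Request previous, long observedItems);
    }

    // Illustrative policy: double the demand (up to a maximum) when the previous
    // batch was fully delivered, otherwise fall back to the initial demand.
    static final class DoublingPacer implements Pacer {
        private final long initialDemand;
        private final long maxDemand;
        private final Duration delay;

        DoublingPacer(long initialDemand, long maxDemand, Duration delay) {
            this.initialDemand = initialDemand;
            this.maxDemand = maxDemand;
            this.delay = delay;
        }

        @Override
        public Request initial() {
            return new Request(initialDemand, delay);
        }

        @Override
        public Request apply(Request previous, long observedItems) {
            if (observedItems < previous.demand()) {
                return new Request(initialDemand, delay);
            }
            // Compare against maxDemand / 2 to avoid overflow when doubling.
            long next = previous.demand() > maxDemand / 2 ? maxDemand : previous.demand() * 2;
            return new Request(next, delay);
        }
    }
}
```

With an initial demand of 10 and a maximum of 40, this model requests 10, 20, 40, and then stays at 40 for as long as every batch is fully delivered.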

    AssertSubscriber<Integer> sub = AssertSubscriber.create();

    sub = Multi.createFrom().range(0, 100)
            .capDemandsTo(50L)
            .subscribe().withSubscriber(sub);

    // A first batch of 50 (capped), 25 remain outstanding
    sub.request(75L).assertNotTerminated();
    assertThat(sub.getItems()).hasSize(50);

    // Second batch: 25 + 25 = 50
    sub.request(25L).assertCompleted();
    assertThat(sub.getItems()).hasSize(100);

Here we cap requests to 50 items, so it takes 2 requests to get all 100 items of the upstream range.
The first request of 75 items is capped to a request of 50 items, leaving an outstanding demand of 25 items.
The second request of 25 items is added to the outstanding demand, resulting in a request of 50 items and completing the stream.
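The arithmetic in this walkthrough can be checked with a small stand-alone model. `DemandCapSketch` is a hypothetical class written for illustration; it only tracks the numbers described above and makes no claim about how the operator is implemented internally.

```java
// Stand-alone model of the capping arithmetic; not Mutiny's implementation.
final class DemandCapSketch {
    private final long cap;
    private long outstanding;

    DemandCapSketch(long cap) {
        if (cap <= 0) {
            throw new IllegalArgumentException("cap must be positive");
        }
        this.cap = cap;
    }

    // Adds a downstream request to the outstanding demand and returns the
    // capped amount that would be forwarded upstream.
    long request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("request must be positive");
        }
        long sum = outstanding + n;
        outstanding = sum < 0 ? Long.MAX_VALUE : sum; // saturate on overflow
        long forwarded = Math.min(cap, outstanding);
        outstanding -= forwarded;
        return forwarded;
    }

    long outstanding() {
        return outstanding;
    }
}
```

With a cap of 50, `request(75)` forwards 50 and leaves 25 outstanding; a following `request(25)` forwards 25 + 25 = 50 and leaves nothing outstanding.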
You can also define a custom function that provides a capping value based on a custom formula, or based on earlier demand observations:
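As a rough sketch of such a formula, the function below forwards about three quarters of the outstanding demand. It is written against the JDK's `LongFunction<Long>` type; that the capping operator accepts a function of this shape (outstanding demand in, capped request out) is an assumption here, and `CappingFormulaSketch` is a hypothetical holder class.

```java
import java.util.function.LongFunction;

// Illustrative capping formula; the holder class and constant name are hypothetical.
final class CappingFormulaSketch {

    private CappingFormulaSketch() {
    }

    // Forward roughly three quarters of the outstanding demand. Integer arithmetic
    // keeps the result at 1 or more for any positive input and avoids rounding
    // issues with very large demands.
    static final LongFunction<Long> THREE_QUARTERS = outstanding -> outstanding - outstanding / 4;
}
```

For an outstanding demand of 100 the function returns 75; for a demand of 1 it returns 1, so a positive demand is never capped to zero.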
The Multi.pauseDemand() operator provides fine-grained control over demand propagation in reactive streams.
Unlike cancellation, which terminates the subscription, pausing lets you suspend demand without unsubscribing from the upstream.
This is useful for implementing flow control patterns where item flow needs to be paused based on external conditions.

    DemandPauser pauser = new DemandPauser();

    AssertSubscriber<Integer> sub = AssertSubscriber.create();
    Multi.createFrom().range(0, 100)
            .pauseDemand().using(pauser)
            // Throttle the multi
            .onItem().call(i -> Uni.createFrom().nullItem()
                    .onItem().delayIt().by(Duration.ofMillis(10)))
            .subscribe().withSubscriber(sub);

    // Unbounded request
    sub.request(Long.MAX_VALUE);

    // Wait for some items
    await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(10));

    // Pause the stream
    pauser.pause();
    assertThat(pauser.isPaused()).isTrue();

    int sizeWhenPaused = sub.getItems().size();

    // Wait - no new items should arrive (except a few in-flight)
    await().pollDelay(Duration.ofMillis(100)).until(() -> true);
    assertThat(sub.getItems()).hasSizeLessThanOrEqualTo(sizeWhenPaused + 5);

    // Resume the stream
    pauser.resume();
    assertThat(pauser.isPaused()).isFalse();

    // All items eventually arrive
    sub.awaitCompletion();
    assertThat(sub.getItems()).hasSize(100);

The DemandPauser provides the following methods:
pause(): Stop propagating demand to upstream
resume(): Resume demand propagation and deliver buffered items
isPaused(): Check the current pause state
Note that a few items may still arrive after pausing due to in-flight requests that were already issued to upstream.
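One way to picture why in-flight items still arrive is a gate that holds back only new demand. The class below is a stand-alone model with hypothetical names, not the DemandPauser itself: demand forwarded before the pause stays forwarded, while demand arriving during the pause is kept pending until resume.

```java
// Stand-alone model of a pausable demand gate; not Mutiny's DemandPauser.
final class PauseGateSketch {
    private boolean paused;
    private long pending;   // demand held back while paused
    private long forwarded; // demand already propagated upstream

    // Downstream demand is forwarded immediately unless the gate is paused.
    synchronized void request(long n) {
        if (paused) {
            pending += n;
        } else {
            forwarded += n;
        }
    }

    synchronized void pause() {
        paused = true;
    }

    // Resuming releases all demand that accumulated during the pause.
    synchronized void resume() {
        paused = false;
        forwarded += pending;
        pending = 0;
    }

    synchronized boolean isPaused() {
        return paused;
    }

    synchronized long forwardedDemand() {
        return forwarded;
    }

    synchronized long pendingDemand() {
        return pending;
    }
}
```

In this model, a request of 10 made before `pause()` has already reached upstream, so up to 10 items can still be emitted after the pause. A request of 5 made during the pause only reaches upstream once `resume()` is called.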

    DemandPauser pauser = new DemandPauser();

    AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
            .pauseDemand()
            .paused(true) // Start paused
            .using(pauser)
            .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

    // No items arrive while paused
    await().pollDelay(Duration.ofMillis(100)).until(() -> true);
    assertThat(sub.getItems()).isEmpty();
    assertThat(pauser.isPaused()).isTrue();

    // Resume to start receiving items
    pauser.resume();
    sub.awaitCompletion();
    assertThat(sub.getItems()).hasSize(50);

This is useful when you want to prepare a stream but delay its execution until certain conditions are met.
By default, the upstream subscription happens immediately even when starting paused.
The lateSubscription() option delays the upstream subscription until the stream is resumed:

    DemandPauser pauser = new DemandPauser();
    AtomicBoolean subscribed = new AtomicBoolean(false);

    AssertSubscriber<Integer> sub = Multi.createFrom().range(0, 50)
            .onSubscription().invoke(() -> subscribed.set(true))
            .pauseDemand()
            .paused(true) // Start paused
            .lateSubscription(true) // Delay subscription until resumed
            .using(pauser)
            .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

    // Stream is not subscribed yet
    await().pollDelay(Duration.ofMillis(100)).until(() -> true);
    assertThat(subscribed.get()).isFalse();
    assertThat(sub.getItems()).isEmpty();

    // Resume triggers subscription and item flow
    pauser.resume();
    await().untilAsserted(() -> assertThat(subscribed.get()).isTrue());
    sub.awaitCompletion();
    assertThat(sub.getItems()).hasSize(50);

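The difference between the default behaviour and late subscription can be modelled without any reactive machinery. In the sketch below, `LateSubscriptionSketch` and its constructor flags are hypothetical; the `Runnable` stands in for whatever subscribing to the upstream involves.

```java
// Stand-alone model of "start paused" with optional late subscription.
final class LateSubscriptionSketch {
    private final Runnable subscribeUpstream;
    private boolean paused;
    private boolean subscribed;

    LateSubscriptionSketch(boolean startPaused, boolean lateSubscription, Runnable subscribeUpstream) {
        this.subscribeUpstream = subscribeUpstream;
        this.paused = startPaused;
        // Subscribe right away unless both "paused" and "late subscription" are set.
        if (!(startPaused && lateSubscription)) {
            subscribe();
        }
    }

    // Resuming subscribes upstream if that has not happened yet.
    void resume() {
        paused = false;
        subscribe();
    }

    boolean isPaused() {
        return paused;
    }

    boolean isSubscribed() {
        return subscribed;
    }

    // The upstream is subscribed at most once.
    private void subscribe() {
        if (!subscribed) {
            subscribed = true;
            subscribeUpstream.run();
        }
    }
}
```

Starting paused without late subscription runs the subscription action in the constructor. With late subscription enabled, the action runs on the first `resume()` and never again.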
When a stream is paused, the operator stops requesting new items from upstream.
However, items that were already requested (due to downstream demand) may still arrive.
Buffer strategies control what happens to these in-flight items.
The pauseDemand() operator supports three buffer strategies: BUFFER (default), DROP, and IGNORE.
Configuring any other strategy will throw an IllegalArgumentException.

    DemandPauser pauser = new DemandPauser();

    AssertSubscriber<Long> sub = AssertSubscriber.create(Long.MAX_VALUE);
    Multi.createFrom().ticks().every(Duration.ofMillis(10))
            .select().first(100)
            .pauseDemand()
            .bufferStrategy(BackPressureStrategy.BUFFER)
            .bufferSize(20) // Buffer up to 20 items
            .using(pauser)
            .subscribe().withSubscriber(sub);

    // Wait for some items
    await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));

    // Pause - items buffer up to the limit
    pauser.pause();

    // Wait for buffer to fill
    await().pollDelay(Duration.ofMillis(100))
            .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));

    // Buffer size is capped
    assertThat(pauser.bufferSize()).isLessThanOrEqualTo(20);

    // Resume drains the buffer
    pauser.resume();
    await().untilAsserted(() -> assertThat(pauser.bufferSize()).isEqualTo(0));

You can configure the buffer size:
bufferUnconditionally(): Unbounded buffer
bufferSize(n): Buffer up to n items, then fail with buffer overflow
When the buffer overflows, the stream fails with an IllegalStateException.
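The overflow rule can be illustrated with a small bounded queue. `BoundedBufferSketch` is a hypothetical stand-alone class. It fails with an IllegalStateException when an item arrives at a full buffer, which matches the behaviour described above, but it is not the operator's actual buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-alone model of a bounded pause buffer; not the operator's own buffer.
final class BoundedBufferSketch<T> {
    private final int capacity; // Integer.MAX_VALUE can stand in for "unbounded"
    private final Queue<T> items = new ArrayDeque<>();

    BoundedBufferSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Stores an item that arrived while paused, failing once the buffer is full.
    void add(T item) {
        if (items.size() >= capacity) {
            throw new IllegalStateException("Buffer overflow: capacity is " + capacity);
        }
        items.add(item);
    }

    // Returns the buffered items in arrival order and empties the buffer.
    List<T> drain() {
        List<T> drained = new ArrayList<>(items);
        items.clear();
        return drained;
    }

    int size() {
        return items.size();
    }
}
```

With a capacity of 2, the third `add` throws, and the two items already buffered are still available to `drain()` in arrival order.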
Important: The buffer only holds items that were already requested from upstream before pausing.
When paused, no new requests are issued to upstream, so the buffer size is bounded by the outstanding demand at the time of pausing.

    DemandPauser pauser = new DemandPauser();

    AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(5))
            .select().first(200)
            .pauseDemand()
            .bufferStrategy(BackPressureStrategy.DROP) // Drop items while paused
            .using(pauser)
            .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

    // Wait for some items
    await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(20));

    // Pause - subsequent items are dropped
    pauser.pause();
    int sizeWhenPaused = sub.getItems().size();

    // Wait while items are dropped
    await().pollDelay(Duration.ofMillis(200)).until(() -> true);

    // Resume - continue from current position
    pauser.resume();
    await().atMost(Duration.ofSeconds(3))
            .untilAsserted(() -> assertThat(sub.getItems().size()).isGreaterThan(sizeWhenPaused + 20));

    sub.awaitCompletion();
    // Not all items arrived (some were dropped)
    assertThat(sub.getItems()).hasSizeLessThan(200);

Items that arrive while paused are discarded, and when resumed, the stream continues requesting fresh items.
With the IGNORE strategy, already-requested items continue to flow downstream while paused.
This strategy doesn't use any buffer.
It only stops new demand from being issued to upstream; it does not pause the flow of items that were already requested.
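The three strategies differ only in what happens to an item that arrives while the stream is paused. The stand-alone model below puts them side by side. `PauseStrategySketch` and its nested enum are hypothetical names, and buffer limits are left out to keep the comparison short.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone comparison of the three strategies; not the operator's implementation.
final class PauseStrategySketch {

    enum Strategy { BUFFER, DROP, IGNORE }

    private final Strategy strategy;
    private final List<Integer> delivered = new ArrayList<>();
    private final List<Integer> buffered = new ArrayList<>();
    private boolean paused;

    PauseStrategySketch(Strategy strategy) {
        this.strategy = strategy;
    }

    void pause() {
        paused = true;
    }

    // On resume, anything held in the buffer is delivered first, in order.
    void resume() {
        paused = false;
        delivered.addAll(buffered);
        buffered.clear();
    }

    // Called for each already-requested item that upstream emits.
    void onItem(int item) {
        if (!paused || strategy == Strategy.IGNORE) {
            delivered.add(item); // IGNORE lets in-flight items through while paused
        } else if (strategy == Strategy.BUFFER) {
            buffered.add(item);  // BUFFER holds the item until resume
        }
        // DROP: the item is discarded
    }

    List<Integer> delivered() {
        return new ArrayList<>(delivered);
    }
}
```

Feeding item 1, pausing, feeding items 2 and 3, resuming, and feeding item 4 yields 1, 2, 3, 4 for BUFFER (with 2 and 3 delivered only on resume), 1, 4 for DROP, and 1, 2, 3, 4 for IGNORE (with 2 and 3 delivered during the pause).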

    DemandPauser pauser = new DemandPauser();

    AssertSubscriber<Long> sub = Multi.createFrom().ticks().every(Duration.ofMillis(10))
            .select().first(100)
            .pauseDemand()
            .bufferStrategy(BackPressureStrategy.BUFFER)
            .bufferUnconditionally() // Unbounded buffer
            .using(pauser)
            .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

    // Wait for some items
    await().untilAsserted(() -> assertThat(sub.getItems()).hasSizeGreaterThan(5));

    // Pause and let buffer fill
    pauser.pause();
    await().pollDelay(Duration.ofMillis(100))
            .untilAsserted(() -> assertThat(pauser.bufferSize()).isGreaterThan(0));

    int bufferSize = pauser.bufferSize();
    assertThat(bufferSize).isGreaterThan(0);

    // Clear the buffer
    boolean cleared = pauser.clearBuffer();
    assertThat(cleared).isTrue();
    assertThat(pauser.bufferSize()).isEqualTo(0);

    // Resume - items continue from current position (cleared items are lost)
    pauser.resume();

The DemandPauser provides:
bufferSize(): Returns the current number of buffered items
clearBuffer(): Clears the buffer (only works while paused), returns true if successful
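The contract of these two methods can be modelled in a few lines. `ClearableBufferSketch` is a hypothetical stand-alone class. In particular, returning `false` from `clearBuffer()` when the stream is not paused is an assumption drawn from the "only works while paused" wording, not a documented return value.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stand-alone model of bufferSize() and clearBuffer(); not Mutiny's DemandPauser.
final class ClearableBufferSketch {
    private final Deque<Integer> buffer = new ArrayDeque<>();
    private boolean paused;

    void pause() {
        paused = true;
    }

    // Resuming hands back whatever is still buffered, in arrival order.
    List<Integer> resume() {
        paused = false;
        List<Integer> drained = new ArrayList<>(buffer);
        buffer.clear();
        return drained;
    }

    // Items are only buffered while paused; otherwise they would flow straight through.
    void onItem(int item) {
        if (paused) {
            buffer.add(item);
        }
    }

    int bufferSize() {
        return buffer.size();
    }

    // Assumption: clearing is refused (false) unless the stream is paused.
    boolean clearBuffer() {
        if (!paused) {
            return false;
        }
        buffer.clear();
        return true;
    }
}
```

Items cleared this way are lost: after a successful `clearBuffer()`, a later `resume()` has nothing left to deliver from the buffer.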