
Controlling the demand

Pacing the demand

A subscription serves two purposes: cancelling the subscription and requesting batches of items.

The Multi.paceDemand() operator can be used to automatically issue requests at certain points in time.

The following example issues requests of 25 items every 100ms:

FixedDemandPacer pacer = new FixedDemandPacer(25L, Duration.ofMillis(100L));

Multi<Integer> multi = Multi.createFrom().range(0, 100)
        .paceDemand().on(Infrastructure.getDefaultWorkerPool()).using(pacer);

FixedDemandPacer is a simple pacer with a fixed demand and a fixed delay.

You can create more elaborate pacers by implementing the DemandPacer interface. To do so, you provide an initial request and a function that computes the next request from the previous request and the number of items emitted since that request:

DemandPacer pacer = new DemandPacer() {

    @Override
    public Request initial() {
        return new Request(10L, Duration.ofMillis(100L));
    }

    @Override
    public Request apply(Request previousRequest, long observedItemsCount) {
        // Double the demand and lengthen the delay by 10 ms
        return new Request(previousRequest.demand() * 2, previousRequest.delay().plus(10, ChronoUnit.MILLIS));
    }
};

This custom pacer starts with a request of 10 items and a delay of 100 ms, then doubles the demand and adds 10 ms to the delay on each new request.
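To make the progression concrete, the following plain-Java sketch replays the same rule without any Mutiny dependency. `PacerTrace` and `Step` are hypothetical names introduced for illustration only; they mirror the demand and delay of a pacing request but are not part of the library.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for the pacing arithmetic. PacerTrace and Step are
// hypothetical names for this sketch, not Mutiny types.
class PacerTrace {

    // Holds the (demand, delay) pair that a pacing request carries.
    static final class Step {
        final long demand;
        final Duration delay;

        Step(long demand, Duration delay) {
            this.demand = demand;
            this.delay = delay;
        }
    }

    // Same rule as the custom pacer: double the demand, add 10 ms to the delay.
    static Step next(Step previous) {
        return new Step(previous.demand * 2, previous.delay.plusMillis(10));
    }

    // Returns the first `count` requests, starting from (10 items, 100 ms).
    static List<Step> firstSteps(int count) {
        List<Step> steps = new ArrayList<>();
        Step current = new Step(10L, Duration.ofMillis(100L));
        for (int i = 0; i < count; i++) {
            steps.add(current);
            current = next(current);
        }
        return steps;
    }

    public static void main(String[] args) {
        for (Step step : firstSteps(4)) {
            System.out.println("demand=" + step.demand + ", delay=" + step.delay.toMillis() + " ms");
        }
    }
}
```

The first four requests come out as (10, 100 ms), (20, 110 ms), (40, 120 ms) and (80, 130 ms). Because the demand grows geometrically, a real pacer would probably want an upper bound on it.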

Capping the demand requests

The capDemandsTo and capDemandsUsing operators cap the demand coming from downstream subscribers.

The capDemandsTo operator defines the maximum demand that can flow upstream in a single request:

AssertSubscriber<Integer> sub = AssertSubscriber.create();

sub = Multi.createFrom().range(0, 100)
        .capDemandsTo(50L)
        .subscribe().withSubscriber(sub);

// A first batch of 50 (capped), 25 remain outstanding
sub.request(75L).assertNotTerminated();
assertThat(sub.getItems()).hasSize(50);

// Second batch: 25 + 25 = 50
sub.request(25L).assertCompleted();
assertThat(sub.getItems()).hasSize(100);

Here we cap requests to 50 items, so it takes 2 requests to get all 100 items of the upstream range. The first request of 75 items is capped to a request of 50 items, leaving an outstanding demand of 25 items. The second request of 25 items is added to the outstanding demand, resulting in a request of 50 items and completing the stream.
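The bookkeeping described above can be modelled in a few lines of plain Java. This is only a sketch of the arithmetic under the behavior shown in the example (one capped upstream request per downstream request); `CappedDemandModel` is a hypothetical class, not Mutiny's implementation.

```java
// Plain-Java model of the capping arithmetic only. CappedDemandModel is a
// hypothetical name; this is not Mutiny's implementation.
class CappedDemandModel {
    private final long cap;
    private final long upstreamSize;
    private long outstanding; // downstream demand not yet forwarded upstream
    private long delivered;   // items received by the subscriber so far

    CappedDemandModel(long cap, long upstreamSize) {
        this.cap = cap;
        this.upstreamSize = upstreamSize;
    }

    // Adds a downstream request to the outstanding demand, forwards at most
    // `cap` items upstream, and returns the running total of delivered items.
    // Assumes small values: there is no overflow handling in this sketch.
    long request(long n) {
        outstanding += n;
        long forwarded = Math.min(outstanding, cap);
        outstanding -= forwarded;
        delivered = Math.min(upstreamSize, delivered + forwarded);
        return delivered;
    }

    long outstanding() {
        return outstanding;
    }

    public static void main(String[] args) {
        CappedDemandModel model = new CappedDemandModel(50L, 100L);
        System.out.println(model.request(75L) + " delivered, " + model.outstanding() + " outstanding");
        System.out.println(model.request(25L) + " delivered, " + model.outstanding() + " outstanding");
    }
}
```

Running it prints `50 delivered, 25 outstanding` followed by `100 delivered, 0 outstanding`, matching the sizes asserted in the example.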

You can also define a custom function that provides a capping value based on a custom formula, or based on earlier demand observations:

AssertSubscriber<Integer> sub = AssertSubscriber.create();

sub = Multi.createFrom().range(0, 100)
        .capDemandsUsing(n -> {
            if (n > 1) {
                return (long) (((double) n) * 0.75d);
            } else {
                return n;
            }
        })
        .subscribe().withSubscriber(sub);

sub.request(100L).assertNotTerminated();
assertThat(sub.getItems()).hasSize(75);

sub.request(1L).assertNotTerminated();
assertThat(sub.getItems()).hasSize(94);

sub.request(Long.MAX_VALUE).assertCompleted();
assertThat(sub.getItems()).hasSize(100);

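Here the function forwards 75% of the outstanding downstream demand. The first request of 100 items is capped to 75, leaving 25 outstanding. The second request of 1 item raises the outstanding demand to 26, of which 19 (75%, rounded down) are forwarded, for a total of 94 items.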

Note that the function must return a value n that satisfies (0 < n <= requested) where requested is the downstream demand.
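The constraint also explains the `n > 1` guard in the example: 75% of a demand of 1 truncates to 0, which is not a valid value. The plain-Java sketch below traces the formula and checks the constraint by hand. `CappingFunctionTrace` is a hypothetical class, `LongUnaryOperator` is used here only as a convenient stand-in for the function type, and throwing an `IllegalArgumentException` on violation is a choice made for this sketch rather than a statement about how Mutiny reacts.

```java
import java.util.function.LongUnaryOperator;

// Plain-Java trace of the capping function. CappingFunctionTrace is a
// hypothetical name; the exception on violation is this sketch's choice.
class CappingFunctionTrace {

    // Same formula as the example: 75% of the demand, truncated, except that
    // a demand of 1 is returned unchanged (0.75 would truncate to 0).
    static final LongUnaryOperator THREE_QUARTERS =
            n -> n > 1 ? (long) (((double) n) * 0.75d) : n;

    // Applies the function and checks 0 < result <= requested.
    static long forwarded(LongUnaryOperator function, long requested) {
        long result = function.applyAsLong(requested);
        if (result <= 0 || result > requested) {
            throw new IllegalArgumentException(
                    "Expected 0 < n <= " + requested + " but got " + result);
        }
        return result;
    }

    public static void main(String[] args) {
        long first = forwarded(THREE_QUARTERS, 100L);      // 75, leaving 25 outstanding
        long second = forwarded(THREE_QUARTERS, 25L + 1L); // 19, since 26 * 0.75 = 19.5
        System.out.println(first + " + " + second + " = " + (first + second));

        try {
            // Without the n > 1 guard, a demand of 1 would yield 0.
            forwarded(n -> (long) (n * 0.75d), 1L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The first line printed is `75 + 19 = 94`, the item count asserted after the second request in the example; the unguarded formula is then rejected for a demand of 1.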