Skip to content

Can I have custom operators?#

Yes, but please write operators responsibly!

Both Uni and Multi support custom operators using the plug operator. Here is an example where we use a custom Multi operator that randomly drops items:

1
2
3
4
Multi.createFrom()
        .range(1, 101)
        .plug(RandomDrop::new)
        .subscribe().with(System.out::println);

with the operator defined as follows:

public class RandomDrop<T> extends AbstractMultiOperator<T, T> {
    public RandomDrop(Multi<? extends T> upstream) {
        super(upstream);
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        upstream.subscribe().withSubscriber(new DropProcessor(downstream));
    }

    private class DropProcessor extends MultiOperatorProcessor<T, T> {
        DropProcessor(MultiSubscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        public void onItem(T item) {
            if (ThreadLocalRandom.current().nextBoolean()) {
                super.onItem(item);
            }
        }
    }
}

Caution

Custom operators are an advanced feature: when possible please use the existing operators and use helpers such as stage to write readable code.

In the case of custom Multi operators it is wise to test them against the Reactive Streams TCK.