Can I have custom operators?
Yes, but please write operators responsibly!
Both Uni and Multi support custom operators using the plug operator.
Here is an example where we use a custom Multi operator that randomly drops items:
Multi.createFrom()
        .range(1, 101)
        .plug(RandomDrop::new)
        .subscribe().with(System.out::println);
with the operator defined as follows:
public class RandomDrop<T> extends AbstractMultiOperator<T, T> {

    // plug(RandomDrop::new) calls this constructor with the upstream Multi.
    public RandomDrop(Multi<? extends T> upstream) {
        super(upstream);
    }

    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        // Place the processor between the upstream and the downstream subscriber.
        upstream.subscribe().withSubscriber(new DropProcessor(downstream));
    }

    private class DropProcessor extends MultiOperatorProcessor<T, T> {

        DropProcessor(MultiSubscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        public void onItem(T item) {
            // Forward the item roughly half of the time; otherwise drop it.
            if (ThreadLocalRandom.current().nextBoolean()) {
                super.onItem(item);
            }
        }
    }
}
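A dedicated operator class is not always required: plug accepts any function that takes the upstream Multi and returns a new Multi. The following is a minimal sketch, assuming the Mutiny library is on the classpath. The class name PlugSketch and the helper doubledEvens are hypothetical and only serve the illustration; the helper is built entirely from existing operators.

```java
import io.smallrye.mutiny.Multi;

import java.util.List;

public class PlugSketch {

    // Hypothetical reusable stage: keeps even items and doubles them.
    // Its shape (Multi in, Multi out) is what plug expects.
    static Multi<Integer> doubledEvens(Multi<Integer> upstream) {
        return upstream
                .select().where(n -> n % 2 == 0)
                .onItem().transform(n -> n * 2);
    }

    // Emits 1..6, applies the plugged function, and collects the result.
    static List<Integer> run() {
        return Multi.createFrom().range(1, 7)
                .plug(PlugSketch::doubledEvens)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [4, 8, 12]
    }
}
```

Writing the logic as a plain function keeps it reusable across pipelines without subclassing AbstractMultiOperator.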
Caution
Custom operators are an advanced feature: when possible, please use the existing operators and use helpers such as stage to write readable code.
In the case of custom Multi operators it is wise to test them against the Reactive Streams TCK.
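As an illustration of the stage helper mentioned in the caution, here is a small sketch that splits a pipeline into named logical steps using only existing operators. It assumes the Mutiny library is on the classpath; the class name StageSketch and the method evenSquares are hypothetical.

```java
import io.smallrye.mutiny.Multi;

import java.util.List;

public class StageSketch {

    // Each stage call groups one logical step of the pipeline.
    static List<Integer> evenSquares() {
        return Multi.createFrom().range(1, 11)
                // Step 1: keep even numbers only
                .stage(m -> m.select().where(n -> n % 2 == 0))
                // Step 2: square each remaining item
                .stage(m -> m.onItem().transform(n -> n * n))
                // Step 3: collect into a list (this stage returns a Uni)
                .stage(m -> m.collect().asList())
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(evenSquares()); // prints [4, 16, 36, 64, 100]
    }
}
```

Because stage only passes the current Multi to a function and returns whatever that function produces, it changes how the code reads and not what the pipeline does.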