Skip to content

Vert.x-based Reactive Streams Publishers#

The mutiny-zero-vertx-publishers library (Maven coordinates io.smallrye.reactive:mutiny-zero-vertx-publishers) allows creating Reactive Streams publishers from Vert.x streams.

This library acts as a thin adapter between Vert.x ReadStream and java.util.concurrent.Flow.Publisher and uses Mutiny Zero to expose Reactive Streams compliant publishers.

API overview#

The entry point is the mutiny.zero.vertxpublishers.VertxPublisher interface that exposes 2 static factory methods.

  • Publisher<T> fromSupplier(Supplier<ReadStream<T>> streamSupplier) is to be used when some Vert.x API returns a ReadStream<T>.
  • Publisher<T> fromFuture(Supplier<Future<? extends ReadStream<T>>> futureStreamSupplier) is to be used when some Vert.x API asynchronously returns a ReadStream<T> through a Future.

The factory methods use suppliers so that the ReadStream instances to be adapted are on a per-subscriber basis.

Sample usage#

The following example makes HTTP requests to the Newcastle University Urban Observatory API using the Vert.x HTTP client:

package docsamples;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import mutiny.zero.vertxpublishers.VertxPublisher;

public class UrbanObservatoryHttpClient {

    public static void main(String[] args) {

        Vertx vertx = Vertx.vertx();
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

        RequestOptions opts = new RequestOptions()
                .setSsl(true)
                .setHost("api.usb.urbanobservatory.ac.uk")
                .setPort(443)
                .setMethod(HttpMethod.GET)
                .addHeader("Accept", "application/json")
                .setURI("/api/v2.0a/sensors/entity");

        HttpClient httpClient = vertx.createHttpClient();

        Flow.Publisher<Buffer> publisher = VertxPublisher.fromFuture(() -> httpClient
                .request(opts)
                .compose(HttpClientRequest::send));

        publisher.subscribe(new Flow.Subscriber<>() {

            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                System.out.println("======================================");
                this.subscription = s;
                s.request(1L);
            }

            @Override
            public void onNext(Buffer buffer) {
                System.out.print(buffer.toString(StandardCharsets.UTF_8));
                executor.schedule(() -> subscription.request(1L), 500, TimeUnit.MILLISECONDS);
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("======================================");
                t.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("======================================");
            }
        });
    }
}

A new HTTP connection is issued everytime the publisher is being subscribed. In this example the subscriber controls demand by requesting a new Vert.x Buffer every 500ms.

If you run this program then you will see the JSON response being progressively printed to the standard console in chunks, every 500 milliseconds.