Using the Camel API

The Camel connector builds on Camel's Reactive Streams support. If your application already uses the Camel API (routes, from...), you can integrate it with Reactive Messaging.

Getting the CamelReactiveStreamsService

Once you add the Camel connector to your application, you can inject the org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService object:

@Inject
CamelReactiveStreamsService reactiveStreamsService;

This CamelReactiveStreamsService lets you create Publisher and Subscriber instances from existing routes.
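
To make the Publisher and Subscriber roles concrete, here is a dependency-free sketch. It assumes the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams types that CamelReactiveStreamsService returns. The class and method names (PublisherSubscriberSketch, CollectingSubscriber, publishAndCollect) are hypothetical, and nothing here is Camel API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Stand-in sketch: uses the JDK Flow types instead of org.reactivestreams
// and Camel, so it runs without any extra dependency.
class PublisherSubscriberSketch {

    // Hypothetical subscriber that collects every item it receives.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Request everything; a real subscriber may request smaller batches.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable failure) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes the given items and returns what the subscriber saw.
    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return List.copyOf(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c")));
    }
}
```

In the connector, the publisher side would come from a Camel route and the subscriber side from Reactive Messaging (or the reverse), but the flow of demand, items, and completion follows the same contract.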

Using Camel Route with @Outgoing

If you have an existing Camel route, you can turn it into a Publisher using the CamelReactiveStreamsService and return that Publisher from a method annotated with @Outgoing:

@Outgoing("camel")
public Publisher<Exchange> retrieveDataFromCamelRoute() {
    return reactiveStreamsService.from("seda:camel");
}

You can also expose a RouteBuilder bean, making sure to use the Singleton scope, as RouteBuilder is no longer proxyable:

@Singleton
static class MyRouteBuilder extends RouteBuilder {
    @Inject
    CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Override
    public void configure() {
        from("seda:camel").process(
                exchange -> exchange.getMessage().setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}

Alternatively, you can use a LambdaRouteBuilder:

@ApplicationScoped
static class MyLambdaRouteBuilder {
    @Inject
    CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Produces
    @BindToRegistry
    public LambdaRouteBuilder route() {
        return rb -> rb.from("seda:camel").process(
                exchange -> exchange.getMessage().setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}

Using Camel Route with @Incoming

If you have an existing Camel route, you can turn it into a Subscriber using the CamelReactiveStreamsService and return that Subscriber from a method annotated with @Incoming:

@Incoming("to-camel")
public Subscriber<String> sendDataToCamelRoute() {
    return reactiveStreamsService.subscriber("file:./target?fileName=values.txt&fileExist=append",
            String.class);
}

You can also use a producer template created from the CamelContext:

@Inject
CamelContext camel;

@Incoming("to-camel")
public CompletionStage<Void> sink(String value) {
    return camel.createProducerTemplate()
            .asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value).thenApply(x -> null);
}
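
The thenApply(x -> null) call in the producer example converts the future returned by the asynchronous send into a CompletionStage<Void>, so the method's return value only signals when the send has finished or failed. The following sketch isolates that conversion using only the JDK. The fakeAsyncSend method is a hypothetical stand-in for the Camel send call, and AsyncSinkSketch is an illustrative name, not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class AsyncSinkSketch {

    // Hypothetical stand-in for an asynchronous send: completes with the body,
    // or fails immediately when there is no body to send.
    static CompletableFuture<Object> fakeAsyncSend(String endpoint, Object body) {
        if (body == null) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("no body for " + endpoint));
        }
        return CompletableFuture.supplyAsync(() -> body);
    }

    // Same shape as the sink method above: discard the send result and keep
    // only the completion (or failure) signal.
    static CompletionStage<Void> sink(String value) {
        return fakeAsyncSend("file:./target?fileName=values.txt&fileExist=append", value)
                .thenApply(ignored -> null);
    }

    public static void main(String[] args) {
        // Waits for the simulated send to finish; the result itself is null.
        sink("hello").toCompletableFuture().join();

        // A failed send surfaces as an exceptionally completed stage.
        boolean failed = sink(null).toCompletableFuture().isCompletedExceptionally();
        System.out.println("null body failed: " + failed);
    }
}
```

Because a failure in the send propagates through thenApply, the caller of sink sees the same exception the send produced rather than a silent success.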