Using the Camel API
The Camel connector is based on the Reactive Streams support from Camel. If you have an application already using the Camel API (routes, from ...), you can integrate it with Reactive Messaging.
Getting the CamelReactiveStreamsService
Once you add the Camel connector to your application, you can retrieve the org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService object:
| @Inject
CamelReactiveStreamsService reactiveStreamsService;
|
This CamelReactiveStreamsService lets you create Publisher and Subscriber instances from existing routes.
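Publisher and Subscriber are the Reactive Streams interfaces: a subscriber signals demand, and the publisher emits no more items than were requested. As a rough illustration of that contract, the sketch below uses the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, so that it runs without Camel on the classpath. The class and method names (PublisherSubscriberSketch, CollectingSubscriber, publishAndCollect) are hypothetical and not part of Camel or Reactive Messaging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PublisherSubscriberSketch {

    /** Hypothetical subscriber that collects items, requesting one at a time. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // signal demand for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item once this one is handled
        }

        @Override
        public void onError(Throwable failure) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the given items and returns what the subscriber received. */
    static List<String> publishAndCollect(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete after pending items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(List.of("a", "b", "c")));
    }
}
```

In an application using the connector, the Publisher or Subscriber would come from the CamelReactiveStreamsService rather than from a SubmissionPublisher; the demand-driven exchange between the two sides is the same.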
Using Camel Route with @Outgoing
If you have an existing Camel route, you can turn it into a Publisher using the CamelReactiveStreamsService. Then, you can return this Publisher from a method annotated with @Outgoing:
| @Outgoing("camel")
public Publisher<Exchange> retrieveDataFromCamelRoute() {
    return reactiveStreamsService.from("seda:camel");
}
|
You can also expose a RouteBuilder bean. Make sure it uses the Singleton scope: RouteBuilder is no longer proxyable, so it cannot be used with a scope that requires a client proxy:
| @Singleton
static class MyRouteBuilder extends RouteBuilder {

    @Inject
    CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Override
    public void configure() {
        from("seda:camel")
                .process(exchange -> exchange.getMessage()
                        .setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}
|
Alternatively, you can use the LambdaRouteBuilder:
| @ApplicationScoped
static class MyLambdaRouteBuilder {

    @Inject
    CamelReactiveStreamsService reactiveStreamsService;

    @Outgoing("sink")
    public Publisher<String> getDataFromCamelRoute() {
        return reactiveStreamsService.fromStream("my-stream", String.class);
    }

    @Produces
    @BindToRegistry
    public LambdaRouteBuilder route() {
        return rb -> rb.from("seda:camel")
                .process(exchange -> exchange.getMessage()
                        .setBody(exchange.getIn().getBody(String.class).toUpperCase()))
                .to("reactive-streams:my-stream");
    }
}
|
Using Camel Route with @Incoming
If you have an existing Camel route, you can turn it into a Subscriber using the CamelReactiveStreamsService. Then, you can return this Subscriber from a method annotated with @Incoming:
| @Incoming("to-camel")
public Subscriber<String> sendDataToCamelRoute() {
    return reactiveStreamsService.subscriber("file:./target?fileName=values.txt&fileExist=append",
            String.class);
}
|
You can also use a producer:
| @Inject
CamelContext camel;

@Incoming("to-camel")
public CompletionStage<Void> sink(String value) {
    return camel.createProducerTemplate()
            .asyncSendBody("file:./target?fileName=values.txt&fileExist=append", value)
            .thenApply(x -> null);
}
|
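The thenApply(x -> null) step in the producer example is there because asyncSendBody completes with an Object, while the method must return a CompletionStage<Void>. The sketch below isolates that adaptation with plain JDK types. The helper fakeAsyncSend is a hypothetical stand-in for the Camel call, and the class name AckAdapterSketch is an assumption made for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AckAdapterSketch {

    // Hypothetical stand-in for camel.createProducerTemplate().asyncSendBody(uri, value):
    // completes with some result object, or fails if the send fails.
    static CompletableFuture<Object> fakeAsyncSend(String value, boolean fail) {
        return fail
                ? CompletableFuture.failedFuture(new IllegalStateException("send failed: " + value))
                : CompletableFuture.completedFuture(value);
    }

    // Discards the send result so that only success or failure is propagated.
    static CompletionStage<Void> toAck(CompletableFuture<Object> sendResult) {
        return sendResult.thenApply(x -> null);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = toAck(fakeAsyncSend("a", false)).toCompletableFuture();
        CompletableFuture<Void> ko = toAck(fakeAsyncSend("b", true)).toCompletableFuture();
        System.out.println("success completed normally: " + !ok.isCompletedExceptionally());
        System.out.println("failure propagated: " + ko.isCompletedExceptionally());
    }
}
```

Because thenApply only runs its function on success, a failed send still reaches the caller as a failed stage instead of being swallowed.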