Interface OutboundConnector
All Superinterfaces: ConnectorFactory
public interface OutboundConnector extends ConnectorFactory
SPI used to implement a connector managing a sink of messages for a specific transport. For example, to dispatch messages to Kafka, the reactive messaging extension would provide a bean implementing this interface. This bean is called for every stream that needs to be created for this specific transport (Kafka in this example). These streams are connected to methods annotated with Outgoing.

The factory is called to create a subscriber for each configured transport. The configuration is done using MicroProfile Config. The following snippet gives an example for a hypothetical Kafka connector:

    mp.messaging.outgoing.my-channel.connector=acme.kafka
    mp.messaging.outgoing.my-channel.topic=my-topic
    mp.messaging.connector.acme.kafka.bootstrap.servers=localhost:9092
    ...

The configuration keys are structured as follows:

    mp.messaging.[incoming|outgoing].channel-name.attribute

or

    mp.messaging.[connector].connector-name.attribute

Channel names are not expected to contain a . character, so the first occurrence of a . in the channel-name portion of a property terminates the channel name and precedes the attribute name. For connector attributes, the longest string, inclusive of . characters, that matches a loadable connector is used as the connector-name. The remainder, after a . separator, is the attribute name. Configuration keys that begin with mp.messaging.incoming are not used for OutboundConnector configuration.

The channel-name segment in the configuration key corresponds to the name of the channel used in the Outgoing annotation:

    @Outgoing("my-channel")
    public CompletionStage<String> produce(String s) {
        // ...
    }

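The key structure described above can be illustrated with a small parsing sketch. It is not taken from SmallRye or MicroProfile: the class OutgoingKeySketch, its PREFIX constant, and the parse method are hypothetical names, and the sketch only covers the channel-scoped form of the key (the connector-scoped form needs the list of loadable connectors and is left out).

```java
import java.util.Optional;

/** Hypothetical helper for illustration only; not part of the SmallRye or MicroProfile API. */
final class OutgoingKeySketch {

    // Assumed local constant mirroring the documented "mp.messaging.outgoing" key prefix.
    static final String PREFIX = "mp.messaging.outgoing.";

    /** The two parts of a channel-scoped configuration key. */
    static final class ChannelKey {
        final String channel;
        final String attribute;

        ChannelKey(String channel, String attribute) {
            this.channel = channel;
            this.attribute = attribute;
        }
    }

    /**
     * Splits an outgoing key into channel name and attribute name.
     * The first '.' after the prefix ends the channel name; everything after it,
     * including any further '.' characters, is the attribute name.
     */
    static Optional<ChannelKey> parse(String key) {
        if (!key.startsWith(PREFIX)) {
            return Optional.empty(); // e.g. mp.messaging.incoming.* keys are ignored
        }
        String rest = key.substring(PREFIX.length());
        int dot = rest.indexOf('.');
        if (dot <= 0 || dot == rest.length() - 1) {
            return Optional.empty(); // missing channel name or missing attribute
        }
        return Optional.of(new ChannelKey(rest.substring(0, dot), rest.substring(dot + 1)));
    }

    public static void main(String[] args) {
        ChannelKey key = parse("mp.messaging.outgoing.my-channel.topic").get();
        System.out.println(key.channel + " / " + key.attribute); // prints "my-channel / topic"
    }
}
```

Because only the first separator is significant, an attribute such as bootstrap.servers keeps its own dot when it is set on a channel.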
The set of attributes depends on the connector and transport layer (for example, bootstrap.servers is Kafka-specific). The connector attribute indicates the name of the connector. It is matched to the value returned by the Connector qualifier used on the relevant OutboundConnector bean implementation. This is how a reactive messaging implementation looks up the specific OutboundConnector required for a channel. Any mp.messaging.connector attributes for the channel's connector are also included in the set of relevant attributes. Where an attribute is present for both a channel and its connector, the value of the channel-specific attribute takes precedence. In the previous configuration, SmallRye Reactive Messaging would need to find the OutboundConnector implementation qualified with the Connector qualifier with the value acme.kafka to create the my-channel subscriber. Note that if the connector cannot be found, the deployment must fail with a DeploymentException.

The getSubscriber(Config) method is called for every channel that needs to be created. The Config object passed to the method contains a subset of the global configuration, with the prefixes removed. So for the previous configuration, it would be:

    bootstrap.servers = localhost:9092
    topic = my-topic

The connector implementation can then retrieve the values with Config.getValue(String, Class) and Config.getOptionalValue(String, Class).

If the configuration is invalid, the getSubscriber(Config) method must throw an IllegalArgumentException, which is caught by the SmallRye Reactive Messaging implementation and triggers a deployment failure.

This class is specific to SmallRye and is used internally instead of OutgoingConnectorFactory. Instead of a SubscriberBuilder, it returns a Subscriber.
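The way a per-channel view is derived from the global configuration (connector attributes merged with channel attributes, prefixes removed, channel values taking precedence) might be sketched as follows. This is an illustration under assumptions, not the SmallRye implementation: it uses a plain Map instead of a MicroProfile Config object, the class ChannelConfigSketch and its methods are hypothetical, and it drops the connector attribute from the result only so that the output mirrors the two-entry listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration; a Map stands in for the MicroProfile Config object. */
final class ChannelConfigSketch {

    /** Builds the prefix-free attribute set for one outgoing channel. */
    static Map<String, String> channelView(Map<String, String> global, String channel) {
        String channelPrefix = "mp.messaging.outgoing." + channel + ".";
        String connector = global.get(channelPrefix + "connector");
        if (connector == null) {
            throw new IllegalArgumentException("No connector configured for channel " + channel);
        }
        String connectorPrefix = "mp.messaging.connector." + connector + ".";

        Map<String, String> view = new TreeMap<>(); // sorted only to get stable output
        copyWithoutPrefix(global, connectorPrefix, view); // connector-wide values first
        copyWithoutPrefix(global, channelPrefix, view);   // channel values overwrite them
        view.remove("connector"); // assumption: dropped here to match the listing above
        return view;
    }

    private static void copyWithoutPrefix(Map<String, String> source, String prefix,
                                          Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> global = new HashMap<>();
        global.put("mp.messaging.outgoing.my-channel.connector", "acme.kafka");
        global.put("mp.messaging.outgoing.my-channel.topic", "my-topic");
        global.put("mp.messaging.connector.acme.kafka.bootstrap.servers", "localhost:9092");
        // prints {bootstrap.servers=localhost:9092, topic=my-topic}
        System.out.println(channelView(global, "my-channel"));
    }
}
```

Copying the connector attributes first and the channel attributes second is what gives the channel-specific value precedence when both define the same attribute.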
Field Summary
Fields inherited from interface org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory
CHANNEL_NAME_ATTRIBUTE, CONNECTOR_ATTRIBUTE, CONNECTOR_PREFIX, INCOMING_PREFIX, OUTGOING_PREFIX
Method Summary
Modifier and Type: org.reactivestreams.Subscriber<? extends Message<?>>
Method: getSubscriber(org.eclipse.microprofile.config.Config config)
Description: Creates a channel for the given configuration.
Method Detail
getSubscriber
org.reactivestreams.Subscriber<? extends Message<?>> getSubscriber(org.eclipse.microprofile.config.Config config)
Creates a channel for the given configuration. The channel's configuration is associated with a specific connector, using the Connector qualifier's parameter as the key that indicates which connector an Outgoing channel uses.

Note that the connection to the transport or broker is generally postponed until the subscription.
Parameters:
    config - the configuration, never null; must contain the ConnectorFactory.CHANNEL_NAME_ATTRIBUTE attribute.
Returns:
    the created Subscriber, must not be null.
Throws:
    IllegalArgumentException - if the configuration is invalid.
    NoSuchElementException - if the configuration does not contain an expected attribute.
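
The contract above (validate eagerly, return a non-null subscriber, defer the connection until subscription) can be sketched without any messaging dependency. Everything in this example is a stand-in and an assumption: java.util.concurrent.Flow.Subscriber replaces org.reactivestreams.Subscriber, a Map replaces the Config object, and SubscriberFactorySketch, PrintingSubscriber, and the localhost:9092 fallback are hypothetical. A real connector would implement OutboundConnector and read its attributes through Config.getValue and Config.getOptionalValue.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Flow;

/** Hypothetical stand-in for a connector; it does not implement the real SPI. */
final class SubscriberFactorySketch {

    /** Mirrors the getSubscriber contract: validate first, connect later. */
    static Flow.Subscriber<String> getSubscriber(Map<String, String> config) {
        String topic = config.get("topic");
        if (topic == null) {
            // expected attribute absent
            throw new NoSuchElementException("Missing required attribute: topic");
        }
        if (topic.isEmpty()) {
            // attribute present but unusable
            throw new IllegalArgumentException("Attribute 'topic' must not be empty");
        }
        // Assumed fallback value, used only by this sketch.
        String servers = config.getOrDefault("bootstrap.servers", "localhost:9092");
        return new PrintingSubscriber(topic, servers); // never null
    }

    /** Prints items instead of sending them to a broker. */
    static final class PrintingSubscriber implements Flow.Subscriber<String> {
        private final String topic;
        private final String servers;

        PrintingSubscriber(String topic, String servers) {
            this.topic = topic;
            this.servers = servers;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // A real connector would typically open its broker connection here.
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(String item) {
            System.out.println("[" + topic + " @ " + servers + "] " + item);
        }

        @Override
        public void onError(Throwable failure) {
            System.err.println("Stream failed: " + failure.getMessage());
        }

        @Override
        public void onComplete() {
            // Nothing to release in this sketch.
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("topic", "my-topic");
        Flow.Subscriber<String> subscriber = getSubscriber(config);
        System.out.println("Created subscriber: " + (subscriber != null));
    }
}
```

Keeping validation in the factory method and the connection logic in onSubscribe means a misconfigured channel is reported when the subscriber is created rather than when the first message arrives.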