Interface InboundConnector

  • All Superinterfaces:
    ConnectorFactory

    public interface InboundConnector
    extends ConnectorFactory
    SPI used to implement a connector managing a source of messages for a specific transport. For example, to handle the consumption of records from Kafka, the reactive messaging extension would need to provide a bean implementing this interface. This bean is called for every channel that needs to be created for this specific transport (Kafka in this case). These channels are connected to methods annotated with Incoming.

    Implementations are called to create a channel for each configured transport. The configuration is done using MicroProfile Config. The following snippet gives an example for a hypothetical Kafka connector:

     mp.messaging.incoming.my-channel.topic=my-topic
     mp.messaging.connector.acme.kafka.bootstrap.servers=localhost:9092
     ...
     

    The configuration keys are structured as follows: mp.messaging.[incoming|outgoing].channel-name.attribute or mp.messaging.[connector].connector-name.attribute. Channel names are not expected to contain a . character, so the first occurrence of a . in the channel-name portion of a property terminates the channel name and precedes the attribute name. For connector attributes, the longest string, inclusive of . characters, that matches a loadable connector is used as the connector-name. The remainder, after a . separator, is the attribute name. Configuration keys that begin with mp.messaging.outgoing are not used for InboundConnector configuration.
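
    The key structure described above can be sketched as follows. This is a minimal illustration using only the JDK; the class MessagingKeySketch and its methods are hypothetical names, not part of this SPI, and the list of known connectors stands in for whatever connector lookup the implementation really performs.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical helper illustrating the key layout; not part of the InboundConnector SPI. */
final class MessagingKeySketch {

    static final String INCOMING_PREFIX = "mp.messaging.incoming.";
    static final String CONNECTOR_PREFIX = "mp.messaging.connector.";

    /** Returns {channelName, attribute}; the first '.' after the prefix ends the channel name. */
    static Optional<String[]> parseIncomingKey(String key) {
        if (!key.startsWith(INCOMING_PREFIX)) {
            return Optional.empty(); // e.g. mp.messaging.outgoing.* keys are ignored here
        }
        String rest = key.substring(INCOMING_PREFIX.length());
        int dot = rest.indexOf('.');
        if (dot <= 0 || dot == rest.length() - 1) {
            return Optional.empty(); // no channel name or no attribute
        }
        return Optional.of(new String[] { rest.substring(0, dot), rest.substring(dot + 1) });
    }

    /** Returns {connectorName, attribute}, picking the longest known connector name that matches. */
    static Optional<String[]> parseConnectorKey(String key, List<String> knownConnectors) {
        if (!key.startsWith(CONNECTOR_PREFIX)) {
            return Optional.empty();
        }
        String rest = key.substring(CONNECTOR_PREFIX.length());
        String best = null;
        for (String name : knownConnectors) {
            boolean matches = rest.startsWith(name + ".") && rest.length() > name.length() + 1;
            if (matches && (best == null || name.length() > best.length())) {
                best = name;
            }
        }
        if (best == null) {
            return Optional.empty();
        }
        return Optional.of(new String[] { best, rest.substring(best.length() + 1) });
    }

    public static void main(String[] args) {
        String[] channel = parseIncomingKey("mp.messaging.incoming.my-channel.topic").get();
        System.out.println(channel[0] + " / " + channel[1]); // my-channel / topic

        String[] connector = parseConnectorKey(
                "mp.messaging.connector.acme.kafka.bootstrap.servers",
                List.of("acme", "acme.kafka")).get();
        System.out.println(connector[0] + " / " + connector[1]); // acme.kafka / bootstrap.servers
    }
}
```

    Note how the connector name acme.kafka wins over the shorter candidate acme, which is why bootstrap.servers, and not kafka.bootstrap.servers, ends up as the attribute name.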

    The portion of the key that precedes the attribute acts as a property prefix that has a common structure across all MicroProfile Reactive Messaging configuration properties.

    The channel-name segment in the configuration key corresponds to the name of the channel used in the Incoming annotation:

     @Incoming("my-channel")
     public void consume(String s) {
         // ...
     }
     

    The set of attributes depends on the connector and transport layer (for example, bootstrap.servers is Kafka specific). The connector attribute indicates the name of the connector. It is matched against the value of the Connector qualifier used on the relevant InboundConnector bean implementation. This is how SmallRye Reactive Messaging looks up the specific InboundConnector required for a channel. Any mp.messaging.connector attributes for the channel's connector are also included in the set of relevant attributes. Where an attribute is present for both a channel and its connector, the value of the channel-specific attribute takes precedence. In the previous configuration, SmallRye Reactive Messaging would need to find the InboundConnector bean qualified with the Connector qualifier with the value acme.kafka to create the my-channel channel. Note that if the connector cannot be found, the deployment must be failed with a DeploymentException.

    The getPublisher(Config) method is called for every channel that needs to be created. The Config object passed to the method contains a subset of the global configuration, with the prefixes removed. So for the previous configuration, it would be:

     bootstrap.servers = localhost:9092
     topic = my-topic
     

    In this example, if topic was missing as a configuration property, the Kafka connector would be at liberty to default to the channel name indicated in the annotation as the Kafka topic. Such connector specific behaviours are outside the scope of this specification.
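
    The prefix removal and the channel-over-connector precedence described above can be sketched with plain maps. This is only an illustration under stated assumptions: ChannelConfigSketch and effectiveConfig are hypothetical names, the connector name is passed in explicitly rather than resolved, and a real implementation hands the connector a Config object (which must also carry the channel name attribute) rather than a Map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of how a per-channel configuration view could be assembled. */
final class ChannelConfigSketch {

    static Map<String, String> effectiveConfig(
            Map<String, String> global, String channelName, String connectorName) {
        Map<String, String> result = new TreeMap<>();
        // Connector-wide attributes are copied first ...
        copyStripped(global, "mp.messaging.connector." + connectorName + ".", result);
        // ... then channel attributes, which overwrite connector values on conflict.
        copyStripped(global, "mp.messaging.incoming." + channelName + ".", result);
        return result;
    }

    /** Copies every entry whose key starts with the prefix, dropping the prefix from the key. */
    private static void copyStripped(
            Map<String, String> source, String prefix, Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                target.put(key.substring(prefix.length()), entry.getValue());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> global = new HashMap<>();
        global.put("mp.messaging.incoming.my-channel.topic", "my-topic");
        global.put("mp.messaging.connector.acme.kafka.bootstrap.servers", "localhost:9092");

        System.out.println(effectiveConfig(global, "my-channel", "acme.kafka"));
        // prints {bootstrap.servers=localhost:9092, topic=my-topic}
    }
}
```

    If the channel also defined mp.messaging.incoming.my-channel.bootstrap.servers, that value would replace the connector-level one in the resulting view.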

    The connector implementation can therefore retrieve attribute values from this Config with Config.getValue(String, Class) and Config.getOptionalValue(String, Class).

    If the configuration is invalid, the getPublisher(Config) method must throw an IllegalArgumentException. SmallRye Reactive Messaging catches it and fails the deployment by throwing a DeploymentException wrapping the original exception.
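
    As a rough illustration of the two failure modes, the sketch below separates a missing attribute from an invalid one. It uses a Map as a stand-in for Config so that it stays self-contained; ConfigValidationSketch, required and validateBootstrapServers are hypothetical names, and the single host:port rule is a simplification invented for the example, not something this SPI mandates.

```java
import java.util.Map;
import java.util.NoSuchElementException;

/** Hypothetical validation sketch; a Map stands in for the Config passed to getPublisher. */
final class ConfigValidationSketch {

    /** Like Config.getValue, a missing attribute is reported with NoSuchElementException. */
    static String required(Map<String, String> config, String attribute) {
        String value = config.get(attribute);
        if (value == null) {
            throw new NoSuchElementException("Missing attribute: " + attribute);
        }
        return value;
    }

    /** Assumed rule for this example only: the value must be a single host:port pair. */
    static String validateBootstrapServers(Map<String, String> config) {
        String servers = required(config, "bootstrap.servers");
        int colon = servers.lastIndexOf(':');
        if (colon <= 0 || colon == servers.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + servers);
        }
        try {
            Integer.parseInt(servers.substring(colon + 1));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port in: " + servers, e);
        }
        return servers;
    }

    public static void main(String[] args) {
        System.out.println(validateBootstrapServers(Map.of("bootstrap.servers", "localhost:9092")));
        try {
            validateBootstrapServers(Map.of("bootstrap.servers", "localhost"));
        } catch (IllegalArgumentException e) {
            System.out.println("invalid: " + e.getMessage());
        }
    }
}
```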

    This interface is specific to SmallRye and is used internally instead of IncomingConnectorFactory. Instead of a PublisherBuilder, it returns a Publisher.

    • Method Detail

      • getPublisher

        org.reactivestreams.Publisher<? extends Message<?>> getPublisher​(org.eclipse.microprofile.config.Config config)
        Creates a channel for the given configuration. The channel's configuration is associated with a specific connector: the value of the Connector qualifier is the key that identifies which InboundConnector to use.

        Note that the connection to the transport or broker is generally postponed until the subscription occurs.

        Parameters:
        config - the configuration, must not be null, must contain the ConnectorFactory.CHANNEL_NAME_ATTRIBUTE attribute.
        Returns:
        the created Publisher, will not be null.
        Throws:
        IllegalArgumentException - if the configuration is invalid.
        NoSuchElementException - if the configuration does not contain an expected attribute.