Customizing the underlying AMQP client

You can customize the underlying AMQP Client configuration by producing an instance of AmqpClientOptions:

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
    // You can use the produced options to configure the TLS connection
    PemKeyCertOptions keycert = new PemKeyCertOptions()
            .addCertPath("./tls/tls.crt")
            .addKeyPath("./tls/tls.key");
    PemTrustOptions trust = new PemTrustOptions().addCertPath("./tls/ca.crt");

    return new AmqpClientOptions()
            .setSsl(true)
            .setPemKeyCertOptions(keycert)
            .setPemTrustOptions(trust)
            .addEnabledSaslMechanism("EXTERNAL")
            .setHostnameVerificationAlgorithm("") // Disable hostname verification
            .setConnectTimeout(30000)
            .setReconnectInterval(5000)
            .setContainerId("my-container");
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the options bean (the value given to @Identifier) using the client-options-name attribute:

mp.messaging.incoming.prices.client-options-name=my-named-options

If you experience frequent disconnections from the broker, you can also use AmqpClientOptions to set a heartbeat that keeps the AMQP connection alive. Some brokers terminate an AMQP connection after a certain idle timeout. The heartbeat value you provide is used by the Vert.x Proton client to advertise the idle timeout when opening the transport to a remote peer.

@Produces
@Identifier("my-named-options")
public AmqpClientOptions getNamedOptions() {
  // set a heartbeat of 30s (in milliseconds)
  return new AmqpClientOptions()
        .setHeartbeat(30000);
}
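
The heartbeat in the example above is given in milliseconds. As a minimal sketch, the value can be derived from a java.time.Duration rather than written as a bare number. The HeartbeatMillis class and its toHeartbeatMillis method are hypothetical helpers introduced here for illustration; they are not part of SmallRye Reactive Messaging or Vert.x.

```java
import java.time.Duration;

// Hypothetical helper (an assumption for illustration, not a library class):
// converts a Duration into the millisecond int used as a heartbeat value.
final class HeartbeatMillis {

    static int toHeartbeatMillis(Duration interval) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("heartbeat interval must be positive");
        }
        // toIntExact throws ArithmeticException instead of silently overflowing
        return Math.toIntExact(interval.toMillis());
    }

    public static void main(String[] args) {
        int heartbeat = toHeartbeatMillis(Duration.ofSeconds(30));
        // The result could then be passed to setHeartbeat(...) in the producer above
        System.out.println(heartbeat); // prints 30000
    }
}
```

Keeping the unit in the type makes it harder to pass seconds where milliseconds are expected.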

Client capabilities

Both incoming and outgoing AMQP channels can be configured with a list of capabilities to declare when the sender or receiver connects to the AMQP broker. Note that the supported capability names are broker-specific.

mp.messaging.incoming.sink.capabilities=temporary-topic
...
mp.messaging.outgoing.source.capabilities=shared
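
The text describes the attribute as a list but the examples above each declare a single capability. The fragment below is only a sketch of the syntax for declaring more than one, assuming the value is comma-separated. The channel name orders is hypothetical, and the capability names are reused from the examples above purely to show the format; whether a broker accepts this combination depends on the broker.

```properties
# Assumption: multiple capabilities are given as a comma-separated value
mp.messaging.incoming.orders.capabilities=temporary-topic,shared
```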