Skip to content

Configuring Pulsar clients, consumers and producers

Pulsar clients, consumers and producers are very customizable to configure how a Pulsar client application behaves.

The Pulsar connector creates a Pulsar client and, a consumer or a producer per channel, each with sensible defaults to ease their configuration. Although the creation is handled, all available configuration options remain configurable through Pulsar channels.

While idiomatic way of creating PulsarClient, PulsarConsumer or PulsarProducer are through builder APIs, in its essence those APIs build each time a configuration object, to pass onto the implementation. Those are ClientConfigurationData, ConsumerConfigurationData, ProducerConfigurationData.

Pulsar Connector allows receiving properties for those configuration objects directly. For example, the broker authentication information for PulsarClient is received using authPluginClassName and authParams properties. In order to configure the authentication for the incoming channel data :

1
2
3
4
5
6
7
mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=topic
mp.messaging.incoming.data.subscriptionInitialPosition=Earliest
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationBasic
mp.messaging.incoming.data.authParams={"userId":"superuser","password":"admin"}

Note that the Pulsar consumer property subscriptionInitialPosition is also configured with the Earliest value which represents with enum value SubscriptionInitialPosition.Earliest.

This approach covers most of the configuration cases. However, non-serializable objects such as CryptoKeyReader, ServiceUrlProvider etc. cannot be configured this way. The Pulsar Connector allows taking into account instances of Pulsar configuration data objects – ClientConfigurationData, ConsumerConfigurationData, ProducerConfigurationData:

@Produces
@Identifier("my-consumer-options")
public ConsumerConfigurationData<String> getConsumerConfig() {
    ConsumerConfigurationData<String> data = new ConsumerConfigurationData<>();
    data.setAckReceiptEnabled(true);
    data.setCryptoKeyReader(DefaultCryptoKeyReader.builder()
            //...
            .build());
    return data;
}

This instance is retrieved and used to configure the client used by the connector. You need to indicate the name of the client using the client-configuration, consumer-configuration or producer-configuration attributes:

mp.messaging.incoming.prices.consumer-configuration=my-consumer-options

If no [client|consumer|producer]-configuration is configured, the connector will look for instances identified with the channel name:

@Produces
@Identifier("prices")
public ClientConfigurationData getClientConfig() {
    ClientConfigurationData data = new ClientConfigurationData();
    data.setEnableTransaction(true);
    data.setServiceUrlProvider(AutoClusterFailover.builder()
            // ...
            .build());
    return data;
}

You also can provide a Map<String, Object> containing configuration values by key:

1
2
3
4
5
6
7
8
9
@Produces
@Identifier("prices")
public Map<String, Object> getProducerconfig() {
    Map<String, Object> map = new HashMap<>();
    map.put("batcherBuilder", BatcherBuilder.KEY_BASED);
    map.put("sendTimeoutMs", 3000);
    map.put("customMessageRouter", new PartialRoundRobinMessageRouterImpl(4));
    return map;
}

Different configuration sources are loaded in the following order of precedence, from the least important to the highest:

  1. Map<String, Object> config map produced with default config identifier, default-pulsar-client, default-pulsar-consumer, default-pulsar-producer.
  2. Map<String, Object> config map produced with identifier in the configuration or channel name
  3. [Client|Producer|Consuemr]ConfigurationData object produced with identifier in the channel configuration or the channel name
  4. Channel configuration properties named with [Client|Producer|Consuemr]ConfigurationData field names.

Following is the configuration reference for the PulsarClient.

Corresponding sections list the Consumer Configuration Reference and Producer Configuration Reference. Configuration properties not configurable in configuration files (non-serializable) is noted in the column Config file.

PulsarClient Configuration Reference

Attribute Description Type Config file Default
serviceUrl Pulsar cluster HTTP URL to connect to a broker. String true
serviceUrlProvider The implementation class of ServiceUrlProvider used to generate ServiceUrl. ServiceUrlProvider false
authentication Authentication settings of the client. Authentication false
authPluginClassName Class name of authentication plugin of the client. String true
authParams Authentication parameter of the client. String true
authParamMap Authentication map of the client. Map true
operationTimeoutMs Client operation timeout (in milliseconds). long true 30000
lookupTimeoutMs Client lookup timeout (in milliseconds). long true -1
statsIntervalSeconds Interval to print client stats (in seconds). long true 60
numIoThreads Number of IO threads. int true 10
numListenerThreads Number of consumer listener threads. int true 10
connectionsPerBroker Number of connections established between the client and each Broker. A value of 0 means to disable connection pooling. int true 1
connectionMaxIdleSeconds Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections int true 180
useTcpNoDelay Whether to use TCP NoDelay option. boolean true true
useTls Whether to use TLS. boolean true false
tlsKeyFilePath Path to the TLS key file. String true
tlsCertificateFilePath Path to the TLS certificate file. String true
tlsTrustCertsFilePath Path to the trusted TLS certificate file. String true
tlsAllowInsecureConnection Whether the client accepts untrusted TLS certificates from the broker. boolean true false
tlsHostnameVerificationEnable Whether the hostname is validated when the client creates a TLS connection with brokers. boolean true false
concurrentLookupRequest The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum prevents overloading a broker. int true 5000
maxLookupRequest Maximum number of lookup requests allowed on each broker connection to prevent overloading a broker. int true 50000
maxLookupRedirects Maximum times of redirected lookup requests. int true 20
maxNumberOfRejectedRequestPerConnection Maximum number of rejected requests of a broker in a certain time frame (60 seconds) after the current connection is closed and the client creating a new connection to connect to a different broker. int true 50
keepAliveIntervalSeconds Seconds of keeping alive interval for each client broker connection. int true 30
connectionTimeoutMs Duration of waiting for a connection to a broker to be established.If the duration passes without a response from a broker, the connection attempt is dropped. int true 10000
requestTimeoutMs Maximum duration for completing a request. int true 60000
readTimeoutMs Maximum read time of a request. int true 60000
autoCertRefreshSeconds Seconds of auto refreshing certificate. int true 300
initialBackoffIntervalNanos Initial backoff interval (in nanosecond). long true 100000000
maxBackoffIntervalNanos Max backoff interval (in nanosecond). long true 60000000000
enableBusyWait Whether to enable BusyWait for EpollEventLoopGroup. boolean true false
listenerName Listener name for lookup. Clients can use listenerName to choose one of the listeners as the service URL to create a connection to the broker as long as the network is accessible."advertisedListeners" must enabled in broker side. String true
useKeyStoreTls Set TLS using KeyStore way. boolean true false
sslProvider The TLS provider used by an internal client to authenticate with other Pulsar brokers. String true
tlsKeyStoreType TLS KeyStore type configuration. String true JKS
tlsKeyStorePath Path of TLS KeyStore. String true
tlsKeyStorePassword Password of TLS KeyStore. String true
tlsTrustStoreType TLS TrustStore type configuration. You need to set this configuration when client authentication is required. String true JKS
tlsTrustStorePath Path of TLS TrustStore. String true
tlsTrustStorePassword Password of TLS TrustStore. String true
tlsCiphers Set of TLS Ciphers. Set true []
tlsProtocols Protocols of TLS. Set true []
memoryLimitBytes Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput. long true 67108864
proxyServiceUrl URL of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. String true
proxyProtocol Protocol of proxy service. proxyServiceUrl and proxyProtocol must be mutually inclusive. ProxyProtocol true
enableTransaction Whether to enable transaction. boolean true false
clock Clock false
dnsLookupBindAddress The Pulsar client dns lookup bind address, default behavior is bind on 0.0.0.0 String true
dnsLookupBindPort The Pulsar client dns lookup bind port, takes effect when dnsLookupBindAddress is configured, default value is 0. int true 0
socks5ProxyAddress Address of SOCKS5 proxy. InetSocketAddress true
socks5ProxyUsername User name of SOCKS5 proxy. String true
socks5ProxyPassword Password of SOCKS5 proxy. String true
description The extra description of the client version. The length cannot exceed 64. String true