Receiving messages from Pulsar
The Pulsar Connector connects to a Pulsar broker using a Pulsar client and creates consumers to
receive messages from Pulsar brokers and it maps each of them into Reactive Messaging Messages
.
Example
Let’s imagine you have a Pulsar broker running, and accessible
using the pulsar:6650
address (by default it would use
localhost:6650
). Configure your application to receive Pulsar messages
on the prices
channel as follows:
- Sets the connector for the
prices
channel - Configure the Pulsar broker service url.
- Configure the schema to consume prices as Double.
- Make sure consumer subscription starts receiving messages from the
Earliest
position.
Note
You don’t need to set the Pulsar topic, nor the consumer name.
By default, the connector uses the channel name (prices
).
You can configure the topic
and consumerName
attributes to override them.
Note
In Pulsar, consumers need to provide a subscriptionName
for topic subscriptions.
If not provided the connector is generating a unique subscription name.
Then, your application can receive the double
payload directly:
Or, you can retrieve the Message<Double>
:
Consumer Configuration
The Pulsar Connector allows flexibly configuring the underlying Pulsar consumer. One of the ways is to set consumer properties directly on the channel configuration. The list of available configuration properties are listed in Configuration Reference.
See the Configuring Pulsar consumers, producers and clients for more information.
Deserialization and Pulsar Schema
The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar consumer. See the Configuring the schema used for Pulsar channels for more information.
Inbound Metadata
The incoming Pulsar messages include an instance of PulsarIncomingMessageMetadata
in the metadata.
It provides the key, topic, partitions, headers and so on:
Acknowledgement
When a message produced from a Pulsar Message is acknowledged, the connector sends an acknowledgement request to the Pulsar broker. All Reactive Messaging messages need to be acknowledged, which is handled automatically in most cases. Acknowledgement requests can be sent to the Pulsar broker using the following two strategies:
- Individual acknowledgement is the default strategy, an acknowledgement request is to the broker for each message.
- Cumulative acknowledgement, configured using
ack-strategy=cumulative
, the consumer only acknowledges the last message it received. All messages in the stream up to (and including) the provided message are not redelivered to that consumer.
Failure Management
If a message produced from a Pulsar message is nacked, a failure strategy is applied. The Pulsar connector supports 4 strategies:
nack
(default) sends negative acknowledgment to the broker, triggering the broker to redeliver this message to the consumer. The negative acknowledgment can be further configured usingnegativeAckRedeliveryDelayMicros
andnegativeAck.redeliveryBackoff
properties.fail
fail the application, no more messages will be processed.ignore
the failure is logged, but the acknowledgement strategy will be applied and the processing will continue.continue
the failure is logged, but processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with acknowledgement timeout configuration.reconsume-later
sends the message to the retry letter topic using thereconsumeLater
API to be reconsumed with a delay. The delay can be configured using thereconsumeLater.delay
property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of PulsarReconsumeLaterMetadata to the failure metadata.
For example the following configuration for the incoming channel data
uses reconsumer-later
failure strategy with default delays of 60 seconds:
Acknowledgement timeout
Similar to the negative acknowledgement, with the acknowledgment timeout mechanism, the Pulsar client tracks the unacknowledged messages, for the given ackTimeout period and sends redeliver unacknowledged messages request to the broker, thus the broker resends the unacknowledged messages to the consumer.
To configure the timeout and redelivery backoff mechanism you can set ackTimeoutMillis
and ackTimeout.redeliveryBackoff
properties.
The ackTimeout.redeliveryBackoff
value accepts comma separated values of min delay in milliseconds, max delay in milliseconds and multiplier respectively:
Dead-letter topic
The dead letter topic pushes messages that are not consumed successfully to a dead letter topic an continue message consumption. Note that dead letter topic can be used in different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.
Important
Negative acknowledgment or acknowledgment timeout methods for redelivery will redeliver the whole batch of messages containing at least an unprocessed message. See producer batching for more information.
Receiving Pulsar Messages in Batches
By default, incoming methods receive each Pulsar message individually.
You can enable batch mode using batchReceive=true
property, or setting a batchReceivePolicy
in consumer configuration.
Or you can directly receive the list of payloads to the consume method:
Accessing metadata of batch records
When receiving records in batch mode, the metadata of each record is accessible through the PulsarIncomingBatchMessageMetadata
:
Like in this example, this can be useful to propagate the tracing information of each record.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
ack-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be ack , cumulative . |
string | false | ack |
ackTimeout.redeliveryBackoff | Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
batchReceive | Whether batch receive is used to consume messages | boolean | false | false |
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map |
string | false | |
consumer-configuration | Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map |
string | false | |
deadLetterPolicy.deadLetterTopic | Name of the dead letter topic where the failing messages will be sent | string | false | |
deadLetterPolicy.initialSubscriptionName | Name of the initial subscription name of the dead letter topic | string | false | |
deadLetterPolicy.maxRedeliverCount | Maximum number of times that a message will be redelivered before being sent to the dead letter topic | int | false | |
deadLetterPolicy.retryLetterTopic | Name of the retry topic where the failing messages will be sent | string | false | |
failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be nack (default), fail , ignore or reconsume-later | string | false | nack` |
|||
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
negativeAck.redeliveryBackoff | Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false | |
reconsumeLater.delay | Default delay for reconsume failure-strategy, in seconds | long | false | 3 |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used. |
string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650 |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
In addition to the configuration properties provided by the connector, following Pulsar consumer properties can also be set on the channel:
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicNames | Topic name | Set | true | [] |
topicsPattern | Topic pattern | Pattern | true | |
subscriptionName | Subscription name | String | true | |
subscriptionType | Subscription type. Four subscription types are available: * Exclusive * Failover * Shared * Key_Shared |
SubscriptionType | true | Exclusive |
subscriptionProperties | Map | true | ||
subscriptionMode | SubscriptionMode | true | Durable | |
messageListener | MessageListener | false | ||
consumerEventListener | ConsumerEventListener | false | ||
negativeAckRedeliveryBackoff | Interface for custom message is negativeAcked policy. You can specify RedeliveryBackoff for a consumer. |
RedeliveryBackoff | false | |
ackTimeoutRedeliveryBackoff | Interface for custom message is ackTimeout policy. You can specify RedeliveryBackoff for a consumer. |
RedeliveryBackoff | false | |
receiverQueueSize | Size of a consumer's receiver queue. For example, the number of messages accumulated by a consumer before an application calls Receive .A value higher than the default value increases consumer throughput, though at the expense of more memory utilization. |
int | true | 1000 |
acknowledgementsGroupTimeMicros | Group a consumer acknowledgment for a specified time. By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker. Setting a group time of 0 sends out acknowledgments immediately. A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure. |
long | true | 100000 |
maxAcknowledgmentGroupSize | Group a consumer acknowledgment for the number of messages. | int | true | 1000 |
negativeAckRedeliveryDelayMicros | Delay to wait before redelivering messages that failed to be processed. When an application uses Consumer#negativeAcknowledge(Message) , failed messages are redelivered after a fixed timeout. |
long | true | 60000000 |
maxTotalReceiverQueueSizeAcrossPartitions | The max total receiver queue size across partitions. This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value. |
int | true | 50000 |
consumerName | Consumer name | String | true | |
ackTimeoutMillis | Timeout of unacked messages | long | true | 0 |
tickDurationMillis | Granularity of the ack-timeout redelivery. Using an higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour). |
long | true | 1000 |
priorityLevel | Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type. The broker follows descending priorities. For example, 0=max-priority, 1, 2,... In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers. Example 1 If a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.Example 2 Consumer Priority, Level, Permits C1, 0, 2 C2, 0, 1 C3, 0, 1 C4, 1, 2 C5, 1, 1 Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4. |
int | true | 0 |
maxPendingChunkedMessage | The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. | int | true | 10 |
autoAckOldestChunkedMessageOnQueueFull | Whether to automatically acknowledge pending chunked messages when the threshold of maxPendingChunkedMessage is reached. If set to false , these messages will be redelivered by their broker. |
boolean | true | false |
expireTimeOfIncompleteChunkedMessageMillis | The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. | long | true | 60000 |
cryptoKeyReader | CryptoKeyReader | false | ||
messageCrypto | MessageCrypto | false | ||
cryptoFailureAction | Consumer should take action when it receives a message that can not be decrypted. * FAIL: this is the default option to fail messages until crypto succeeds. * DISCARD:silently acknowledge and not deliver message to an application. * CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message. The decompression of message fails. If messages contain batch messages, a client is not be able to retrieve individual messages in batch. Delivered encrypted message contains EncryptionContext which contains encryption and compression information in it using which application can decrypt consumed message payload. |
ConsumerCryptoFailureAction | true | FAIL |
properties | A name or value property of this consumer.properties is application defined metadata attached to a consumer.When getting a topic stats, associate this metadata with the consumer stats for easier identification. |
SortedMap | true | {} |
readCompacted | If enabling readCompacted , a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal. Only enabling readCompacted on subscriptions to persistent topics, which have a single active consumer (like failure or exclusive subscriptions).Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException . |
boolean | true | false |
subscriptionInitialPosition | Initial position at which to set cursor when subscribing to a topic at first time. | SubscriptionInitialPosition | true | Latest |
patternAutoDiscoveryPeriod | Topic auto discovery period when using a pattern for topic's consumer. The default and minimum value is 1 minute. |
int | true | 60 |
regexSubscriptionMode | When subscribing to a topic using a regular expression, you can pick a certain type of topics. * PersistentOnly: only subscribe to persistent topics. * NonPersistentOnly: only subscribe to non-persistent topics. * AllTopics: subscribe to both persistent and non-persistent topics. |
RegexSubscriptionMode | true | PersistentOnly |
deadLetterPolicy | Dead letter policy for consumers. By default, some messages are probably redelivered many times, even to the extent that it never stops. By using the dead letter mechanism, messages have the max redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically. You can enable the dead letter mechanism by setting deadLetterPolicy .Example
Default dead letter topic name is {TopicName}-{Subscription}-DLQ .To set a custom dead letter topic name:
When specifying the dead letter policy while not specifying ackTimeoutMillis , you can set the ack timeout to 30000 millisecond. |
DeadLetterPolicy | true | |
retryEnable | boolean | true | false | |
batchReceivePolicy | BatchReceivePolicy | false | ||
autoUpdatePartitions | If autoUpdatePartitions is enabled, a consumer subscribes to partition increasement automatically.Note: this is only for partitioned consumers. |
boolean | true | true |
autoUpdatePartitionsIntervalSeconds | long | true | 60 | |
replicateSubscriptionState | If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters. |
boolean | true | false |
resetIncludeHead | boolean | true | false | |
keySharedPolicy | KeySharedPolicy | false | ||
batchIndexAckEnabled | boolean | true | false | |
ackReceiptEnabled | boolean | true | false | |
poolMessages | boolean | true | false | |
payloadProcessor | MessagePayloadProcessor | false | ||
startPaused | boolean | true | false | |
autoScaledReceiverQueueSizeEnabled | boolean | true | false | |
topicConfigurations | List | true | [] |