Receiving messages from Pulsar

The Pulsar Connector connects to a Pulsar broker using a Pulsar client and creates consumers to receive messages from the broker. It maps each received message to a Reactive Messaging Message.

Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address (by default it would use localhost:6650). Configure your application to receive Pulsar messages on the prices channel as follows:

mp.messaging.incoming.prices.connector=smallrye-pulsar # <1>
mp.messaging.incoming.prices.serviceUrl=pulsar://pulsar:6650 # <2>
mp.messaging.incoming.prices.schema=DOUBLE # <3>
mp.messaging.incoming.prices.subscriptionInitialPosition=Earliest # <4>
  1. Sets the connector for the prices channel.
  2. Configures the Pulsar broker service URL.
  3. Configures the schema to consume prices as Double.
  4. Makes sure the consumer subscription starts receiving messages from the Earliest position.

Note

You don’t need to set the Pulsar topic, nor the consumer name. By default, the connector uses the channel name (prices). You can configure the topic and consumerName attributes to override them.

Note

In Pulsar, consumers need to provide a subscriptionName for topic subscriptions. If it is not provided, the connector generates a unique subscription name.
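
As an illustration of the two notes above, the defaults could be overridden on the prices channel as sketched below. The attribute names (topic, consumerName, subscriptionName) come from this page; the values are made-up placeholders:

```properties
# Hypothetical values, chosen only for illustration
mp.messaging.incoming.prices.topic=my-prices-topic
mp.messaging.incoming.prices.consumerName=my-prices-consumer
mp.messaging.incoming.prices.subscriptionName=my-prices-subscription
```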

Then, your application can receive the double payload directly:

package pulsar.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class PulsarPriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // process your price.
    }

}

Or, you can retrieve the Message<Double>:

package pulsar.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class PulsarPriceMessageConsumer {

    @Incoming("prices")
    public CompletionStage<Void> consume(Message<Double> price) {
        // process your price.

        // Acknowledge the incoming message
        return price.ack();
    }

}

Consumer Configuration

The Pulsar Connector allows flexible configuration of the underlying Pulsar consumer. One way is to set consumer properties directly on the channel configuration. The available configuration properties are listed in the Configuration Reference.

See Configuring Pulsar consumers, producers and clients for more information.
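
A minimal sketch of setting consumer properties directly on a channel, assuming the prices channel from the example above. Both properties appear in the Configuration Reference as settable from the config file; the values are arbitrary:

```properties
mp.messaging.incoming.prices.connector=smallrye-pulsar
# Consumer properties set directly on the channel (illustrative values)
mp.messaging.incoming.prices.subscriptionType=Shared
mp.messaging.incoming.prices.receiverQueueSize=500
```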

Deserialization and Pulsar Schema

The Pulsar Connector allows configuring the schema for the underlying Pulsar consumer. See Configuring the schema used for Pulsar channels for more information.

Inbound Metadata

The incoming Pulsar messages include an instance of PulsarIncomingMessageMetadata in the metadata. It provides the key, topic, partitions, headers and so on:

PulsarIncomingMessageMetadata metadata = incoming.getMetadata(PulsarIncomingMessageMetadata.class).orElse(null);
if (metadata != null) {
    // The topic
    String topic = metadata.getTopicName();

    // The key
    String key = metadata.getKey();

    // The event time
    long timestamp = metadata.getEventTime();

    // The raw data
    byte[] rawData = metadata.getData();

    // The underlying message
    org.apache.pulsar.client.api.Message<?> message = metadata.getMessage();

    // ...
}

Acknowledgement

When a message produced from a Pulsar Message is acknowledged, the connector sends an acknowledgement request to the Pulsar broker. All Reactive Messaging messages need to be acknowledged, which is handled automatically in most cases. Acknowledgement requests can be sent to the Pulsar broker using the following two strategies:

  • Individual acknowledgement is the default strategy: an acknowledgement request is sent to the broker for each message.
  • Cumulative acknowledgement, configured using ack-strategy=cumulative: the consumer only acknowledges the last message it received. All messages in the stream up to (and including) that message are not redelivered to that consumer.
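
For instance, cumulative acknowledgement could be enabled on the prices channel as sketched below. The explicit subscriptionType line is an assumption added for clarity: in Pulsar, cumulative acknowledgement is not available for Shared subscriptions, and Exclusive is already the default listed in the Configuration Reference:

```properties
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.ack-strategy=cumulative
# Assumption: kept explicit because cumulative acknowledgement does not apply to Shared subscriptions
mp.messaging.incoming.prices.subscriptionType=Exclusive
```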

Failure Management

If a message produced from a Pulsar message is nacked, a failure strategy is applied. The Pulsar connector supports the following strategies:

  • nack (default) sends negative acknowledgment to the broker, triggering the broker to redeliver this message to the consumer. The negative acknowledgment can be further configured using negativeAckRedeliveryDelayMicros and negativeAck.redeliveryBackoff properties.
  • fail fails the application; no more messages will be processed.
  • ignore logs the failure, but the acknowledgement strategy is applied and processing continues.
  • continue logs the failure, and processing continues without applying acknowledgement or negative acknowledgement. This strategy can be used with the acknowledgement timeout configuration.
  • reconsume-later sends the message to the retry letter topic using the reconsumeLater API to be reconsumed with a delay. The delay can be configured using the reconsumeLater.delay property and defaults to 3 seconds. Custom delay or properties per message can be configured by adding an instance of PulsarReconsumeLaterMetadata to the failure metadata.

For example, the following configuration for the incoming channel data uses the reconsume-later failure strategy with a default delay of 60 seconds:

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.serviceUrl=pulsar://localhost:6650
mp.messaging.incoming.data.topic=data
mp.messaging.incoming.data.schema=INT32
mp.messaging.incoming.data.failure-strategy=reconsume-later
mp.messaging.incoming.data.retryEnable=true
# Delay in seconds
mp.messaging.incoming.data.reconsumeLater.delay=60
mp.messaging.incoming.data.deadLetterPolicy.retryLetterTopic=data-retry
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2

Acknowledgement timeout

Similar to negative acknowledgement, with the acknowledgement timeout mechanism the Pulsar client tracks unacknowledged messages for the given ackTimeout period and then sends a request to the broker to redeliver them. The broker then resends the unacknowledged messages to the consumer.

To configure the timeout and redelivery backoff mechanism you can set ackTimeoutMillis and ackTimeout.redeliveryBackoff properties. The ackTimeout.redeliveryBackoff value accepts comma separated values of min delay in milliseconds, max delay in milliseconds and multiplier respectively:

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.failure-strategy=continue
mp.messaging.incoming.data.ackTimeoutMillis=10000
mp.messaging.incoming.data.ackTimeout.redeliveryBackoff=1000,60000,2
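
To make the min delay, max delay and multiplier values more concrete, here is a minimal sketch of how a multiplier-based backoff can be computed from the property value. It is not the Pulsar client's own class: RedeliveryBackoffSketch, parse and delayMs are hypothetical names, and the formula (min delay multiplied by the multiplier once per redelivery, capped at the max delay) is an assumption about how such a backoff typically behaves.

```java
// Illustrative sketch only; not the Pulsar client's implementation.
final class RedeliveryBackoffSketch {
    private final long minDelayMs;
    private final long maxDelayMs;
    private final double multiplier;

    RedeliveryBackoffSketch(long minDelayMs, long maxDelayMs, double multiplier) {
        this.minDelayMs = minDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.multiplier = multiplier;
    }

    /** Parses a "minDelay,maxDelay,multiplier" value such as "1000,60000,2". */
    static RedeliveryBackoffSketch parse(String value) {
        String[] parts = value.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected min,max,multiplier but got: " + value);
        }
        return new RedeliveryBackoffSketch(
                Long.parseLong(parts[0].trim()),
                Long.parseLong(parts[1].trim()),
                Double.parseDouble(parts[2].trim()));
    }

    /** Assumed formula: minDelay * multiplier^redeliveryCount, capped at maxDelay. */
    long delayMs(int redeliveryCount) {
        if (redeliveryCount <= 0) {
            return minDelayMs;
        }
        double delay = minDelayMs * Math.pow(multiplier, redeliveryCount);
        return (long) Math.min(delay, (double) maxDelayMs);
    }

    public static void main(String[] args) {
        RedeliveryBackoffSketch backoff = parse("1000,60000,2");
        for (int count = 0; count <= 7; count++) {
            System.out.println("redelivery " + count + " -> " + backoff.delayMs(count) + " ms");
        }
    }
}
```

Under these assumptions, the value 1000,60000,2 from the configuration above yields delays of 1000, 2000, 4000 ms and so on, until the 60000 ms cap is reached.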

Dead-letter topic

The dead letter topic mechanism pushes messages that are not consumed successfully to a dead letter topic and continues message consumption. Note that the dead letter topic can be used with different message redelivery methods, such as acknowledgment timeout, negative acknowledgment or retry letter topic.

mp.messaging.incoming.data.connector=smallrye-pulsar
mp.messaging.incoming.data.failure-strategy=nack
mp.messaging.incoming.data.deadLetterPolicy.maxRedeliverCount=2
mp.messaging.incoming.data.deadLetterPolicy.deadLetterTopic=my-dead-letter-topic
mp.messaging.incoming.data.deadLetterPolicy.initialSubscriptionName=my-dlq-subscription
mp.messaging.incoming.data.subscriptionType=Shared

Important

The negative acknowledgment and acknowledgment timeout redelivery methods redeliver the whole batch of messages if it contains at least one unprocessed message. See producer batching for more information.

Receiving Pulsar Messages in Batches

By default, incoming methods receive each Pulsar message individually. You can enable batch mode using the batchReceive=true property, or by setting a batchReceivePolicy in the consumer configuration.
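
As a minimal sketch, assuming the prices channel used earlier on this page, batch mode could be switched on in the channel configuration like this:

```properties
mp.messaging.incoming.prices.connector=smallrye-pulsar
mp.messaging.incoming.prices.schema=DOUBLE
mp.messaging.incoming.prices.batchReceive=true
```

With batch mode enabled, the consuming method receives a batch rather than a single payload: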

@Incoming("prices")
public CompletionStage<Void> consumeMessage(Message<List<Double>> messages) {
    messages.getMetadata(PulsarIncomingBatchMessageMetadata.class).ifPresent(metadata -> {
        for (org.apache.pulsar.client.api.Message<Object> message : metadata.getMessages()) {
            String key = message.getKey();
            String topic = message.getTopicName();
            long timestamp = message.getEventTime();
            //... process messages
        }
    });
    // ack acknowledges the messages contained in the batch.
    return messages.ack();
}

@Incoming("prices")
public void consumeMessages(org.apache.pulsar.client.api.Messages<Double> messages) {
    for (org.apache.pulsar.client.api.Message<Double> msg : messages) {
        //... process messages
    }
}

Or you can receive the list of payloads directly in the consume method:

@Incoming("prices")
public void consume(List<Double> prices) {
    for (double price : prices) {
        // process price
    }
}

Accessing metadata of batch records

When receiving records in batch mode, the metadata of each record is accessible through PulsarIncomingBatchMessageMetadata:

@Incoming("prices")
public void consumeMessages(org.apache.pulsar.client.api.Messages<Double> messages,
        PulsarIncomingBatchMessageMetadata metadata) {
    for (org.apache.pulsar.client.api.Message<Double> message : messages) {
        TracingMetadata tracing = metadata.getMetadataForMessage(message, TracingMetadata.class);
        if (tracing != null) {
            tracing.getCurrentContext().makeCurrent();
        }
        //... process messages
    }
}

As in this example, this can be useful for propagating the tracing information of each record.

Configuration Reference

Attribute (alias) | Description | Type | Mandatory | Default
ack-strategy | Specify the commit strategy to apply when a message produced from a record is acknowledged. Values can be ack, cumulative. | string | false | ack
ackTimeout.redeliveryBackoff | Comma separated values for configuring ack timeout MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false |
batchReceive | Whether batch receive is used to consume messages | boolean | false | false
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false |
consumer-configuration | Identifier of a CDI bean that provides the default Pulsar consumer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. | string | false |
deadLetterPolicy.deadLetterTopic | Name of the dead letter topic where the failing messages will be sent | string | false |
deadLetterPolicy.initialSubscriptionName | Name of the initial subscription of the dead letter topic | string | false |
deadLetterPolicy.maxRedeliverCount | Maximum number of times that a message will be redelivered before being sent to the dead letter topic | int | false |
deadLetterPolicy.retryLetterTopic | Name of the retry topic where the failing messages will be sent | string | false |
failure-strategy | Specify the failure strategy to apply when a message produced from a record is acknowledged negatively (nack). Values can be nack (default), fail, ignore, continue or reconsume-later | string | false | nack
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true
negativeAck.redeliveryBackoff | Comma separated values for configuring negative ack MultiplierRedeliveryBackoff, min delay, max delay, multiplier. | string | false |
reconsumeLater.delay | Default delay for reconsume failure-strategy, in seconds | long | false | 3
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used. | string | false |
serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true

In addition to the configuration properties provided by the connector, the following Pulsar consumer properties can also be set on the channel:

Attribute Description Type Config file Default
topicNames Topic name Set true []
topicsPattern Topic pattern Pattern true
subscriptionName Subscription name String true
subscriptionType Subscription type.
Four subscription types are available:
* Exclusive
* Failover
* Shared
* Key_Shared
SubscriptionType true Exclusive
subscriptionProperties Map true
subscriptionMode SubscriptionMode true Durable
messageListener MessageListener false
consumerEventListener ConsumerEventListener false
negativeAckRedeliveryBackoff Interface for a custom redelivery policy applied when a message is negatively acknowledged. You can specify a RedeliveryBackoff for a consumer. RedeliveryBackoff false
ackTimeoutRedeliveryBackoff Interface for a custom redelivery policy applied when a message reaches the ack timeout. You can specify a RedeliveryBackoff for a consumer. RedeliveryBackoff false
receiverQueueSize Size of a consumer's receiver queue.

For example, the number of messages accumulated by a consumer before an application calls Receive.

A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.
int true 1000
acknowledgementsGroupTimeMicros Group a consumer acknowledgment for a specified time.

By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.

Setting a group time of 0 sends out acknowledgments immediately.

A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.
long true 100000
maxAcknowledgmentGroupSize Group a consumer acknowledgment for the number of messages. int true 1000
negativeAckRedeliveryDelayMicros Delay to wait before redelivering messages that failed to be processed.

When an application uses Consumer#negativeAcknowledge(Message), failed messages are redelivered after a fixed timeout.
long true 60000000
maxTotalReceiverQueueSizeAcrossPartitions The max total receiver queue size across partitions.

This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.
int true 50000
consumerName Consumer name String true
ackTimeoutMillis Timeout of unacked messages long true 0
tickDurationMillis Granularity of the ack-timeout redelivery.

Using a higher tickDurationMillis reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).
long true 1000
priorityLevel Priority level for a consumer to which a broker gives more priority while dispatching messages in Shared subscription type.

The broker follows descending priorities. For example, 0=max-priority, 1, 2,...

In Shared subscription type, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.

Example 1
If a subscription has consumerA with priorityLevel 0 and consumerB with priorityLevel 1, then the broker only dispatches messages to consumerA until it runs out of permits and then starts dispatching messages to consumerB.

Example 2
Consumer Priority, Level, Permits
C1, 0, 2
C2, 0, 1
C3, 0, 1
C4, 1, 2
C5, 1, 1

Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.
int true 0
maxPendingChunkedMessage The maximum size of a queue holding pending chunked messages. When the threshold is reached, the consumer drops pending messages to optimize memory utilization. int true 10
autoAckOldestChunkedMessageOnQueueFull Whether to automatically acknowledge pending chunked messages when the threshold of maxPendingChunkedMessage is reached. If set to false, these messages will be redelivered by their broker. boolean true false
expireTimeOfIncompleteChunkedMessageMillis The time interval to expire incomplete chunks if a consumer fails to receive all the chunks in the specified time period. The default value is 1 minute. long true 60000
cryptoKeyReader CryptoKeyReader false
messageCrypto MessageCrypto false
cryptoFailureAction Action the consumer takes when it receives a message that cannot be decrypted.
* FAIL: this is the default option to fail messages until crypto succeeds.
* DISCARD: silently acknowledge the message and do not deliver it to the application.
* CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.

With CONSUME, if the message is also compressed, decompression fails.

If messages contain batch messages, a client is not able to retrieve individual messages in the batch.

A delivered encrypted message contains an EncryptionContext with the encryption and compression information that the application can use to decrypt the consumed message payload.
ConsumerCryptoFailureAction true FAIL
properties A name or value property of this consumer.

properties is application defined metadata attached to a consumer.

When getting a topic stats, associate this metadata with the consumer stats for easier identification.
SortedMap true {}
readCompacted If enabling readCompacted, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.

A consumer only sees the latest value for each key in the compacted topic, up to the point in the topic backlog that has been compacted. Beyond that point, messages are sent as normal.

readCompacted can only be enabled on subscriptions to persistent topics that have a single active consumer (like failover or exclusive subscriptions).

Attempting to enable it on subscriptions to non-persistent topics or on shared subscriptions leads to a subscription call throwing a PulsarClientException.
boolean true false
subscriptionInitialPosition Initial position at which to set the cursor when subscribing to a topic for the first time. SubscriptionInitialPosition true Latest
patternAutoDiscoveryPeriod Topic auto discovery period when using a pattern for topic's consumer.

The default and minimum value is 1 minute.
int true 60
regexSubscriptionMode When subscribing to a topic using a regular expression, you can pick a certain type of topics.

* PersistentOnly: only subscribe to persistent topics.
* NonPersistentOnly: only subscribe to non-persistent topics.
* AllTopics: subscribe to both persistent and non-persistent topics.
RegexSubscriptionMode true PersistentOnly
deadLetterPolicy Dead letter policy for consumers.

By default, some messages may be redelivered many times, possibly without ever stopping.

By using the dead letter mechanism, messages have a maximum redelivery count. When exceeding the maximum number of redeliveries, messages are sent to the Dead Letter Topic and acknowledged automatically.

You can enable the dead letter mechanism by setting deadLetterPolicy.

Example

client.newConsumer()
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).build())
.subscribe();

Default dead letter topic name is {TopicName}-{Subscription}-DLQ.

To set a custom dead letter topic name:

client.newConsumer()
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10)
.deadLetterTopic("your-topic-name").build())
.subscribe();

When specifying the dead letter policy while not specifying ackTimeoutMillis, you can set the ack timeout to 30000 milliseconds.
DeadLetterPolicy true
retryEnable boolean true false
batchReceivePolicy BatchReceivePolicy false
autoUpdatePartitions If autoUpdatePartitions is enabled, a consumer automatically subscribes to partitions added to the topic.

Note: this is only for partitioned consumers.
boolean true true
autoUpdatePartitionsIntervalSeconds long true 60
replicateSubscriptionState If replicateSubscriptionState is enabled, a subscription state is replicated to geo-replicated clusters. boolean true false
resetIncludeHead boolean true false
keySharedPolicy KeySharedPolicy false
batchIndexAckEnabled boolean true false
ackReceiptEnabled boolean true false
poolMessages boolean true false
payloadProcessor MessagePayloadProcessor false
startPaused boolean true false
autoScaledReceiverQueueSizeEnabled boolean true false
topicConfigurations List true []
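
The dispatch order given in Example 2 of the priorityLevel description can be reproduced with a small simulation. This is only a sketch of the rule as described there, not broker code: PriorityDispatchSketch, Slot and dispatchOrder are hypothetical names, and the simulation assumes one permit is used per dispatched message, round-robin dispatch within a priority level, and enough messages available to exhaust every permit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative simulation only; this is not how the broker is implemented.
final class PriorityDispatchSketch {

    /** A consumer with a priority level and a number of available permits. */
    static final class Slot {
        final String name;
        final int priorityLevel;
        final int permits;

        Slot(String name, int priorityLevel, int permits) {
            this.name = name;
            this.priorityLevel = priorityLevel;
            this.permits = permits;
        }
    }

    /** Returns consumer names in the order messages would be dispatched. */
    static List<String> dispatchOrder(List<Slot> consumers) {
        // TreeMap iterates keys in ascending order, so level 0 (max priority) comes first.
        Map<Integer, List<Slot>> byLevel = new TreeMap<>();
        for (Slot slot : consumers) {
            byLevel.computeIfAbsent(slot.priorityLevel, level -> new ArrayList<>()).add(slot);
        }
        List<String> order = new ArrayList<>();
        for (List<Slot> level : byLevel.values()) {
            int[] remaining = new int[level.size()];
            for (int i = 0; i < remaining.length; i++) {
                remaining[i] = level.get(i).permits;
            }
            // Round-robin within the level until no consumer has permits left.
            boolean dispatched = true;
            while (dispatched) {
                dispatched = false;
                for (int i = 0; i < remaining.length; i++) {
                    if (remaining[i] > 0) {
                        remaining[i]--;
                        order.add(level.get(i).name);
                        dispatched = true;
                    }
                }
            }
        }
        return order;
    }

    public static void main(String[] args) {
        List<Slot> consumers = Arrays.asList(
                new Slot("C1", 0, 2),
                new Slot("C2", 0, 1),
                new Slot("C3", 0, 1),
                new Slot("C4", 1, 2),
                new Slot("C5", 1, 1));
        System.out.println(dispatchOrder(consumers));
    }
}
```

Run with the consumers from Example 2, the simulation yields the same order as stated in the table: C1, C2, C3, C1, C4, C5, C4.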