Skip to content

Sending messages to Pulsar

The Pulsar Connector can write Reactive Messaging Messages as Pulsar Message.

Example

Let’s imagine you have a Pulsar broker running, and accessible using the pulsar:6650 address (by default it would use localhost:1883). Configure your application to write the messages from the prices channel into a Pulsar Messages as follows:

1
2
3
mp.messaging.outgoing.prices.connector=smallrye-pulsar # <1>
mp.messaging.outgoing.prices.serviceUrl=pulsar://pulsar:6650 # <2>
mp.messaging.outgoing.prices.schema=DOUBLE # <3>
  1. Sets the connector for the prices channel
  2. Configure the Pulsar broker service url.
  3. Configure the schema to consume prices as Double.

Note

You don’t need to set the Pulsar topic, nor the producer name. By default, the connector uses the channel name (prices). You can configure the topic and producerName attributes to override them.

Then, your application must send Message<Double> to the prices channel. It can use double payloads as in the following snippet:

package pulsar.outbound;

import java.time.Duration;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PulsarPriceProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Double> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> random.nextDouble());
    }

}

Or, you can send Message<Double>:

package pulsar.outbound;

import java.time.Duration;
import java.util.Random;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

@ApplicationScoped
public class PulsarPriceMessageProducer {

    private Random random = new Random();

    @Outgoing("prices")
    public Multi<Message<Double>> generate() {
        // Build an infinite stream of random prices
        // It emits a price every second
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
                .map(x -> Message.of(random.nextDouble()));
    }

}

Producer Configuration

The Pulsar Connector allows flexibly configuring the underlying Pulsar producer. One of the ways is to set producer properties directly on the channel configuration. The list of available configuration properties are listed in Configuration Reference.

See the Configuring Pulsar consumers, producers and clients for more information.

Serialization and Pulsar Schema

The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar producer. See the Configuring the schema used for Pulsar channels for more information.

Sending key/value pairs

In order to send Kev/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a org.apache.pulsar.common.schema.KeyValue type:

1
2
3
4
5
6
7
8
9
@Identifier("out")
@Produces
Schema<KeyValue<String, Long>> schema = Schema.KeyValue(Schema.STRING, Schema.INT64);

@Incoming("in")
@Outgoing("out")
public KeyValue<String, Long> process(long in) {
    return new KeyValue<>("my-key", in);
}

If you need more control on the written records, use PulsarOutgoingMessageMetadata.

Outbound Metadata

When sending Messages, you can add an instance of PulsarOutgoingMessageMetadata to influence how the message is going to be written to Pulsar. For example, configure the record key, and set message properties:

// Creates an PulsarOutgoingMessageMetadata
// The type parameter is the type of the record's key
PulsarOutgoingMessageMetadata metadata = PulsarOutgoingMessageMetadata.builder()
        .withKey("my-key")
        .withProperties(Map.of("property-key", "value"))
        .build();

// Create a new message from the `incoming` message
// Add `metadata` to the metadata from the `incoming` message.
return incoming.addMetadata(metadata);

OutgoingMessage

Using OutgoingMessage, is an easy way of customizing the Pulsar message to be published when dealing with payloads and not Messages.

You can create an OutgoingMessage with key and value, or from an incoming Pulsar Message:

1
2
3
4
5
6
@Incoming("in")
@Outgoing("out")
OutgoingMessage<Long> process(org.apache.pulsar.client.api.Message<String> in) {
    return OutgoingMessage.from(in)
            .withValue(Long.valueOf(in.getValue()));
}

Acknowledgement

Upon receiving a message from a Producer, a Pulsar broker assigns a MessageId to the message and sends it back to the producer, confirming that the message is published.

By default, the connector does wait for Pulsar to acknowledge the record to continue the processing (acknowledging the received Message). You can disable this by setting the waitForWriteCompletion attribute to false.

If a record cannot be written, the message is nacked.

Important

The Pulsar client automatically retries sending messages in case of failure, until the send timeout is reached. The send timeout is configurable with sendTimeoutMs attribute and by default is is 30 seconds.

Back-pressure and inflight records

The Pulsar outbound connector handles back-pressure monitoring the number of pending messages waiting to be written to the Pulsar broker. The number of pending messages is configured using the max-inflight-messages attribute and defaults to 1000.

The connector only sends that amount of messages concurrently. No other messages will be sent until at least one pending message gets acknowledged by the broker. Then, the connector writes a new message to Pulsar when one of the broker’s pending messages get acknowledged.

The Pulsar producer also has a limit on the number of pending messages controlled by maxPendingMessages and for partitioned topics maxPendingMessagesAcrossPartitions producer configuration properties. The default value for the both is 0 which means the limit is enforced by the connector and not the underlying producer. If a maxPendingMessagesAcrossPartitions is set and subscribed topic is partitioned, the connector will set the max-inflight-messages use the producer's limit.

Deprecation

Previously a single connector attribute maxPendingMessages controlled both the connector back-pressure and the Pulsar producer configuration. The max-inflight-messages attribute is not the same as the Pulsar producer maxPendingMessages configuration property.

Producer Batching

By default, the Pulsar producer batches individual messages together to be published to the broker. You can configure batching parameters using batchingMaxPublishDelayMicros, batchingPartitionSwitchFrequencyByPublishDelay, batchingMaxMessages, batchingMaxBytes configuration properties, or disable it completely with batchingEnabled=false.

When using Key_Shared consumer subscriptions, the batcherBuilder can be configured to BatcherBuilder.KEY_BASED.

Configuration Reference

Attribute (alias) Description Type Mandatory Default
client-configuration Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. string false
health-enabled Whether health reporting is enabled (default) or disabled boolean false true
max-inflight-messages The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker. Defaults to 1000 messages int false
producer-configuration Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map and must use the @io.smallrye.common.annotation.Identifier qualifier to set the identifier. string false
schema The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used. string false
serviceUrl The service URL for the Pulsar service string false pulsar://localhost:6650
topic The consumed / populated Pulsar topic. If not set, the channel name is used string false
tracing-enabled Whether tracing is enabled (default) or disabled boolean false true
waitForWriteCompletion Whether the client waits for the broker to acknowledge the written record before acknowledging the message boolean false true

In addition to the configuration properties provided by the connector, following Pulsar producer properties can also be set on the channel:

Attribute Description Type Config file Default
topicName Topic name String true
producerName Producer name String true
sendTimeoutMs Message send timeout in ms.
If a message is not acknowledged by a server before the sendTimeout expires, an error occurs.
long true 30000
blockIfQueueFull If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of producer block, rather than failing and throwing errors.
If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur.

The MaxPendingMessages parameter determines the size of the outgoing message queue.
boolean true false
maxPendingMessages The maximum size of a queue holding pending messages.

For example, a message waiting to receive an acknowledgment from a broker.

By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true.
int true 0
maxPendingMessagesAcrossPartitions The maximum number of pending messages across partitions.

Use the setting to lower the max pending messages for each partition (#setMaxPendingMessages(int)) if the total number exceeds the configured value.
int true 0
messageRoutingMode Message routing logic for producers on partitioned topics.
Apply the logic only when setting no key on messages.
Available options are as follows:
* pulsar.RoundRobinDistribution: round robin
* pulsar.UseSinglePartition: publish all messages to a single partition
* pulsar.CustomPartition: a custom partitioning scheme
MessageRoutingMode true
hashingScheme Hashing function determining the partition where you publish a particular message (partitioned topics only).
Available options are as follows:
* pulsar.JavastringHash: the equivalent of string.hashCode() in Java
* pulsar.Murmur3_32Hash: applies the Murmur3 hashing function
* pulsar.BoostHash: applies the hashing function from C++'sBoost library
HashingScheme true JavaStringHash
cryptoFailureAction Producer should take action when encryption fails.
* FAIL: if encryption fails, unencrypted messages fail to send.
* SEND: if encryption fails, unencrypted messages are sent.
ProducerCryptoFailureAction true FAIL
customMessageRouter MessageRouter false
batchingMaxPublishDelayMicros Batching time period of sending messages. long true 1000
batchingPartitionSwitchFrequencyByPublishDelay int true 10
batchingMaxMessages The maximum number of messages permitted in a batch. int true 1000
batchingMaxBytes int true 131072
batchingEnabled Enable batching of messages. boolean true true
batcherBuilder BatcherBuilder false
chunkingEnabled Enable chunking of messages. boolean true false
chunkMaxMessageSize int true -1
cryptoKeyReader CryptoKeyReader false
messageCrypto MessageCrypto false
encryptionKeys Set true []
compressionType Message data compression type used by a producer.
Available options:
* LZ4
* ZLIB
* ZSTD
* SNAPPY
CompressionType true NONE
initialSequenceId Long true
autoUpdatePartitions boolean true true
autoUpdatePartitionsIntervalSeconds long true 60
multiSchema boolean true true
accessMode ProducerAccessMode true Shared
lazyStartPartitionedProducers boolean true false
properties SortedMap true {}
initialSubscriptionName Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. String true