Sending messages to Pulsar
The Pulsar Connector can write Reactive Messaging Message
s as Pulsar Message.
Example
Let’s imagine you have a Pulsar broker running, and accessible
using the pulsar:6650
address (by default it would use
localhost:1883
). Configure your application to write the messages from
the prices
channel into a Pulsar Messages as follows:
- Sets the connector for the
prices
channel - Configure the Pulsar broker service url.
- Configure the schema to consume prices as Double.
Note
You don’t need to set the Pulsar topic, nor the producer name.
By default, the connector uses the channel name (prices
).
You can configure the topic
and producerName
attributes to override them.
Then, your application must send Message<Double>
to the prices
channel. It can use double
payloads as in the following snippet:
Or, you can send Message<Double>
:
Producer Configuration
The Pulsar Connector allows flexibly configuring the underlying Pulsar producer. One of the ways is to set producer properties directly on the channel configuration. The list of available configuration properties are listed in Configuration Reference.
See the Configuring Pulsar consumers, producers and clients for more information.
Serialization and Pulsar Schema
The Pulsar Connector allows configuring Schema configuration for the underlying Pulsar producer. See the Configuring the schema used for Pulsar channels for more information.
Sending key/value pairs
In order to send Kev/Value pairs to Pulsar, you can configure the Pulsar producer Schema with a org.apache.pulsar.common.schema.KeyValue type:
If you need more control on the written records, use
PulsarOutgoingMessageMetadata
.
Outbound Metadata
When sending Message
s, you can add an instance of
PulsarOutgoingMessageMetadata
to influence how the message is going to be written to Pulsar.
For example, configure the record key, and set message properties:
OutgoingMessage
Using OutgoingMessage,
is an easy way of customizing the Pulsar message to be published when dealing with payloads and not Message
s.
You can create an OutgoingMessage
with key and value, or from an incoming Pulsar Message:
Acknowledgement
Upon receiving a message from a Producer, a Pulsar broker assigns a MessageId
to the message and sends it back to the producer,
confirming that the message is published.
By default, the connector does wait for Pulsar to acknowledge the record
to continue the processing (acknowledging the received Message
).
You can disable this by setting the waitForWriteCompletion
attribute to false
.
If a record cannot be written, the message is nacked
.
Important
The Pulsar client automatically retries sending messages in case of failure, until the send timeout is reached.
The send timeout is configurable with sendTimeoutMs
attribute and by default is is 30 seconds.
Back-pressure and inflight records
The Pulsar outbound connector handles back-pressure monitoring the number
of pending messages waiting to be written to the Pulsar broker.
The number of pending messages is configured using the
maxPendingMessages
attribute and defaults to 1000.
The connector only sends that amount of messages concurrently. No other messages will be sent until at least one pending message gets acknowledged by the broker. Then, the connector writes a new message to Pulsar when one of the broker’s pending messages get acknowledged.
You can also remove the limit of pending messages by setting maxPendingMessages
to 0
.
Note that Pulsar also enables to configure the number of pending messages per partition using maxPendingMessagesAcrossPartitions
.
Producer Batching
By default, the Pulsar producer batches individual messages together to be published to the broker.
You can configure batching parameters using batchingMaxPublishDelayMicros
, batchingPartitionSwitchFrequencyByPublishDelay
,
batchingMaxMessages
, batchingMaxBytes
configuration properties, or disable it completely with batchingEnabled=false
.
When using Key_Shared
consumer subscriptions, the batcherBuilder
can be configured to BatcherBuilder.KEY_BASED
.
Configuration Reference
Attribute (alias) | Description | Type | Mandatory | Default |
---|---|---|---|---|
client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map |
string | false | |
health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true |
maxPendingMessages | The maximum size of a queue holding pending messages, i.e messages waiting to receive an acknowledgment from a broker | int | false | 1000 |
producer-configuration | Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map |
string | false | |
schema | The Pulsar schema type of this channel. When configured a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback AUTO_CONSUME or AUTO_PRODUCE are used. |
string | false | |
serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650 |
topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true |
waitForWriteCompletion | Whether the client waits for the broker to acknowledge the written record before acknowledging the message | boolean | false | true |
In addition to the configuration properties provided by the connector, following Pulsar producer properties can also be set on the channel:
Attribute | Description | Type | Config file | Default |
---|---|---|---|---|
topicName | Topic name | String | true | |
producerName | Producer name | String | true | |
sendTimeoutMs | Message send timeout in ms. If a message is not acknowledged by a server before the sendTimeout expires, an error occurs. |
long | true | 30000 |
blockIfQueueFull | If it is set to true , when the outgoing message queue is full, the Send and SendAsync methods of producer block, rather than failing and throwing errors.If it is set to false , when the outgoing message queue is full, the Send and SendAsync methods of producer fail and ProducerQueueIsFullError exceptions occur.The MaxPendingMessages parameter determines the size of the outgoing message queue. |
boolean | true | false |
maxPendingMessages | The maximum size of a queue holding pending messages. For example, a message waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true . |
int | true | 0 |
maxPendingMessagesAcrossPartitions | The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition ( #setMaxPendingMessages(int) ) if the total number exceeds the configured value. |
int | true | 0 |
messageRoutingMode | Message routing logic for producers on partitioned topics. Apply the logic only when setting no key on messages. Available options are as follows: * pulsar.RoundRobinDistribution : round robin* pulsar.UseSinglePartition : publish all messages to a single partition* pulsar.CustomPartition : a custom partitioning scheme |
MessageRoutingMode | true | |
hashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). Available options are as follows: * pulsar.JavastringHash : the equivalent of string.hashCode() in Java* pulsar.Murmur3_32Hash : applies the Murmur3 hashing function* pulsar.BoostHash : applies the hashing function from C++'sBoost library |
HashingScheme | true | JavaStringHash |
cryptoFailureAction | Producer should take action when encryption fails. * FAIL: if encryption fails, unencrypted messages fail to send. * SEND: if encryption fails, unencrypted messages are sent. |
ProducerCryptoFailureAction | true | FAIL |
customMessageRouter | MessageRouter | false | ||
batchingMaxPublishDelayMicros | Batching time period of sending messages. | long | true | 1000 |
batchingPartitionSwitchFrequencyByPublishDelay | int | true | 10 | |
batchingMaxMessages | The maximum number of messages permitted in a batch. | int | true | 1000 |
batchingMaxBytes | int | true | 131072 | |
batchingEnabled | Enable batching of messages. | boolean | true | true |
batcherBuilder | BatcherBuilder | false | ||
chunkingEnabled | Enable chunking of messages. | boolean | true | false |
chunkMaxMessageSize | int | true | -1 | |
cryptoKeyReader | CryptoKeyReader | false | ||
messageCrypto | MessageCrypto | false | ||
encryptionKeys | Set | true | [] | |
compressionType | Message data compression type used by a producer. Available options: * LZ4 * ZLIB * ZSTD * SNAPPY |
CompressionType | true | NONE |
initialSequenceId | Long | true | ||
autoUpdatePartitions | boolean | true | true | |
autoUpdatePartitionsIntervalSeconds | long | true | 60 | |
multiSchema | boolean | true | true | |
accessMode | ProducerAccessMode | true | Shared | |
lazyStartPartitionedProducers | boolean | true | false | |
properties | SortedMap | true | {} | |
initialSubscriptionName | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | String | true |