Sending messages to Pulsar
The Pulsar Connector can write Reactive Messaging Messages as Pulsar Messages.
Example
Let’s imagine you have a Pulsar broker running and accessible
using the pulsar:6650 address (by default it would use
localhost:6650). Configure your application to write the messages from
the prices channel as Pulsar messages as follows:
- Set the connector for the prices channel.
- Configure the Pulsar broker service URL.
- Configure the schema to write prices as Double.
Note
You don’t need to set the Pulsar topic or the producer name.
By default, the connector uses the channel name (prices).
You can configure the topic and producerName attributes to override them.
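As a rough illustration of those attributes and defaults, the sketch below holds the prices channel configuration in a plain Map, keyed with the MicroProfile Config convention mp.messaging.outgoing.<channel>.<attribute>. The connector name smallrye-pulsar and the DOUBLE schema value are assumptions to check against your connector version, and the lookup helpers are hypothetical: they model the documented defaults, not the connector's code.

```java
import java.util.Map;

// Hypothetical model of how an outgoing channel's attributes fall back to defaults.
// It mirrors the documented behaviour only; it is not the connector's implementation.
final class OutgoingChannelConfigSketch {

    static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";

    // Example configuration for the "prices" channel (assumed key and value spellings).
    static final Map<String, String> PRICES_CONFIG = Map.of(
            "mp.messaging.outgoing.prices.connector", "smallrye-pulsar",      // connector for the channel
            "mp.messaging.outgoing.prices.serviceUrl", "pulsar://pulsar:6650", // broker service URL
            "mp.messaging.outgoing.prices.schema", "DOUBLE");                  // write prices as Double

    // Looks up mp.messaging.outgoing.<channel>.<name>, or returns the fallback.
    static String attribute(Map<String, String> config, String channel, String name, String fallback) {
        return config.getOrDefault("mp.messaging.outgoing." + channel + "." + name, fallback);
    }

    // The topic defaults to the channel name unless "topic" is set.
    static String topic(Map<String, String> config, String channel) {
        return attribute(config, channel, "topic", channel);
    }

    // The producer name also defaults to the channel name unless "producerName" is set.
    static String producerName(Map<String, String> config, String channel) {
        return attribute(config, channel, "producerName", channel);
    }

    // The service URL defaults to pulsar://localhost:6650.
    static String serviceUrl(Map<String, String> config, String channel) {
        return attribute(config, channel, "serviceUrl", DEFAULT_SERVICE_URL);
    }

    public static void main(String[] args) {
        System.out.println(topic(PRICES_CONFIG, "prices"));      // prices
        System.out.println(serviceUrl(PRICES_CONFIG, "prices")); // pulsar://pulsar:6650
    }
}
```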
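Then, your application must send Message<Double> to the prices
channel. It can do so with plain double payloads, which Reactive Messaging wraps into messages, for example from a method annotated with @Outgoing("prices") that returns a Multi<Double>.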
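In a real application the producing method would return a Mutiny Multi<Double>. To stay dependency-free, the sketch below swaps Mutiny for java.util.stream and models only the payload side: a hypothetical PriceGenerator emits a bounded sequence of random prices that would be written to the prices channel.

```java
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-in for a producer method such as
//   @Outgoing("prices") Multi<Double> generate()
// Mutiny is replaced by java.util.stream so the sketch runs on the JDK alone.
final class PriceGenerator {

    private final Random random;

    PriceGenerator(long seed) {
        this.random = new Random(seed); // seeded so runs are reproducible
    }

    // Emits `count` plain Double payloads in [0, 100). Reactive Messaging would
    // wrap each one in a Message before handing it to the connector.
    List<Double> generate(int count) {
        return Stream.generate(() -> random.nextDouble() * 100.0)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        new PriceGenerator(42L).generate(3).forEach(System.out::println);
    }
}
```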
Or, it can send Message<Double> instances directly, which lets it attach its own acknowledgement logic to each message.
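The difference between a payload and a Message is mainly the acknowledgement callback that travels with the payload. The sketch below uses a hypothetical SimpleMessage record in place of the MicroProfile Message interface (whose Message.of(payload, ackSupplier) factory plays the same role), so it compiles without Reactive Messaging on the classpath.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, minimal stand-in for a Reactive Messaging Message:
// a payload plus an acknowledgement callback invoked once the write succeeded.
record SimpleMessage<T>(T payload, Supplier<CompletionStage<Void>> ack) {

    // Payload with a no-op acknowledgement.
    static <T> SimpleMessage<T> of(T payload) {
        return new SimpleMessage<>(payload, () -> CompletableFuture.completedFuture(null));
    }

    // Payload with a custom acknowledgement.
    static <T> SimpleMessage<T> of(T payload, Supplier<CompletionStage<Void>> ack) {
        return new SimpleMessage<>(payload, ack);
    }
}

final class MessageProducerSketch {

    // Counts acknowledged prices, standing in for application-side bookkeeping.
    static final AtomicInteger acked = new AtomicInteger();

    // Wraps a price in a message whose acknowledgement is recorded.
    static SimpleMessage<Double> priceMessage(double price) {
        return SimpleMessage.of(price, () -> {
            acked.incrementAndGet();
            return CompletableFuture.completedFuture(null);
        });
    }

    public static void main(String[] args) {
        SimpleMessage<Double> message = priceMessage(12.5);
        // A connector would trigger the acknowledgement after the broker confirmed the write.
        message.ack().get().toCompletableFuture().join();
        System.out.println(message.payload() + " acked=" + acked.get());
    }
}
```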
Producer Configuration
The Pulsar Connector allows flexible configuration of the underlying Pulsar producer. One way is to set producer properties directly on the channel configuration. The available configuration properties are listed in the Configuration Reference.
See Configuring Pulsar consumers, producers and clients for more information.
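Because producer properties sit next to connector attributes in the same channel configuration, it can help to see the two groups side by side. The hypothetical sketch below separates them by name, using a subset of the producer properties from the Configuration Reference; it only illustrates the idea and is not how the connector parses its configuration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration: picks out the channel attributes that configure
// the underlying Pulsar producer, leaving connector attributes such as serviceUrl aside.
final class ChannelAttributesSketch {

    // A subset of the producer properties listed in the Configuration Reference.
    static final Set<String> PRODUCER_PROPERTIES = Set.of(
            "topicName", "producerName", "sendTimeoutMs", "blockIfQueueFull",
            "maxPendingMessages", "batchingEnabled", "batchingMaxMessages", "compressionType");

    // Returns the producer-level attributes, sorted by name.
    static Map<String, String> producerProperties(Map<String, String> channelAttributes) {
        Map<String, String> result = new TreeMap<>();
        channelAttributes.forEach((name, value) -> {
            if (PRODUCER_PROPERTIES.contains(name)) {
                result.put(name, value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> channel = Map.of(
                "serviceUrl", "pulsar://pulsar:6650", // connector attribute
                "sendTimeoutMs", "10000",             // producer property
                "compressionType", "LZ4");            // producer property
        System.out.println(producerProperties(channel)); // {compressionType=LZ4, sendTimeoutMs=10000}
    }
}
```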
Serialization and Pulsar Schema
The Pulsar Connector allows configuring the Schema used by the underlying Pulsar producer. See Configuring the schema used for Pulsar channels for more information.
Sending key/value pairs
To send key/value pairs to Pulsar, configure the Pulsar producer Schema with the org.apache.pulsar.common.schema.KeyValue type.
If you need more control over the written records, use
PulsarOutgoingMessageMetadata.
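The sketch below shows the shape of key/value payloads, such as the latest price per symbol. To avoid a dependency on the Pulsar client it uses a hypothetical KeyValuePair record in place of org.apache.pulsar.common.schema.KeyValue; with the real connector the payloads would be KeyValue<String, Double> values written with a schema along the lines of Schema.KeyValue(Schema.STRING, Schema.DOUBLE).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.pulsar.common.schema.KeyValue<K, V>.
record KeyValuePair<K, V>(K key, V value) {}

final class KeyValueSketch {

    // Turns the latest price per symbol into key/value pairs, ordered by symbol.
    static List<KeyValuePair<String, Double>> toPairs(Map<String, Double> latestPrices) {
        return new TreeMap<>(latestPrices).entrySet().stream()
                .map(e -> new KeyValuePair<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        toPairs(Map.of("ACME", 12.5, "BOLT", 3.25))
                .forEach(kv -> System.out.println(kv.key() + " -> " + kv.value()));
    }
}
```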
Outbound Metadata
When sending Messages, you can add an instance of
PulsarOutgoingMessageMetadata
to influence how the message is going to be written to Pulsar.
For example, you can configure the message key and set message properties.
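The model below captures the two settings mentioned, a message key and string properties, attached to a payload. It uses a hypothetical OutgoingMetadata record with a builder; the real PulsarOutgoingMessageMetadata is also built through a builder, but check its Javadoc for the exact method names in your version.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical model of per-message outbound metadata: an optional key and properties.
record OutgoingMetadata(Optional<String> key, Map<String, String> properties) {

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private String key;
        private Map<String, String> properties = Map.of();

        Builder withKey(String key) {
            this.key = key;
            return this;
        }

        Builder withProperties(Map<String, String> properties) {
            this.properties = Map.copyOf(properties); // immutable defensive copy
            return this;
        }

        OutgoingMetadata build() {
            return new OutgoingMetadata(Optional.ofNullable(key), properties);
        }
    }
}

// A payload travelling with its metadata, as a Message with attached metadata would.
record PriceWithMetadata(double price, OutgoingMetadata metadata) {}

final class OutboundMetadataSketch {

    static PriceWithMetadata withKeyAndProperties(double price) {
        OutgoingMetadata metadata = OutgoingMetadata.builder()
                .withKey("my-key")                          // message key
                .withProperties(Map.of("property-key", "value")) // message properties
                .build();
        return new PriceWithMetadata(price, metadata);
    }

    public static void main(String[] args) {
        System.out.println(withKeyAndProperties(12.5));
    }
}
```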
OutgoingMessage
Using OutgoingMessage
is an easy way of customizing the Pulsar message to be published when dealing with payloads rather than Messages.
You can create an OutgoingMessage with a key and a value, or from an incoming Pulsar Message.
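The two ways of creating an OutgoingMessage can be modelled as two factory methods. The hypothetical OutgoingRecord and IncomingRecord types below stand in for OutgoingMessage and an incoming Pulsar message; they assume that creating from an incoming message carries over its key, value and properties, which is the behaviour worth confirming in the real class.

```java
import java.util.Map;

// Hypothetical stand-in for an incoming Pulsar message.
record IncomingRecord<T>(String key, T value, Map<String, String> properties) {}

// Hypothetical stand-in for OutgoingMessage: a payload-level object carrying
// the key and properties that should end up on the published Pulsar message.
record OutgoingRecord<T>(String key, T value, Map<String, String> properties) {

    // Key plus value, no properties.
    static <T> OutgoingRecord<T> of(String key, T value) {
        return new OutgoingRecord<>(key, value, Map.of());
    }

    // Starts from an incoming message, keeping its key, value and properties.
    static <T> OutgoingRecord<T> from(IncomingRecord<T> incoming) {
        return new OutgoingRecord<>(incoming.key(), incoming.value(), Map.copyOf(incoming.properties()));
    }

    // Copy with a new value, e.g. after a transformation.
    <R> OutgoingRecord<R> withValue(R newValue) {
        return new OutgoingRecord<>(key, newValue, properties);
    }
}

final class OutgoingRecordSketch {

    // A processor that doubles a price while preserving key and properties.
    static OutgoingRecord<Double> process(IncomingRecord<Double> incoming) {
        return OutgoingRecord.from(incoming).withValue(incoming.value() * 2);
    }

    public static void main(String[] args) {
        IncomingRecord<Double> incoming = new IncomingRecord<>("ACME", 10.0, Map.of("source", "feed"));
        System.out.println(process(incoming));
    }
}
```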
Acknowledgement
Upon receiving a message from a Producer, a Pulsar broker assigns a MessageId to the message and sends it back to the producer,
confirming that the message is published.
By default, the connector waits for Pulsar to acknowledge the record
before continuing the processing (acknowledging the received Message).
You can disable this by setting the waitForWriteCompletion attribute to false.
If a record cannot be written, the message is nacked.
Important
The Pulsar client automatically retries sending messages in case of failure, until the send timeout is reached.
The send timeout is configurable with the sendTimeoutMs attribute and defaults to 30 seconds.
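The acknowledgement rules above can be modelled with CompletableFuture. In the hypothetical sketch below, `send` stands in for the producer's asynchronous send and completes with the broker-assigned MessageId (an arbitrary String here). With waitForWriteCompletion the outcome follows the send result, and a failure or timeout nacks; without it the message is acked immediately. The real client enforces sendTimeoutMs itself; orTimeout only imitates that deadline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical model of the outbound acknowledgement flow.
final class AckFlowSketch {

    enum Outcome { ACKED, NACKED }

    static CompletableFuture<Outcome> write(
            double payload,
            Function<Double, CompletableFuture<String>> send,
            boolean waitForWriteCompletion,
            long sendTimeoutMs) {
        // Start the write; the deadline imitates the client's send timeout.
        CompletableFuture<String> messageId = send.apply(payload)
                .orTimeout(sendTimeoutMs, TimeUnit.MILLISECONDS);

        if (!waitForWriteCompletion) {
            // Acknowledge the incoming Message right away, without waiting for the broker.
            return CompletableFuture.completedFuture(Outcome.ACKED);
        }
        // Wait for the broker's MessageId; a failed or timed-out write nacks the Message.
        return messageId.handle((id, failure) -> failure == null ? Outcome.ACKED : Outcome.NACKED);
    }

    public static void main(String[] args) {
        Function<Double, CompletableFuture<String>> ok =
                p -> CompletableFuture.completedFuture("message-id-1");
        Function<Double, CompletableFuture<String>> neverAnswers = p -> new CompletableFuture<>();
        System.out.println(write(1.0, ok, true, 30_000).join());       // ACKED
        System.out.println(write(1.0, neverAnswers, true, 50).join()); // NACKED after about 50 ms
    }
}
```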
Back-pressure and inflight records
The Pulsar outbound connector handles back-pressure by monitoring the number
of pending messages waiting to be written to the Pulsar broker.
The number of pending messages is configured using the
max-inflight-messages attribute and defaults to 1000.
The connector sends at most that many messages concurrently: no further messages are sent until at least one pending message is acknowledged by the broker, at which point the connector writes the next message to Pulsar.
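A counting semaphore is a simple way to picture this limit: each write takes a permit and the broker acknowledgement returns it. The hypothetical sketch below uses a non-blocking tryAcquire so a full window is visible as `false`; the actual connector applies back-pressure by requesting fewer upstream messages rather than rejecting writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical model of the in-flight limit: at most `maxInflight` messages
// may await a broker acknowledgement at any time.
final class InflightLimiterSketch {

    private final Semaphore permits;

    InflightLimiterSketch(int maxInflight) { // max-inflight-messages defaults to 1000
        this.permits = new Semaphore(maxInflight);
    }

    // Tries to start a write; returns false when the limit is reached, in which
    // case the caller must wait for an acknowledgement before sending more.
    boolean trySend(CompletableFuture<String> brokerAck) {
        if (!permits.tryAcquire()) {
            return false;
        }
        // Free the slot once the broker acknowledges (or the write fails).
        brokerAck.whenComplete((id, failure) -> permits.release());
        return true;
    }

    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        InflightLimiterSketch limiter = new InflightLimiterSketch(2);
        List<CompletableFuture<String>> acks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> ack = new CompletableFuture<>();
            System.out.println("message " + i + " sent: " + limiter.trySend(ack)); // true, true, false
            acks.add(ack);
        }
        acks.get(0).complete("id-0"); // the broker acknowledges the first message
        System.out.println("slots free: " + limiter.available()); // 1
    }
}
```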
The Pulsar producer also limits the number of pending messages, through the
maxPendingMessages producer configuration property and, for partitioned topics, the maxPendingMessagesAcrossPartitions property.
The default value for both is 0, which means the limit is enforced by the connector and not by the underlying producer.
If maxPendingMessagesAcrossPartitions is set and the target topic is partitioned, the connector sets max-inflight-messages to the producer's limit.
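The rule for picking the effective limit can be written as a small function. This is a hypothetical sketch of the rule as stated above; it assumes the producer's cross-partition limit takes precedence even when max-inflight-messages is set explicitly, which is worth verifying against the connector version you use.

```java
// Hypothetical model of how the effective in-flight limit is chosen.
final class InflightLimitRuleSketch {

    static final int DEFAULT_MAX_INFLIGHT = 1000;

    // maxInflightMessages: the connector attribute, or null when not configured.
    // maxPendingAcrossPartitions: the producer property, where 0 means "not set".
    static int effectiveMaxInflight(Integer maxInflightMessages, int maxPendingAcrossPartitions, boolean partitionedTopic) {
        if (maxPendingAcrossPartitions > 0 && partitionedTopic) {
            return maxPendingAcrossPartitions; // follow the producer's limit
        }
        return maxInflightMessages != null ? maxInflightMessages : DEFAULT_MAX_INFLIGHT;
    }

    public static void main(String[] args) {
        System.out.println(effectiveMaxInflight(null, 0, false)); // 1000
        System.out.println(effectiveMaxInflight(null, 200, true)); // 200
    }
}
```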
Deprecation
Previously, a single connector attribute, maxPendingMessages, controlled both the connector back-pressure and the Pulsar producer configuration.
The max-inflight-messages attribute is not the same as the Pulsar producer maxPendingMessages configuration property.
Producer Batching
By default, the Pulsar producer batches individual messages together to be published to the broker.
You can configure batching parameters using batchingMaxPublishDelayMicros, batchingPartitionSwitchFrequencyByPublishDelay,
batchingMaxMessages, batchingMaxBytes configuration properties, or disable it completely with batchingEnabled=false.
When using Key_Shared consumer subscriptions, the batcherBuilder can be configured to BatcherBuilder.KEY_BASED.
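To picture what the batching limits do, the hypothetical sketch below groups messages until batchingMaxMessages or batchingMaxBytes would be exceeded; the publish-delay trigger is left out to keep it synchronous. The key-based variant first groups messages by key so a batch never mixes keys, which is roughly why BatcherBuilder.KEY_BASED suits Key_Shared subscriptions, where a whole batch is dispatched to a single consumer. This illustrates the idea only, not Pulsar's batch container.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of size-based and key-based batching.
final class BatchingSketch {

    // Flushes a batch when adding the next message would exceed maxMessages or maxBytes.
    static List<List<String>> batch(List<String> messages, int maxMessages, int maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String message : messages) {
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            boolean full = current.size() == maxMessages || currentBytes + size > maxBytes;
            if (full && !current.isEmpty()) {
                batches.add(current); // publish the current batch
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(message);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // Key-based variant: batches are built per key, in first-seen key order.
    static Map<String, List<List<String>>> batchByKey(List<Map.Entry<String, String>> keyedMessages, int maxMessages, int maxBytes) {
        Map<String, List<String>> byKey = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : keyedMessages) {
            byKey.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
        }
        Map<String, List<List<String>>> result = new LinkedHashMap<>();
        byKey.forEach((key, payloads) -> result.put(key, batch(payloads, maxMessages, maxBytes)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(batch(List.of("a", "b", "c"), 2, 1000)); // [[a, b], [c]]
    }
}
```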
Configuration Reference
| Attribute (alias) | Description | Type | Mandatory | Default | 
|---|---|---|---|---|
| client-configuration | Identifier of a CDI bean that provides the default Pulsar client configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map | string | false | |
| health-enabled | Whether health reporting is enabled (default) or disabled | boolean | false | true | 
| max-inflight-messages | The maximum size of a queue holding pending messages, i.e. messages waiting to receive an acknowledgment from a broker. Defaults to 1000 messages | int | false | |
| producer-configuration | Identifier of a CDI bean that provides the default Pulsar producer configuration for this channel. The channel configuration can still override any attribute. The bean must have a type of Map | string | false | |
| schema | The Pulsar schema type of this channel. When configured, a schema is built with the given SchemaType and used for the channel. When absent, the schema is resolved by searching for a CDI bean typed Schema qualified with @Identifier and the channel name. As a fallback, AUTO_CONSUME or AUTO_PRODUCE are used. | string | false | |
| serviceUrl | The service URL for the Pulsar service | string | false | pulsar://localhost:6650 | 
| topic | The consumed / populated Pulsar topic. If not set, the channel name is used | string | false | |
| tracing-enabled | Whether tracing is enabled (default) or disabled | boolean | false | true | 
| waitForWriteCompletion | Whether the client waits for the broker to acknowledge the written record before acknowledging the message | boolean | false | true | 
In addition to the configuration properties provided by the connector, the following Pulsar producer properties can also be set on the channel:
| Attribute | Description | Type | Config file | Default | 
|---|---|---|---|---|
| topicName | Topic name | String | true | |
| producerName | Producer name | String | true | |
| sendTimeoutMs | Message send timeout in ms. If a message is not acknowledged by the server before the sendTimeout expires, an error occurs. | long | true | 30000 |
| blockIfQueueFull | If it is set to true, when the outgoing message queue is full, the Send and SendAsync methods of the producer block, rather than failing and throwing errors. If it is set to false, when the outgoing message queue is full, the Send and SendAsync methods of the producer fail and ProducerQueueIsFullError exceptions occur. The MaxPendingMessages parameter determines the size of the outgoing message queue. | boolean | true | false |
| maxPendingMessages | The maximum size of a queue holding pending messages, for example, messages waiting to receive an acknowledgment from a broker. By default, when the queue is full, all calls to the Send and SendAsync methods fail unless you set BlockIfQueueFull to true. | int | true | 0 |
| maxPendingMessagesAcrossPartitions | The maximum number of pending messages across partitions. Use the setting to lower the max pending messages for each partition (#setMaxPendingMessages(int)) if the total number exceeds the configured value. | int | true | 0 |
| messageRoutingMode | Message routing logic for producers on partitioned topics. Apply the logic only when setting no key on messages. Available options are as follows: * pulsar.RoundRobinDistribution: round robin * pulsar.UseSinglePartition: publish all messages to a single partition * pulsar.CustomPartition: a custom partitioning scheme | MessageRoutingMode | true | |
| hashingScheme | Hashing function determining the partition where you publish a particular message (partitioned topics only). Available options are as follows: * pulsar.JavaStringHash: the equivalent of String.hashCode() in Java * pulsar.Murmur3_32Hash: applies the Murmur3 hashing function * pulsar.BoostHash: applies the hashing function from C++'s Boost library | HashingScheme | true | JavaStringHash |
| cryptoFailureAction | Action the producer takes when encryption fails. * FAIL: if encryption fails, unencrypted messages fail to send. * SEND: if encryption fails, unencrypted messages are sent. | ProducerCryptoFailureAction | true | FAIL |
| customMessageRouter | | MessageRouter | false | |
| batchingMaxPublishDelayMicros | Batching time period of sending messages. | long | true | 1000 |
| batchingPartitionSwitchFrequencyByPublishDelay | | int | true | 10 |
| batchingMaxMessages | The maximum number of messages permitted in a batch. | int | true | 1000 |
| batchingMaxBytes | | int | true | 131072 |
| batchingEnabled | Enable batching of messages. | boolean | true | true |
| batcherBuilder | | BatcherBuilder | false | |
| chunkingEnabled | Enable chunking of messages. | boolean | true | false |
| chunkMaxMessageSize | | int | true | -1 |
| cryptoKeyReader | | CryptoKeyReader | false | |
| messageCrypto | | MessageCrypto | false | |
| encryptionKeys | | Set | true | [] |
| compressionType | Message data compression type used by a producer. Available options: * LZ4 * ZLIB * ZSTD * SNAPPY | CompressionType | true | NONE |
| initialSequenceId | | Long | true | |
| autoUpdatePartitions | | boolean | true | true |
| autoUpdatePartitionsIntervalSeconds | | long | true | 60 |
| multiSchema | | boolean | true | true |
| accessMode | | ProducerAccessMode | true | Shared |
| lazyStartPartitionedProducers | | boolean | true | false |
| properties | | SortedMap | true | {} |
| initialSubscriptionName | Use this configuration to automatically create an initial subscription when creating a topic. If this field is not set, the initial subscription is not created. | String | true | |