Incoming Channel Concurrency
Experimental
Incoming channel concurrency
config is an experimental feature.
The concurrency
attribute for incoming channels provides a mechanism to enable concurrent non-blocking processing of incoming messages.
When applied to a channel, this attribute specifies the number of copies of that channel to be created and wired to the processing method,
allowing multiple messages to be processed concurrently.
For example, concurrency configuration for a Kafka incoming channel the configuration will look like:
In this example, there will be 4 copies of the my-channel
running concurrently, with distinctive internal channel names,
my-channel$1
, my-channel$2
, etc. but all registered with the name my-channel
to the ChannelRegistry
.
Kafka connector partitions
This is essentially very similar to the Kafka connector partitions
configuration, but addresses some its limitations.
Using partitions
config in Kafka connector, channels are merged into the downstream message processor
(method annotated with @Incoming
or an injected channel) which is therefore called sequentially.
This prevents concurrently processing messages from multiple partitions.
The concurrency
mechanism effectively allows polling Kafka partitions from separate clients
and concurrently processing records while preserving the in-partition order.
Copy channels inherit all configuration attributes of the main channel config.
Per-copy channel attributes can be configured separately using the $
separated channel names: mp.messaging.incoming.my-channel$1.attribute
.
For example, the following AMQP 1.0 channel defines 3 channels each with a different selector:
While the concurrency
attribute is applicable to channels of any connector type,
the channel implementation may need to take this configuration into account and adjust the threading accordingly.
Connectors based on Vert.x event loop create a new event loop context per copy-channel to dispatch messages on distinct contexts.
Non-blocking processing
Note that while this allows concurrent processing, messages are still dispatched on Vert.x event loop threads, and should not be blocked.
Otherwise, connectors treat copy channels as independent channels. For example, health check reports are registered separately for each copy-channel.