Skipping messages
Sometimes you receive a message and do not want to produce an output
message. There are two ways to handle this:
- for a method that processes a single message or payload, returning null causes the message to be ignored (nothing is forwarded)
- for a method that processes streams, emit an empty stream.
Skipping a single item
To skip a single message or payload, return null:
| // Skip when processing a payload synchronously - return `null`
@Incoming("in")
@Outgoing("out")
public String processPayload(String s) {
    if (s.equalsIgnoreCase("skip")) {
        return null;
    }
    return s.toUpperCase();
}

// Skip when processing a message synchronously - acknowledge it, then return `null`
@Incoming("in")
@Outgoing("out")
public Message<String> processMessage(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        m.ack();
        return null;
    }
    return m.withPayload(s.toUpperCase());
}

// Skip when processing a payload asynchronously - return a `Uni` that emits `null`
@Incoming("in")
@Outgoing("out")
public Uni<String> processPayloadAsync(String s) {
    if (s.equalsIgnoreCase("skip")) {
        // Important: do not return `null` itself; return a `Uni` whose item is `null`
        return Uni.createFrom().nullItem();
    }
    return Uni.createFrom().item(s.toUpperCase());
}

// Skip when processing a message asynchronously - acknowledge it, then return a `Uni` that emits `null`
@Incoming("in")
@Outgoing("out")
public Uni<Message<String>> processMessageAsync(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        m.ack();
        return Uni.createFrom().nullItem();
    }
    return Uni.createFrom().item(m.withPayload(s.toUpperCase()));
}
|
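To make the "null means skip" contract concrete without a running broker, here is a minimal plain-Java sketch. It is a model of the behaviour described above, not the framework's implementation: the class `NullSkipSketch` and the helper `forward` are hypothetical names introduced for illustration, and only the JDK is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of the "null means skip" contract (hypothetical, not framework code).
class NullSkipSketch {

    // Same logic as the synchronous payload processor: null signals "nothing to forward".
    static String processPayload(String s) {
        if (s.equalsIgnoreCase("skip")) {
            return null;
        }
        return s.toUpperCase();
    }

    // Hypothetical stand-in for the channel wiring: only non-null results are forwarded.
    static <I, O> List<O> forward(List<I> incoming, Function<I, O> processor) {
        List<O> outgoing = new ArrayList<>();
        for (I item : incoming) {
            O result = processor.apply(item);
            if (result != null) {
                outgoing.add(result);
            }
        }
        return outgoing;
    }

    public static void main(String[] args) {
        List<String> out = forward(List.of("hello", "skip", "world"), NullSkipSketch::processPayload);
        System.out.println(out); // prints [HELLO, WORLD]
    }
}
```

The sketch only covers payloads. With `Message` parameters, the processors above also call `m.ack()` before returning `null`, so the skipped message is still acknowledged.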
Skipping in a stream
To skip a message or payload when manipulating a stream, emit an empty Multi (or Publisher):
| @Incoming("in")
@Outgoing("out-1")
public Multi<String> processPayload(String s) {
    if (s.equalsIgnoreCase("skip")) {
        return Multi.createFrom().empty();
    }
    return Multi.createFrom().item(s.toUpperCase());
}

@Incoming("in")
@Outgoing("out-2")
public Multi<Message<String>> processMessage(Message<String> m) {
    String s = m.getPayload();
    if (s.equalsIgnoreCase("skip")) {
        return Multi.createFrom().empty();
    }
    return Multi.createFrom().item(m.withPayload(s.toUpperCase()));
}

@Incoming("in")
@Outgoing("out-3")
public Multi<String> processPayloadStream(Multi<String> stream) {
    return stream
        .select().where(s -> !s.equalsIgnoreCase("skip"))
        .onItem().transform(String::toUpperCase);
}

@Incoming("in")
@Outgoing("out-4")
public Multi<Message<String>> processMessageStream(Multi<Message<String>> stream) {
    return stream
        .select().where(m -> !m.getPayload().equalsIgnoreCase("skip"))
        .onItem().transform(m -> m.withPayload(m.getPayload().toUpperCase()));
}
|
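The two stream styles above (returning an empty stream per item, or filtering the whole stream) can be compared with a rough JDK-only analogy. This sketch uses `java.util.stream.Stream` in place of `Multi`, and assumes that the per-item streams returned by a processor are flattened into the outgoing channel. The class `EmptyStreamSkipSketch` and its methods are hypothetical names, not part of any library.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only analogy for skipping in a stream (hypothetical, not framework code).
class EmptyStreamSkipSketch {

    // Per-item style: zero elements for "skip", one element otherwise.
    static Stream<String> processPayload(String s) {
        if (s.equalsIgnoreCase("skip")) {
            return Stream.empty();
        }
        return Stream.of(s.toUpperCase());
    }

    // Assumed wiring for the per-item style: per-item streams are flattened in order.
    static List<String> forwardPerItem(List<String> incoming) {
        return incoming.stream()
                .flatMap(EmptyStreamSkipSketch::processPayload)
                .collect(Collectors.toList());
    }

    // Whole-stream style: filter out the skipped items, then transform the rest.
    static List<String> forwardWholeStream(List<String> incoming) {
        return incoming.stream()
                .filter(s -> !s.equalsIgnoreCase("skip"))
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> in = List.of("hello", "skip", "world");
        System.out.println(forwardPerItem(in));     // prints [HELLO, WORLD]
        System.out.println(forwardWholeStream(in)); // prints [HELLO, WORLD]
    }
}
```

Both styles drop the same items. The per-item form is convenient when one input may also expand into several outputs, while the filtering form keeps the skip condition in a single place.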