Multicast EIP
The Multicast EIP routes the same message to a number of endpoints, each of which can process it in a different way. The main difference between Multicast and Splitter is that the Splitter splits the message into several pieces, whereas Multicast does not modify the request message.
Options
The Multicast EIP supports 12 options, which are listed below:
Name | Description | Default | Type |
---|---|---|---|
parallelProcessing | If enabled, messages are sent to the multicasts concurrently. Note that the caller thread still waits until all messages have been fully processed before it continues. Only the sending and the processing of the replies from the multicasts happen concurrently. | false | Boolean |
strategyRef | Refers to an AggregationStrategy used to assemble the replies from the multicasts into a single outgoing message from the Multicast. By default, Camel uses the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. | | String |
strategyMethodName | Explicitly declares the method name to use when using POJOs as the AggregationStrategy. | | String |
strategyMethodAllowNull | If false, the aggregate method is not used when there is no data to enrich. If true, null is passed as the oldExchange (when there is no data to enrich) when using POJOs as the AggregationStrategy. | false | Boolean |
executorServiceRef | Refers to a custom thread pool to be used for parallel processing. If you set this option, parallel processing is implied automatically, so you do not have to enable that option as well. | | String |
streaming | If enabled, Camel processes replies out of order, that is, in the order they come back. If disabled, Camel processes replies in the same order as defined by the multicast. | false | Boolean |
stopOnException | If enabled, stops further processing when an exception or failure occurs while processing an org.apache.camel.Exchange, and the causing exception is thrown. Also stops if processing the exchange failed (has a fault message) or if an exception was thrown and handled by the error handler (such as with onException). In all these situations the multicast stops further processing. This is the same behavior as in a pipeline, which is used by the routing engine. The default behavior is to not stop but to continue processing to the end. | false | Boolean |
timeout | Sets a total timeout, in milliseconds, when using parallel processing. If the Multicast has not been able to send and process all replies within the given time frame, the timeout triggers and the Multicast breaks out and continues. If you provide a TimeoutAwareAggregationStrategy, its timeout method is invoked before breaking out. If the timeout is reached while tasks are still running, tasks that Camel cannot easily shut down gracefully may continue to run, so use this option with care. | 0 | Long |
onPrepareRef | Uses the given Processor to prepare the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or for any custom logic needed before the exchange is sent. | | String |
shareUnitOfWork | Shares the org.apache.camel.spi.UnitOfWork between the parent and each of the sub messages. By default, Multicast does not share the unit of work between the parent exchange and each multicasted exchange, so each sub exchange has its own individual unit of work. | false | Boolean |
parallelAggregate | If enabled, the aggregate method on AggregationStrategy can be called concurrently. This requires the AggregationStrategy implementation to be thread-safe. By default this is false, meaning that Camel synchronizes the call to the aggregate method. In some use cases, enabling it can achieve higher performance when the AggregationStrategy is thread-safe. | false | Boolean |
stopOnAggregateException | If enabled, exceptions that occur at aggregation time are unwound to the error handler when parallelProcessing is used. Currently, aggregation-time exceptions do not stop route processing when parallelProcessing is used; enabling this option works around that behavior. The default value is false for the sake of backward compatibility. | false | Boolean |
Multicast example
The following example shows how to take a request from the direct:a endpoint and then multicast that request to direct:x, direct:y, and direct:z.
Using the fluent builder
By default, Multicast invokes each endpoint sequentially. To process the endpoints in parallel, enable parallelProcessing:
from("direct:a").multicast().parallelProcessing().to("direct:x", "direct:y", "direct:z");
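To make the parallel behavior more concrete, here is a minimal plain-Java model of it, not Camel code. The class `ParallelSketch` and its `multicast` method are hypothetical names invented for this illustration, and the "endpoints" are just strings. It only shows the two properties described for parallelProcessing: the work runs concurrently, yet the caller blocks until every reply is in, and (with streaming disabled) replies are handled in definition order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model class; it does not use or mirror any Camel API.
class ParallelSketch {
    // Sends the same message to every endpoint concurrently, then blocks
    // until all replies are available and returns them in definition order.
    static List<String> multicast(String message, List<String> endpoints) {
        ExecutorService pool = Executors.newFixedThreadPool(endpoints.size());
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String endpoint : endpoints) {
                tasks.add(() -> endpoint + " got " + message);
            }
            List<String> replies = new ArrayList<>();
            // invokeAll waits for every task and preserves the task order.
            for (Future<String> future : pool.invokeAll(tasks)) {
                replies.add(future.get());
            }
            return replies;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(multicast("hello",
                Arrays.asList("direct:x", "direct:y", "direct:z")));
        // prints [direct:x got hello, direct:y got hello, direct:z got hello]
    }
}
```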
When using the InOut MEP, an AggregationStrategy is used to aggregate all reply messages. The default is to use only the latest reply message and discard any earlier replies. The aggregation strategy is configurable:
from("direct:start")
.multicast(new MyAggregationStrategy())
.parallelProcessing().timeout(500).to("direct:a", "direct:b", "direct:c")
.end()
.to("mock:result");
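The source does not show `MyAggregationStrategy`, so the following is only a sketch of the idea in plain Java, without Camel on the classpath. The nested `Strategy` interface and the `fold` helper are hypothetical stand-ins that mimic the shape of the aggregate contract: the strategy is called once per reply with the result so far (null on the first call) and the new reply. A real strategy would implement Camel's AggregationStrategy and operate on Exchange objects rather than strings.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model class; the types here are stand-ins, not Camel types.
class AggregationSketch {
    // Stand-in for the aggregate(oldExchange, newExchange) contract.
    interface Strategy {
        String aggregate(String oldReply, String newReply);
    }

    // Mimics the default described above: keep only the latest reply.
    static final Strategy USE_LATEST = (oldReply, newReply) -> newReply;

    // A custom strategy that keeps every reply, joined with '+'.
    static final Strategy JOIN_ALL = (oldReply, newReply) ->
            oldReply == null ? newReply : oldReply + "+" + newReply;

    // Folds the replies in order; the first call receives null as oldReply.
    static String fold(List<String> replies, Strategy strategy) {
        String result = null;
        for (String reply : replies) {
            result = strategy.aggregate(result, reply);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> replies = Arrays.asList("a", "b", "c");
        System.out.println(fold(replies, USE_LATEST)); // prints c
        System.out.println(fold(replies, JOIN_ALL));   // prints a+b+c
    }
}
```

The null check in `JOIN_ALL` is the part most worth carrying over to a real strategy, since the first invocation has no previous result to merge with.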
The Multicast, Recipient List, and Splitter EIPs have special support for using |
Stop processing in case of exception
By default, the Multicast EIP continues to process the entire exchange even if one of the multicasted messages throws an exception during routing. For example, suppose you multicast to three destinations and the second destination fails with an exception. By default, Camel still processes the remaining destinations, and you have the chance to remedy or handle the failure in the AggregationStrategy.
But sometimes you just want Camel to stop, propagate the exception back, and let the Camel error handler handle it. You can do this with the stopOnException option, as shown below:
from("direct:start")
.multicast()
.stopOnException().to("direct:foo", "direct:bar", "direct:baz")
.end()
.to("mock:result");
from("direct:foo").to("mock:foo");
from("direct:bar").process(new MyProcessor()).to("mock:bar");
from("direct:baz").to("mock:baz");
And using XML DSL you specify it as follows:
<route>
<from uri="direct:start"/>
<multicast stopOnException="true">
<to uri="direct:foo"/>
<to uri="direct:bar"/>
<to uri="direct:baz"/>
</multicast>
<to uri="mock:result"/>
</route>
<route>
<from uri="direct:foo"/>
<to uri="mock:foo"/>
</route>
<route>
<from uri="direct:bar"/>
<process ref="myProcessor"/>
<to uri="mock:bar"/>
</route>
<route>
<from uri="direct:baz"/>
<to uri="mock:baz"/>
</route>
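The difference between the default behavior and stopOnException can be modeled in a few lines of plain Java. This is a conceptual sketch only: `StopOnExceptionSketch` and `visit` are hypothetical names, the destinations are plain strings, and the failure is simulated. In Camel itself the exception would be propagated to the error handler rather than swallowed by a loop.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model class; it does not use or mirror any Camel API.
class StopOnExceptionSketch {
    // Returns the destinations that were invoked. 'failing' names the
    // destination whose simulated processing throws an exception.
    static List<String> visit(List<String> destinations, String failing,
                              boolean stopOnException) {
        List<String> visited = new ArrayList<>();
        for (String destination : destinations) {
            visited.add(destination);
            try {
                if (destination.equals(failing)) {
                    throw new IllegalStateException("failed at " + destination);
                }
            } catch (IllegalStateException e) {
                if (stopOnException) {
                    // Camel would hand the exception to the error handler here.
                    break;
                }
                // Default: carry on with the next destination.
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> d = Arrays.asList("direct:foo", "direct:bar", "direct:baz");
        System.out.println(visit(d, "direct:bar", false));
        // prints [direct:foo, direct:bar, direct:baz]
        System.out.println(visit(d, "direct:bar", true));
        // prints [direct:foo, direct:bar]
    }
}
```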
Using onPrepare to execute custom logic when preparing messages
The Multicast EIP copies the source exchange and multicasts each copy. However, the copy is a shallow copy, so if you have mutable message bodies, any change made to one copy is visible to the other copies. If you want a deep clone instead, use a custom onPrepare, which lets you do this through the Processor interface.
Note that onPrepare can be used for any kind of custom logic that you want to execute before the Exchange is multicasted.
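The shallow-copy pitfall is easy to reproduce outside Camel. The sketch below is a plain-Java model under stated assumptions: `OnPrepareSketch` and `secondRecipientSees` are hypothetical names, the message body is a mutable list, and the `prepare` function stands in for what an onPrepare Processor would do to each copy before it is sent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model class; 'prepare' stands in for an onPrepare processor.
class OnPrepareSketch {
    // Gives two recipients a body each (after running 'prepare' on it), lets
    // the first recipient mutate its body, and returns what the second sees.
    static List<String> secondRecipientSees(List<String> body,
                                            UnaryOperator<List<String>> prepare) {
        List<String> first = prepare.apply(body);
        List<String> second = prepare.apply(body);
        first.add("changed-by-first");
        return second;
    }

    public static void main(String[] args) {
        // Shallow copy: both recipients share the same list instance.
        System.out.println(secondRecipientSees(
                new ArrayList<>(Arrays.asList("x")), b -> b));
        // prints [x, changed-by-first]

        // Clone in the prepare step: each recipient gets its own list.
        // Copying the list is enough here because Strings are immutable.
        System.out.println(secondRecipientSees(
                new ArrayList<>(Arrays.asList("x")), b -> new ArrayList<>(b)));
        // prints [x]
    }
}
```

For bodies that hold mutable elements, the prepare step would also need to copy those elements, which is the deep-clone case mentioned above.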