Recipient List EIP
The recipients will receive a copy of the same Exchange, and Camel will execute them sequentially.
See the cacheSize option for more details on how much cache to use depending on how many or few unique endpoints are used.
Options
The Recipient List EIP supports 15 options which are listed below:
Name | Description | Default | Type |
---|---|---|---|
delimiter | Delimiter used if the Expression returns multiple endpoints. Can be turned off using the value false. The default value is a comma (,). | , | String |
parallelProcessing | If enabled then sending messages to the recipients occurs concurrently. Note the caller thread will still wait until all messages have been fully processed before it continues. It is only the sending and the processing of the replies from the recipients which happens concurrently. | false | Boolean |
strategyRef | Sets a reference to the AggregationStrategy to be used to assemble the replies from the recipients into a single outgoing message from the RecipientList. By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. | | String |
strategyMethodName | This option can be used to explicitly declare the method name to use, when using POJOs as the AggregationStrategy. | | String |
strategyMethodAllowNull | If this option is false then the aggregate method is not used if there was no data to enrich. If this option is true then null values are used as the oldExchange (when no data to enrich), when using POJOs as the AggregationStrategy. | false | Boolean |
executorServiceRef | Refers to a custom Thread Pool to be used for parallel processing. Notice if you set this option, then parallel processing is automatically implied, and you do not have to enable that option as well. | | String |
stopOnException | Stops further processing if an exception or failure occurred during processing of an org.apache.camel.Exchange, and the caused exception will be thrown. Will also stop if processing the exchange failed (has a fault message) or an exception was thrown and handled by the error handler (such as using onException). In all situations the recipient list will stop further processing. This is the same behavior as in pipeline, which is used by the routing engine. The default behavior is to not stop but continue processing till the end. | false | Boolean |
ignoreInvalidEndpoints | Ignore the invalid endpoint exception when trying to create a producer with that endpoint. | false | Boolean |
streaming | If enabled then Camel will process replies out-of-order, e.g. in the order they come back. If disabled, Camel will process replies in the same order as defined by the recipient list. | false | Boolean |
timeout | Sets a total timeout specified in millis, when using parallel processing. If the Recipient List hasn’t been able to send and process all replies within the given timeframe, then the timeout triggers and the Recipient List breaks out and continues. Notice if you provide a TimeoutAwareAggregationStrategy then the timeout method is invoked before breaking out. If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. | 0 | Long |
onPrepareRef | Uses the Processor when preparing the org.apache.camel.Exchange to be sent. This can be used to deep-clone messages that should be sent, or for any custom logic needed before the exchange is sent. | | String |
shareUnitOfWork | Shares the org.apache.camel.spi.UnitOfWork with the parent and each of the sub messages. Recipient List will by default not share unit of work between the parent exchange and each recipient exchange. This means each sub exchange has its own individual unit of work. | false | Boolean |
cacheSize | Sets the maximum size used by the org.apache.camel.spi.ProducerCache which is used to cache and reuse producers when using this recipient list, when uris are reused. Beware that when using dynamic endpoints then it affects how well the cache can be utilized. If each dynamic endpoint is unique then it is best to turn off caching by setting this to -1, which allows Camel to not cache both the producers and endpoints; they are regarded as prototype scoped and will be stopped and discarded after use. This reduces memory usage as otherwise producers/endpoints are stored in memory in the caches. However if there is a high degree of dynamic endpoints that have been used before, then it can be beneficial to use the cache to reuse both producers and endpoints, and therefore the cache size can be set accordingly or rely on the default size (1000). If there is a mix of unique and previously used dynamic endpoints, then setting a reasonable cache size can help reduce memory usage by avoiding storing too many infrequently used producers. | | Integer |
parallelAggregate | If enabled then the aggregate method on AggregationStrategy can be called concurrently. Notice that this requires the implementation of AggregationStrategy to be thread-safe. By default this is false, meaning that Camel synchronizes the call to the aggregate method. In some use-cases this can be used to achieve higher performance when the AggregationStrategy is implemented as thread-safe. | false | Boolean |
stopOnAggregateException | If enabled, unwind exceptions occurring at aggregation time to the error handler when parallelProcessing is used. Currently, aggregation time exceptions do not stop the route processing when parallelProcessing is used. Enabling this option allows you to work around this behavior. The default value is false for the sake of backward compatibility. | false | Boolean |
You can use the RecipientList Annotation on a POJO to create a Dynamic Recipient List. For more details see the Bean Integration. |
Static Recipient List
The following example shows how to route a request from the input queue:a endpoint to a static list of destinations:
from("jms:queue:a")
    .recipientList(constant("direct:b,direct:c,direct:d"));
And in XML:
<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="jms:queue:a"/>
    <recipientList>
      <constant>direct:b,direct:c,direct:d</constant>
    </recipientList>
  </route>
</camelContext>
Dynamic Recipient List
Usually one of the main reasons for using the Recipient List pattern is that the list of recipients is dynamic and calculated at runtime. The following example demonstrates how to create a dynamic recipient list using an Expression (which in this case extracts a named header value dynamically) to calculate the list of endpoints. Each entry is either of type Endpoint, or is converted to a String and then resolved as an endpoint URI (multiple URIs are separated by comma).
from("jms:queue:a")
    .recipientList(header("foo"));
Iterable value
The dynamic list of recipients that are defined in the header must be iterable such as:
- java.util.Collection
- java.util.Iterator
- arrays
- org.w3c.dom.NodeList
- a single String with values separated by comma (the delimiter configured)
- any other type will be regarded as a single value
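The accepted value types can be pictured with a small plain-Java sketch. It does not use Camel and is not Camel's implementation: the class RecipientValues and its method toRecipients are hypothetical names chosen for illustration, and the comma is assumed as the delimiter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

import org.w3c.dom.NodeList;

// Hypothetical helper, not part of Camel: models how a header value of
// different types could be flattened into a list of recipients.
class RecipientValues {

    static List<Object> toRecipients(Object value) {
        List<Object> recipients = new ArrayList<>();
        if (value instanceof Collection) {
            recipients.addAll((Collection<?>) value);
        } else if (value instanceof Iterator) {
            Iterator<?> it = (Iterator<?>) value;
            while (it.hasNext()) {
                recipients.add(it.next());
            }
        } else if (value instanceof Object[]) {
            recipients.addAll(Arrays.asList((Object[]) value));
        } else if (value instanceof NodeList) {
            NodeList nodes = (NodeList) value;
            for (int i = 0; i < nodes.getLength(); i++) {
                recipients.add(nodes.item(i));
            }
        } else if (value instanceof String) {
            // a single String is split on the delimiter (comma assumed here)
            for (String uri : ((String) value).split(",")) {
                recipients.add(uri.trim());
            }
        } else {
            // any other type is regarded as a single value
            recipients.add(value);
        }
        return recipients;
    }
}
```

In this model a header holding the String "direct:a, direct:b" and a header holding a two-element String array both yield two recipients, while a value such as an Integer yields a single recipient.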
Using delimiter in Spring XML
In the Spring DSL you can use the delimiter attribute to set the delimiter that is used when the header value is a single String with multiple separated endpoints. By default Camel uses comma as delimiter, but this option lets you specify a custom delimiter to use instead.
<route>
  <from uri="direct:a"/>
  <!-- use semi-colon as a delimiter for String based values -->
  <recipientList delimiter=";">
    <header>myHeader</header>
  </recipientList>
</route>
So if myHeader contains a String with the value "activemq:queue:foo;activemq:topic:hello ; log:bar" then Camel will split the String using the delimiter given in the XML, which is the semi-colon, resulting in 3 endpoints to send to.
You can use spaces between the endpoints as Camel will trim the value when it looks up the endpoint to send to.
And in Java:
from("direct:a")
    .recipientList(header("myHeader"), ";");
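As a rough illustration of the splitting and trimming described above, the following plain-Java sketch models the delimiter handling without using Camel. DelimiterSketch and split are hypothetical names, and the handling of the value false is an assumption based on the option description ("can be turned off using the value false"), not Camel's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Camel: models splitting a String header
// value into endpoint URIs with a configurable delimiter.
class DelimiterSketch {

    static List<String> split(String value, String delimiter) {
        List<String> uris = new ArrayList<>();
        if ("false".equalsIgnoreCase(delimiter)) {
            // delimiter turned off: the whole value is one endpoint URI
            uris.add(value.trim());
            return uris;
        }
        // quote the delimiter so it is matched literally, not as a regex
        for (String part : value.split(Pattern.quote(delimiter))) {
            // surrounding spaces are trimmed before the endpoint is looked up
            uris.add(part.trim());
        }
        return uris;
    }
}
```

With the header value from the example and ";" as delimiter, the sketch returns the three URIs activemq:queue:foo, activemq:topic:hello and log:bar.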
Sending to multiple recipients in parallel
The Recipient List supports parallelProcessing, which for example the Splitter also supports.
You can use it to have a thread pool send the Exchange to multiple recipients concurrently.
from("direct:a")
    .recipientList(header("myHeader")).parallelProcessing();
And in XML it is an attribute on the recipient list tag.
<route>
  <from uri="direct:a"/>
  <recipientList parallelProcessing="true">
    <header>myHeader</header>
  </recipientList>
</route>
Using custom thread pool
A thread pool is only used for parallelProcessing. You supply your own custom thread pool via the ExecutorServiceStrategy (see Camel’s Threading Model), the same way you would do it for the aggregationStrategy. By default Camel uses a thread pool with 10 threads (subject to change in future versions).
Stop continuing in case one recipient failed
The Recipient List supports stopOnException, which for example the Splitter also supports.
You can use it to stop sending to any further recipients in case any recipient failed.
from("direct:a")
    .recipientList(header("myHeader")).stopOnException();
And in XML it is an attribute on the recipient list tag.
<route>
  <from uri="direct:a"/>
  <recipientList stopOnException="true">
    <header>myHeader</header>
  </recipientList>
</route>
You can combine parallelProcessing and stopOnException and have them both true.
Ignore invalid endpoints
The Recipient List supports ignoreInvalidEndpoints (like the Routing Slip).
You can use it to skip endpoints which are invalid.
from("direct:a")
    .recipientList(header("myHeader")).ignoreInvalidEndpoints();
And in XML it is an attribute on the recipient list tag.
<route>
  <from uri="direct:a"/>
  <recipientList ignoreInvalidEndpoints="true">
    <header>myHeader</header>
  </recipientList>
</route>
Then let us say that myHeader contains the following two endpoints: direct:foo,xxx:bar.
The first endpoint is valid and works. However, the second one is invalid and will just be ignored.
Camel logs at DEBUG level about it, so you can see why the endpoint was invalid.
Using custom AggregationStrategy
You can use your own AggregationStrategy with the Recipient List. However, this is rarely needed.
It is useful when you are using Request Reply messaging, as the replies from the recipients can then be aggregated.
By default Camel uses UseLatestAggregationStrategy, which just keeps the last received reply. If you must remember all the bodies that all the recipients sent back, then you can use your own custom aggregator that keeps those. It is the same principle as with the Aggregator EIP, so check it out for details.
from("direct:a")
    .recipientList(header("myHeader")).aggregationStrategy(new MyOwnAggregationStrategy())
    .to("direct:b");
And in XML it is again an attribute on the recipient list tag.
<route>
  <from uri="direct:a"/>
  <recipientList strategyRef="myStrategy">
    <header>myHeader</header>
  </recipientList>
  <to uri="direct:b"/>
</route>
<!-- bean with the custom aggregation strategy -->
<bean id="myStrategy" class="com.mycompany.MyOwnAggregationStrategy"/>
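To make the difference between keeping only the last reply and keeping all replies concrete, here is a plain-Java model of the fold that an aggregation strategy performs. It uses Strings in place of Exchanges and does not depend on Camel. AggregationSketch, fold, USE_LATEST and KEEP_ALL are hypothetical names, and passing null as the old value on the first call is an assumption of this model.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model, not Camel code: replies are Strings instead of Exchanges.
class AggregationSketch {

    // Keeps only the most recent reply (models the default behavior).
    static final BinaryOperator<String> USE_LATEST =
            (oldReply, newReply) -> newReply;

    // Keeps every reply by appending it to what was collected so far.
    static final BinaryOperator<String> KEEP_ALL =
            (oldReply, newReply) -> oldReply == null ? newReply : oldReply + "," + newReply;

    // Feeds the replies to the strategy one by one; the first call sees null
    // as the old value because nothing has been aggregated yet.
    static String fold(List<String> replies, BinaryOperator<String> strategy) {
        String result = null;
        for (String reply : replies) {
            result = strategy.apply(result, reply);
        }
        return result;
    }
}
```

For the replies A, B and C, folding with USE_LATEST leaves only C, whereas KEEP_ALL produces "A,B,C". A real MyOwnAggregationStrategy would apply the same idea to the message bodies of the exchanges.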
The Multicast, Recipient List, and Splitter EIPs have special support for using |
Knowing which endpoint when using custom AggregationStrategy
When using a custom AggregationStrategy, the aggregate method is always invoked in sequential order (also if parallel processing is enabled) of the endpoints the Recipient List is using.
However, the Exchange has a property stored (key is Exchange.RECIPIENT_LIST_ENDPOINT) with the uri of the Endpoint, so you know which endpoint you are aggregating from. The code block shows how to access this property in your Aggregator.
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    // uri of the endpoint that produced newExchange
    String uri = newExchange.getProperty(Exchange.RECIPIENT_LIST_ENDPOINT, String.class);
    // aggregate based on the uri here; returning newExchange is only a placeholder
    return newExchange;
}
Using method call as recipient list
You can use a Bean to provide the recipients, for example:
from("activemq:queue:test")
    .recipientList().method(MessageRouter.class, "routeTo");
And then the MessageRouter bean:
public class MessageRouter {
    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}
When you use a Bean, do not use the @RecipientList annotation, as this will in fact add yet another recipient list, so you end up having two. Do not do the following:
public class MessageRouter {
    // do not use recipientList in the Camel route calling a bean with the @RecipientList annotation!
    @RecipientList
    public String routeTo() {
        String queueName = "activemq:queue:test2";
        return queueName;
    }
}
You should only use the snippet above (using @RecipientList) if you just route to a Bean which you then want to act as a recipient list.
So the original route can be changed to:
from("activemq:queue:test").bean(MessageRouter.class, "routeTo");
This then invokes the routeTo method, detects that it is annotated with @RecipientList, and acts accordingly as if it was a recipient list EIP.
Using timeout
If you use parallelProcessing then you can configure a total timeout value in millis. Camel will then process the messages in parallel until the timeout is hit. This allows you to continue processing if one message consumer is slow. For example you can set a timeout value of 20 sec.
Tasks may keep running
If the timeout is reached with running tasks still remaining, certain tasks for which it is difficult for Camel to shut down in a graceful manner may continue to run. So use this option with a bit of care. We may be able to improve this functionality in future Camel releases.
For example in the unit test below you can see that we multicast the message to 3 destinations. We have a timeout of 250 milliseconds, and the first destination is delayed by 1 second, which means only the last two messages can be completed within the timeframe. This means we will only aggregate the last two, which yields an aggregated result of "BC".
from("direct:start")
    .multicast(new AggregationStrategy() {
            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                if (oldExchange == null) {
                    return newExchange;
                }
                String body = oldExchange.getIn().getBody(String.class);
                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                return oldExchange;
            }
        })
        .parallelProcessing().timeout(250).to("direct:a", "direct:b", "direct:c")
    // use end to indicate end of multicast route
    .end()
    .to("mock:result");
from("direct:a").delay(1000).to("mock:A").setBody(constant("A"));
from("direct:b").to("mock:B").setBody(constant("B"));
from("direct:c").to("mock:C").setBody(constant("C"));
Timeout in other EIPs
This timeout feature is also supported by Splitter and both multicast and recipientList.
By default if a timeout occurs the AggregationStrategy is not invoked. However, you can implement the timeout method of the TimeoutAwareAggregationStrategy.
This allows you to deal with the timeout in the AggregationStrategy if you really need to.
Timeout is total
The timeout is total, which means that after X time, Camel will aggregate the messages which have completed within the timeframe. The remainders will be cancelled. Camel will also only invoke the timeout method in the TimeoutAwareAggregationStrategy once, for the first index which caused the timeout.
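The idea of a total timeout can be modelled in plain Java with ExecutorService.invokeAll, which waits for all tasks up to a deadline and cancels whatever has not finished. The sketch below is only a model under that assumption and is not how Camel implements the option. TotalTimeoutSketch, sendAll and demo are hypothetical names, and each recipient is represented by a Callable that returns its reply as a String.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Plain-Java model, not Camel code.
class TotalTimeoutSketch {

    // Sends to all recipients in parallel and concatenates the replies that
    // arrived within the total timeout, in recipient order.
    static String sendAll(List<Callable<String>> recipients, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(recipients.size());
        try {
            // blocks the caller until all tasks finish or the timeout expires;
            // tasks that are still running at that point are cancelled
            List<Future<String>> futures =
                    pool.invokeAll(recipients, timeoutMillis, TimeUnit.MILLISECONDS);
            StringBuilder aggregated = new StringBuilder();
            for (Future<String> future : futures) {
                if (!future.isCancelled()) {
                    aggregated.append(future.get());
                }
            }
            return aggregated.toString();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // Three recipients: A is slow (1000 ms), B and C reply immediately.
    static String demo() {
        List<Callable<String>> recipients = new ArrayList<>();
        recipients.add(() -> { Thread.sleep(1000); return "A"; });
        recipients.add(() -> "B");
        recipients.add(() -> "C");
        return sendAll(recipients, 250);
    }
}
```

Calling TotalTimeoutSketch.demo() mirrors the three destinations of the unit test: the slow recipient is cancelled after 250 milliseconds, so the result is "BC". In this model the cancelled task is interrupted, whereas in Camel certain tasks may continue to run after the timeout.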
Using ExchangePattern in recipients
The recipient list will by default use the current Exchange Pattern. However, there are use-cases where you want to send a message to a recipient using a different exchange pattern. For example you may have a route that starts as an InOnly route, but want to use the InOut exchange pattern with a recipient list. You can configure the exchange pattern directly in the recipient endpoints.
For example in the route below we pick up new files (which will be started as InOnly) and then route to a recipient list. As we want to use InOut with the ActiveMQ (JMS) endpoint we can specify this using the exchangePattern=InOut option. The response from the JMS request/reply is then routed onwards, and thus the response is what will be stored as a file in the outbox directory.
from("file:inbox")
    // the exchange pattern is InOnly initially when using a file route
    .recipientList().constant("activemq:queue:inbox?exchangePattern=InOut")
    .to("file:outbox");
The recipient list will not alter the original exchange pattern. So in the example above the exchange pattern will still be InOnly. See more details at Request Reply and Event Message EIPs.