Split模式
split是EIP模式的一种,用于把把消息分解为若干个消息,分别进行处理。
Split Java Object
可以被split的消息
- Collection
- Iterator
- Array
- String
该方法定义在org.apache.camel.util.ObjectHelper类
public static Iterable<Object> createIterable(Object value, String delimiter,
final boolean allowEmptyValues, final boolean pattern) {
// if its a message than we want to iterate its body
if (value instanceof Message) {
value = ((Message) value).getBody();
}
if (value == null) {
return Collections.emptyList();
} else if (value instanceof Iterator) {
final Iterator<Object> iterator = (Iterator<Object>)value;
return new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
return iterator;
}
};
} else if (value instanceof Iterable) {
return (Iterable<Object>)value;
} else if (value.getClass().isArray()) {
if (isPrimitiveArrayType(value.getClass())) {
final Object array = value;
return new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
return new Iterator<Object>() {
private int idx;
public boolean hasNext() {
return idx < Array.getLength(array);
}
public Object next() {
if (!hasNext()) {
throw new NoSuchElementException("no more element available for '" + array + "' at the index " + idx);
}
return Array.get(array, idx++);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
} else {
return Arrays.asList((Object[]) value);
}
} else if (value instanceof NodeList) {
// lets iterate through DOM results after performing XPaths
final NodeList nodeList = (NodeList) value;
return new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
return new Iterator<Object>() {
private int idx;
public boolean hasNext() {
return idx < nodeList.getLength();
}
public Object next() {
if (!hasNext()) {
throw new NoSuchElementException("no more element available for '" + nodeList + "' at the index " + idx);
}
return nodeList.item(idx++);
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
} else if (value instanceof String) {
final String s = (String) value;
// this code is optimized to only use a Scanner if needed, eg there is a delimiter
if (delimiter != null && (pattern || s.contains(delimiter))) {
// use a scanner if it contains the delimiter or is a pattern
final Scanner scanner = new Scanner((String)value);
if (DEFAULT_DELIMITER.equals(delimiter)) {
// we use the default delimiter which is a comma, then cater for bean expressions with OGNL
// which may have balanced parentheses pairs as well.
// if the value contains parentheses we need to balance those, to avoid iterating
// in the middle of parentheses pair, so use this regular expression (a bit hard to read)
// the regexp will split by comma, but honor parentheses pair that may include commas
// as well, eg if value = "bean=foo?method=killer(a,b),bean=bar?method=great(a,b)"
// then the regexp will split that into two:
// -> bean=foo?method=killer(a,b)
// -> bean=bar?method=great(a,b)
// http://stackoverflow.com/questions/1516090/splitting-a-title-into-separate-parts
delimiter = ",(?!(?:[^\\(,]|[^\\)],[^\\)])+\\))";
}
scanner.useDelimiter(delimiter);
return new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
return CastUtils.cast(scanner);
}
};
} else {
return new Iterable<Object>() {
@Override
public Iterator<Object> iterator() {
// use a plain iterator that returns the value as is as there are only a single value
return new Iterator<Object>() {
private int idx;
public boolean hasNext() {
return idx == 0 && (allowEmptyValues || ObjectHelper.isNotEmpty(s));
}
public Object next() {
if (!hasNext()) {
throw new NoSuchElementException("no more element available for '" + s + "' at the index " + idx);
}
idx++;
return s;
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
} else {
return Collections.singletonList(value);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
Spring XML
<simple>
标签是一个表达式标签,告诉split使用什么表达式来提取消息,进行拆分
表达式有多种语言表示,参考:http://camel.apache.org/languages.html
<split>
<simple>${body}</simple>
<to uri="mock:result"/>
</split>
<split>
<simple>${header.foo}</simple>
<to uri="mock:result"/>
</split>
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
stream流模式
对于较大的文件或者消息可以使用流模式处理,减少内存开销 例如有这样需要处理的xml消息
<orders>
<order> <!-- order stuff here --> </order>
<order> <!-- order stuff here --> </order> ...
<order> <!-- order stuff here --> </order>
</orders>
1
2
3
4
5
2
3
4
5
如果使用xpath处理会导致整个文件被加载入内存,可以使用Tokenizer方式处理
<route>
<from uri="file:inbox"/>
<split streaming="true">
<tokenize token="order" xml="true"/>
<to uri="activemq:queue:order"/>
</split>
</route>
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
tokenizeXML方法会把使用order tag标签把
<order> <!-- order stuff here --> </order>
1
拆分消息放入队列
<route>
<from uri="file:inbox"/>
<split streaming="true">
<tokenize token="order" xml="true"/>
<to uri="activemq:queue:order"/>
</split>
</route>
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
更详细的内容参考:http://camel.apache.org/splitter.html
Expression表达式提取数据
提取数据
Object value = expression.evaluate(exchange, Object.class)
1
创建迭代器
if (isStreaming()) {
answer = createProcessorExchangePairsIterable(exchange, value);
} else {
answer = createProcessorExchangePairsList(exchange, value);
}
1
2
3
4
5
2
3
4
5
createProcessorExchangePairsIterable返回一个SplitterIterable对象,这个对象的构造器会调用上面提到的 ObjectHelper.createIterator
@Override
protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
Object value = expression.evaluate(exchange, Object.class);
if (exchange.getException() != null) {
// force any exceptions occurred during evaluation to be thrown
throw exchange.getException();
}
Iterable<ProcessorExchangePair> answer;
if (isStreaming()) {
answer = createProcessorExchangePairsIterable(exchange, value);
} else {
answer = createProcessorExchangePairsList(exchange, value);
}
if (exchange.getException() != null) {
// force any exceptions occurred during creation of exchange paris to be thrown
// before returning the answer;
throw exchange.getException();
}
return answer;
}
private final class SplitterIterable implements Iterable<ProcessorExchangePair>, Closeable {
.....
private SplitterIterable(Exchange exchange, Object value) {
this.original = exchange;
this.value = value;
this.iterator = ObjectHelper.createIterator(value);
this.copy = copyExchangeNoAttachments(exchange, true);
this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null;
}
.....
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public static Expression simpleExpression(final String expression) {
return new ExpressionAdapter() {
public Object evaluate(Exchange exchange) {
Language language = exchange.getContext().resolveLanguage("simple");
return language.createExpression(expression).evaluate(exchange, Object.class);
}
public String toString() {
return "simple(" + expression + ")";
}
};
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
split实例
下例为读取xls生成list,然后把list的每一个元素拆分单独的exchange消息
<route>
<from uri="file:src/data"/>
<log message="readfile"/>
<to uri="file:src/processed"/>
<log message="parsefile"/>
<unmarshal>
<custom ref="xlsBeansDataFormat" />
</unmarshal>
<log message="split"/>
<split>
<simple>${body.items}</simple>
<log message="${body}"/>
<to uri="mock:result"/>
</split>
</route>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Split Json数据
JsonPath
sample json
{ "store": {
"book": [
{ "category": "reference",
"author": "Nigel Rees",
"title": "Sayings of the Century",
"price": 8.95
},
{ "category": "fiction",
"author": "Evelyn Waugh",
"title": "Sword of Honour",
"price": 12.99
},
{ "category": "fiction",
"author": "Herman Melville",
"title": "Moby Dick",
"isbn": "0-553-21311-3",
"price": 8.99
},
{ "category": "fiction",
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings",
"isbn": "0-395-19395-8",
"price": 22.99
}
],
"bicycle": {
"color": "red",
"price": 19.95
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
JsonPath 举例
XPath | JSONPath | Result |
---|---|---|
/store/book/author | $.store.book[*].author | the authors of all books in the store |
//author | $..author | all authors |
/store/* | $.store.* | all things in store, which are some books and a red bicycle. |
/store//price | $.store..price | the price of everything in the store. |
//book[3] | $..book[2] | the third book |
//book[last()] | $..book[(@.length-1)] $..book[-1:] | the last book in order. |
//book[position() < 3] | $..book[0,1] $..book[:2] | the first two books |
//book[isbn] | $..book[?(@.isbn)] | filter all books with isbn number |
//book[price<10] | $..book[?(@.price<10)] | filter all books cheapier than 10 |
//* | $..* | all Elements in XML document. |
拆分举例
json数据格式
[
{
"name": "Ram",
"email": "ram@gmail.com",
"age": 23,
"hobby":[
"writing",
"riding",
"drawing"
]
},
{
"name": "Shyam",
"email": "shyam23@gmail.com",
"age": 28
}
]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Spring XML DSL 配置
<camel:camelContext id="jsonAggregate">
<camel:route>
<camel:from uri="file://src/data"/>
<camel:convertBodyTo type="java.lang.String"/>
<camel:log message="${body}"></camel:log>
<camel:to uri="direct:start"/>
</camel:route>
<camel:route>
<camel:from uri="direct:start" />
<camel:split streaming="true">
<camel:jsonpath>$.[*].email</camel:jsonpath>
<camel:log message="${body}"></camel:log>
<camel:to uri="activemq:result"></camel:to>
</camel:split>
</camel:route>
</camel:camelContext>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
说明: 第一个route用于从文件中读取json数据,转换成string 第二个route使用jsonpath作为表达式,拆分数据