[jboss-svn-commits] JBL Code SVN: r36221 - in labs/jbossrules/trunk/drools-camel/src: test/java/org/drools/camel/component and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Mon Dec 6 21:46:04 EST 2010
Author: tirelli
Date: 2010-12-06 21:46:03 -0500 (Mon, 06 Dec 2010)
New Revision: 36221
Added:
labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
Modified:
labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java
labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java
labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java
Log:
JBRULES-2820: adding support to channels in drools-camel
Added: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java (rev 0)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java 2010-12-07 02:46:03 UTC (rev 36221)
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2010 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.camel.component;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.drools.runtime.Channel;
+import org.drools.runtime.KnowledgeRuntime;
+
+/**
+ * A consumer that consumes objects sent into channels of a drools
+ * session
+ *
+ * @author etirelli
+ *
+ */
+public class DroolsConsumer extends DefaultConsumer {
+
+ private DroolsEndpoint de;
+ private KnowledgeRuntime krt;
+ private String channelId;
+
+ public DroolsConsumer(Endpoint endpoint,
+ Processor processor ) {
+ super( endpoint, processor );
+ de = (DroolsEndpoint) endpoint;
+ krt = (KnowledgeRuntime) de.getExecutor();
+ channelId = de.getChannel();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ krt.unregisterChannel( channelId );
+ super.doStop();
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+ DroolsCamelChannel channel = new DroolsCamelChannel();
+ krt.registerChannel( channelId, channel );
+ }
+
+ class DroolsCamelChannel implements Channel {
+ public void send(Object pojo) {
+ Exchange exchange = de.createExchange( pojo );
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+
+}
Property changes on: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
___________________________________________________________________
Name: svn:executable
+ *
Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java 2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java 2010-12-07 02:46:03 UTC (rev 36221)
@@ -36,10 +36,13 @@
import java.util.regex.Pattern;
import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.DataFormat;
import org.drools.command.impl.CommandBasedStatefulKnowledgeSession;
import org.drools.core.util.StringUtils;
@@ -111,7 +114,8 @@
}
public Consumer createConsumer(Processor processor) throws Exception {
- throw new RuntimeCamelException( "Drools consumers not supported." );
+ return new DroolsConsumer( this,
+ processor );
}
public Producer createProducer() throws Exception {
@@ -282,5 +286,15 @@
public void setChannel(String channel) {
this.channel = channel;
}
-
+
+ public Exchange createExchange( Object pojo ) {
+ DefaultMessage msg = new DefaultMessage();
+ msg.setBody( pojo );
+ DefaultExchange exchange = new DefaultExchange(this, getExchangePattern());
+ // DO WE NEED TO SET THE BINDING PROPERTY?
+ //exchange.setProperty(Exchange.BINDING, getBinding());
+ exchange.setIn( msg );
+ return exchange;
+ }
+
}
\ No newline at end of file
Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java 2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java 2010-12-07 02:46:03 UTC (rev 36221)
@@ -57,14 +57,14 @@
public void process(Exchange exchange) throws Exception {
- Command cmd = exchange.getIn().getBody( Command.class );
+ Command<?> cmd = exchange.getIn().getBody( Command.class );
if ( cmd == null ) {
throw new RuntimeCamelException( "Body of in message not of the expected type 'org.drools.command.Command' for uri" + de.getEndpointUri() );
}
if ( !(cmd instanceof BatchExecutionCommandImpl) ) {
- cmd = new BatchExecutionCommandImpl( Arrays.asList( new GenericCommand< ? >[]{(GenericCommand) cmd} ) );
+ cmd = new BatchExecutionCommandImpl( Arrays.asList( new GenericCommand< ? >[]{(GenericCommand<?>) cmd} ) );
}
CommandExecutor exec;
Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java 2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java 2010-12-07 02:46:03 UTC (rev 36221)
@@ -69,13 +69,12 @@
implements
Policy {
private static boolean augmented;
- private DroolsEndpoint dep;
public void beforeWrap(RouteContext routeContext,
- ProcessorDefinition processorDefinition) {
+ ProcessorDefinition<?> processorDefinition) {
augmentNodes( routeContext,
processorDefinition,
- new HashSet() );
+ new HashSet<Object>() );
}
public Processor wrap(RouteContext routeContext,
@@ -96,7 +95,7 @@
private ToDefinition getDroolsNode(RouteDefinition routeDef) {
ToDefinition toDrools = null;
- for ( ProcessorDefinition child : routeDef.getOutputs() ) {
+ for ( ProcessorDefinition<?> child : routeDef.getOutputs() ) {
toDrools = getDroolsNode( child );
if ( toDrools != null ) {
break;
@@ -106,7 +105,7 @@
}
public static void augmentNodes(RouteContext routeContext,
- ProcessorDefinition nav,
+ ProcessorDefinition<?> nav,
Set visited) {
if ( !nav.getOutputs().isEmpty() ) {
Added: labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java (rev 0)
+++ labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java 2010-12-07 02:46:03 UTC (rev 36221)
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2010 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.camel.component;
+
+import javax.naming.Context;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.drools.pipeline.camel.Person;
+import org.drools.runtime.StatefulKnowledgeSession;
+
+public class DroolsEndpointChannelTest extends DroolsCamelTestSupport {
+ private StatefulKnowledgeSession ksession;
+
+ public void testChannelSupport() throws Exception {
+ Person bob1 = new Person( "bob" );
+ Person bob2 = new Person( "bob" );
+ Person bob3 = new Person( "bob" );
+ Person mark1 = new Person( "mark" );
+
+ MockEndpoint bobs = getMockEndpoint("mock:bobs");
+ bobs.expectedMessageCount(3);
+ bobs.expectedBodiesReceived( bob1, bob2, bob3 );
+
+ MockEndpoint marks = getMockEndpoint("mock:marks");
+ marks.expectedMessageCount(1);
+ marks.expectedBodiesReceived( mark1 );
+
+ ksession.insert( bob1 );
+ ksession.insert( mark1 );
+ ksession.fireAllRules();
+
+ ksession.insert( bob2 );
+ ksession.fireAllRules();
+
+ ksession.insert( bob3 );
+ ksession.fireAllRules();
+
+ bobs.assertIsSatisfied();
+ marks.assertIsSatisfied();
+ }
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from( "drools://node/ksession1?channel=bobs" ).to( "mock:bobs" );
+ from( "drools://node/ksession1?channel=marks" ).to( "mock:marks" );
+ }
+ };
+ }
+
+ @Override
+ protected void configureDroolsContext(Context jndiContext) {
+ String rule = "";
+ rule += "package org.drools.pipeline.camel \n";
+ rule += "rule rule1 \n";
+ rule += " when \n";
+ rule += " $p : Person( name == 'bob' ) \n";
+ rule += " then \n";
+ rule += " channels[\"bobs\"].send( $p ); \n";
+ rule += "end\n";
+ rule += "rule rule2 \n";
+ rule += " when \n";
+ rule += " $p : Person( name == 'mark' ) \n";
+ rule += " then \n";
+ rule += " channels[\"marks\"].send( $p ); \n";
+ rule += "end\n";
+
+ ksession = registerKnowledgeRuntime( "ksession1",
+ rule );
+ }
+}
Property changes on: labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
___________________________________________________________________
Name: svn:executable
+ *
More information about the jboss-svn-commits
mailing list