[jboss-svn-commits] JBL Code SVN: r36221 - in labs/jbossrules/trunk/drools-camel/src: test/java/org/drools/camel/component and 1 other directory.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Mon Dec 6 21:46:04 EST 2010


Author: tirelli
Date: 2010-12-06 21:46:03 -0500 (Mon, 06 Dec 2010)
New Revision: 36221

Added:
   labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
   labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
Modified:
   labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java
   labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java
   labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java
Log:
JBRULES-2820: adding support for channels in drools-camel

Added: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java	2010-12-07 02:46:03 UTC (rev 36221)
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2010 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.drools.camel.component;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+import org.drools.runtime.Channel;
+import org.drools.runtime.KnowledgeRuntime;
+
+/**
+ * Camel consumer that registers a {@link Channel} on the endpoint's knowledge
+ * runtime and forwards every object sent to that channel to its processor.
+ * The channel is registered in doStart() and unregistered in doStop().
+ * 
+ * @author etirelli
+ */
+public class DroolsConsumer extends DefaultConsumer {
+
+    private DroolsEndpoint de; // endpoint this consumer was created for
+    private KnowledgeRuntime krt; // runtime on which the channel is registered
+    private String channelId; // channel name read from the endpoint
+    // NOTE(review): both casts below fail with ClassCastException for other types - confirm callers
+    public DroolsConsumer(Endpoint endpoint,
+                          Processor processor ) {
+        super( endpoint, processor );
+        de = (DroolsEndpoint) endpoint;
+        krt = (KnowledgeRuntime) de.getExecutor();
+        channelId = de.getChannel();
+    }
+    // Unregisters the channel from the runtime first, then stops the base consumer.
+    @Override
+    protected void doStop() throws Exception {
+        krt.unregisterChannel( channelId );
+        super.doStop();
+    }
+    // Starts the base consumer first, then registers a fresh channel under channelId.
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        DroolsCamelChannel channel = new DroolsCamelChannel();
+        krt.registerChannel( channelId, channel );
+    }
+    // Channel that wraps each sent object in an exchange and hands it to the processor.
+    class DroolsCamelChannel implements Channel {
+        public void send(Object pojo) { // pojo becomes the body of the exchange's In message
+            Exchange exchange = de.createExchange( pojo );
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                handleException(e); // passed to handleException instead of being rethrown
+            }
+        }
+    }
+    
+}


Property changes on: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsConsumer.java
___________________________________________________________________
Name: svn:executable
   + *

Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java	2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsEndpoint.java	2010-12-07 02:46:03 UTC (rev 36221)
@@ -36,10 +36,13 @@
 import java.util.regex.Pattern;
 
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultMessage;
 import org.apache.camel.spi.DataFormat;
 import org.drools.command.impl.CommandBasedStatefulKnowledgeSession;
 import org.drools.core.util.StringUtils;
@@ -111,7 +114,8 @@
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new RuntimeCamelException( "Drools consumers not supported." );
+        return new DroolsConsumer( this, 
+                                   processor );
     }
 
     public Producer createProducer() throws Exception {
@@ -282,5 +286,15 @@
     public void setChannel(String channel) {
         this.channel = channel;
     }
-
+    
+    public Exchange createExchange( Object pojo ) { // builds an exchange whose In message body is pojo
+        DefaultMessage msg = new DefaultMessage();
+        msg.setBody( pojo );
+        DefaultExchange exchange = new DefaultExchange(this, getExchangePattern());
+        // TODO: open question - does the binding property need to be set here?
+        //exchange.setProperty(Exchange.BINDING, getBinding());
+        exchange.setIn( msg );
+        return exchange;
+    }    
+    
 }
\ No newline at end of file

Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java	2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsExecuteProducer.java	2010-12-07 02:46:03 UTC (rev 36221)
@@ -57,14 +57,14 @@
 
     public void process(Exchange exchange) throws Exception {
 
-        Command cmd = exchange.getIn().getBody( Command.class );
+        Command<?> cmd = exchange.getIn().getBody( Command.class );
 
         if ( cmd == null ) {
             throw new RuntimeCamelException( "Body of in message not of the expected type 'org.drools.command.Command' for uri" + de.getEndpointUri() );
         }
 
         if ( !(cmd instanceof BatchExecutionCommandImpl) ) {
-            cmd = new BatchExecutionCommandImpl( Arrays.asList( new GenericCommand< ? >[]{(GenericCommand) cmd} ) );
+            cmd = new BatchExecutionCommandImpl( Arrays.asList( new GenericCommand< ? >[]{(GenericCommand<?>) cmd} ) );
         }
 
         CommandExecutor exec;

Modified: labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java	2010-12-06 15:33:27 UTC (rev 36220)
+++ labs/jbossrules/trunk/drools-camel/src/main/java/org/drools/camel/component/DroolsPolicy.java	2010-12-07 02:46:03 UTC (rev 36221)
@@ -69,13 +69,12 @@
     implements
     Policy {
     private static boolean augmented;
-    private DroolsEndpoint dep;
 
     public void beforeWrap(RouteContext routeContext,
-                           ProcessorDefinition processorDefinition) {
+                           ProcessorDefinition<?> processorDefinition) {
         augmentNodes( routeContext,
                       processorDefinition,
-                      new HashSet() );
+                      new HashSet<Object>() );
     }
 
     public Processor wrap(RouteContext routeContext,
@@ -96,7 +95,7 @@
 
     private ToDefinition getDroolsNode(RouteDefinition routeDef) {
         ToDefinition toDrools = null;
-        for ( ProcessorDefinition child : routeDef.getOutputs() ) {
+        for ( ProcessorDefinition<?> child : routeDef.getOutputs() ) {
             toDrools = getDroolsNode( child );
             if ( toDrools != null ) {
                 break;
@@ -106,7 +105,7 @@
     }
 
     public static void augmentNodes(RouteContext routeContext,
-                                    ProcessorDefinition nav,
+                                    ProcessorDefinition<?> nav,
                                     Set visited) {
         if ( !nav.getOutputs().isEmpty() ) {
 

Added: labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
===================================================================
--- labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java	2010-12-07 02:46:03 UTC (rev 36221)
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2010 JBoss Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ * 
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.drools.camel.component;
+
+import javax.naming.Context;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.drools.pipeline.camel.Person;
+import org.drools.runtime.StatefulKnowledgeSession;
+
+public class DroolsEndpointChannelTest extends DroolsCamelTestSupport {
+    private StatefulKnowledgeSession ksession; // assigned in configureDroolsContext()
+    // Inserts three "bob" and one "mark" Person and expects each on its own mock endpoint.
+    public void testChannelSupport() throws Exception {
+        Person bob1 = new Person( "bob" );
+        Person bob2 = new Person( "bob" );
+        Person bob3 = new Person( "bob" );
+        Person mark1 = new Person( "mark" );
+        // mock:bobs is expected to receive the three "bob" facts
+        MockEndpoint bobs = getMockEndpoint("mock:bobs");
+        bobs.expectedMessageCount(3);
+        bobs.expectedBodiesReceived( bob1, bob2, bob3 );
+        // mock:marks is expected to receive the single "mark" fact
+        MockEndpoint marks = getMockEndpoint("mock:marks");
+        marks.expectedMessageCount(1);
+        marks.expectedBodiesReceived( mark1 );
+        // facts are inserted across three separate fireAllRules() calls
+        ksession.insert( bob1 );
+        ksession.insert( mark1 );
+        ksession.fireAllRules();
+
+        ksession.insert( bob2 );
+        ksession.fireAllRules();
+
+        ksession.insert( bob3 );
+        ksession.fireAllRules();
+        // check the expectations set on both mock endpoints
+        bobs.assertIsSatisfied();
+        marks.assertIsSatisfied();
+    }
+
+    // Routes the "bobs" and "marks" channel endpoints of ksession1 to the matching mock endpoints.
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from( "drools://node/ksession1?channel=bobs" ).to( "mock:bobs" );
+                from( "drools://node/ksession1?channel=marks" ).to( "mock:marks" );
+            }
+        };
+    }
+    // Builds a two-rule DRL: each rule sends the matched Person to the channel named in its consequence.
+    @Override
+    protected void configureDroolsContext(Context jndiContext) {
+        String rule = "";
+        rule += "package org.drools.pipeline.camel \n";
+        rule += "rule rule1 \n";
+        rule += "  when \n";
+        rule += "    $p : Person( name == 'bob' ) \n";
+        rule += "  then \n";
+        rule += "    channels[\"bobs\"].send( $p ); \n";
+        rule += "end\n";
+        rule += "rule rule2 \n";
+        rule += "  when \n";
+        rule += "    $p : Person( name == 'mark' ) \n";
+        rule += "  then \n";
+        rule += "    channels[\"marks\"].send( $p ); \n";
+        rule += "end\n";
+        // keep the session returned by registerKnowledgeRuntime so the test can insert facts
+        ksession = registerKnowledgeRuntime( "ksession1",
+                                             rule );
+    }
+}


Property changes on: labs/jbossrules/trunk/drools-camel/src/test/java/org/drools/camel/component/DroolsEndpointChannelTest.java
___________________________________________________________________
Name: svn:executable
   + *



More information about the jboss-svn-commits mailing list