[jboss-svn-commits] JBL Code SVN: r35888 - in labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src: main/java/org/drools/grid/remote and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Fri Nov 5 20:02:24 EDT 2010
Author: salaboy21
Date: 2010-11-05 20:02:23 -0400 (Fri, 05 Nov 2010)
New Revision: 35888
Modified:
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java
Log:
JBRULES-2772: Drools Grid Impl2 remoting features (socket)
- NodeTests now also exercises insert and fireAllRules
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -72,8 +72,7 @@
localSessionContext.set( "kresults_" + cmd.getName(),
localKresults );
- //for ( GenericCommand cmd : commands ) {
- // evaluate the commands
+
Object result = command.execute( localSessionContext );
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.UUID;
import org.drools.KnowledgeBase;
+import org.drools.command.KnowledgeBaseAddKnowledgePackagesCommand;
import org.drools.command.KnowledgeContextResolveFromContextCommand;
import org.drools.command.NewStatefulKnowledgeSessionCommand;
import org.drools.command.SetVariableCommand;
@@ -36,6 +37,7 @@
import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CollectionClient;
import org.drools.grid.io.impl.CommandImpl;
import org.drools.runtime.Environment;
import org.drools.runtime.KnowledgeSessionConfiguration;
@@ -59,7 +61,22 @@
}
public void addKnowledgePackages(Collection<KnowledgePackage> kpackages) {
- throw new UnsupportedOperationException("Not supported yet.");
+ String kuilderInstanceId = ((CollectionClient<KnowledgePackage>) kpackages).getParentInstanceId();
+ String kresultsId = "kresults_" + this.cm.toString();
+ String localId = UUID.randomUUID().toString();
+
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBaseAddKnowledgePackagesCommand(),
+ kuilderInstanceId,
+ this.instanceId,
+ null,
+ kresultsId )}));
+
+ sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+
}
public Collection<KnowledgePackage> getKnowledgePackages() {
@@ -133,37 +150,7 @@
this.cm );
-// String kresultsId = "kresults_" + this.messageSession.getSessionId();
-//
-// String localId = UUID.randomUUID().toString();
-//
-// Message msg = new Message( this.messageSession.getSessionId(),
-// this.messageSession.counter.incrementAndGet(),
-// false,
-// new SetVariableCommand( "__TEMP__",
-// localId,
-// new KnowledgeContextResolveFromContextCommand( new NewStatefulKnowledgeSessionCommand( conf ),
-// null,
-// this.instanceId,
-// null,
-// kresultsId ) ) );
-//
-// try {
-// this.connector.connect();
-// Object object = this.connector.write( msg ).getPayload();
-//
-// // if (!(object instanceof FinishedCommand)) {
-// // throw new RuntimeException("Response was not correctly ended");
-// // }
-// this.connector.disconnect();
-// } catch ( Exception e ) {
-// throw new RuntimeException( "Unable to execute message",
-// e );
-// }
-//
-// return new StatefulKnowledgeSessionRemoteClient( localId,
-// this.connector,
-// this.messageSession );
+
}
public StatefulKnowledgeSession newStatefulKnowledgeSession() {
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -78,7 +78,7 @@
this.gsd.getServiceInterface().getName(),
cmd);
- return new KnowledgeBuilderRemoteClient( localId, this.cm );
+ return new KnowledgeBuilderRemoteClient( localId,this.gsd, this.cm );
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,15 +17,32 @@
package org.drools.grid.remote;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
import org.drools.KnowledgeBase;
import org.drools.builder.KnowledgeBuilder;
import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.ResourceConfiguration;
import org.drools.builder.ResourceType;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.builder.KnowledgeBuilderAddCommand;
+import org.drools.command.builder.KnowledgeBuilderGetErrorsCommand;
+import org.drools.command.impl.GenericCommand;
+import org.drools.command.runtime.BatchExecutionCommandImpl;
import org.drools.definition.KnowledgePackage;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CollectionClient;
+import org.drools.grid.io.impl.CommandImpl;
import org.drools.io.Resource;
+import org.drools.runtime.ExecutionResults;
/**
*
@@ -33,26 +50,48 @@
*/
public class KnowledgeBuilderRemoteClient implements KnowledgeBuilder{
- private String localId;
+ private String instanceId;
private ConversationManager cm;
+ private GridServiceDescription gsd;
- public KnowledgeBuilderRemoteClient(String localId, ConversationManager cm) {
- this.localId = localId;
+ public KnowledgeBuilderRemoteClient(String localId, GridServiceDescription gsd, ConversationManager cm) {
+ this.instanceId = localId;
+ this.gsd = gsd;
this.cm = cm;
}
public void add(Resource resource, ResourceType type) {
- throw new UnsupportedOperationException("Not supported yet.");
+ add(resource, type, null);
+
}
public void add(Resource resource, ResourceType type, ResourceConfiguration configuration) {
- throw new UnsupportedOperationException("Not supported yet.");
+
+ String localId = UUID.randomUUID().toString();
+
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBuilderAddCommand( resource,
+ type,
+ configuration ),
+ this.instanceId,
+ null,
+ null,
+ null )}));
+
+ sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+
+
+
+
}
public Collection<KnowledgePackage> getKnowledgePackages() {
- throw new UnsupportedOperationException("Not supported yet.");
+ return new CollectionClient<KnowledgePackage>( this.instanceId );
}
public KnowledgeBase newKnowledgeBase() {
@@ -64,7 +103,69 @@
}
public KnowledgeBuilderErrors getErrors() {
- throw new UnsupportedOperationException("Not supported yet.");
+ String commandId = "kbuilder.getErrors_" + this.gsd.getId();
+ String kresultsId = "kresults_" + this.gsd.getId();
+ String localId = UUID.randomUUID().toString();
+
+
+
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBuilderGetErrorsCommand( ),
+ this.instanceId,
+ null,
+ null,
+ kresultsId ) }));
+
+ Object result = sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+
+ return (KnowledgeBuilderErrors) result;
+
+
}
+
+ public static Object sendMessage(ConversationManager conversationManager,
+ Serializable addr,
+ String id,
+ Object body) {
+ InetSocketAddress[] sockets = null;
+ if (addr instanceof InetSocketAddress[]) {
+ sockets = (InetSocketAddress[]) addr;
+ } else if (addr instanceof InetSocketAddress) {
+ sockets = new InetSocketAddress[1];
+ sockets[0] = (InetSocketAddress) addr;
+ }
+
+
+ BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+ Exception exception = null;
+ for (InetSocketAddress socket : sockets) {
+ try {
+ Conversation conv = conversationManager.startConversation(socket,
+ id);
+ conv.sendMessage(body,
+ handler);
+ exception = null;
+ } catch (Exception e) {
+ exception = e;
+ conversationManager.endConversation();
+ }
+ if (exception == null) {
+ break;
+ }
+ }
+ if (exception != null) {
+ throw new RuntimeException("Unable to send message",
+ exception);
+ }
+ try {
+ return handler.getMessage().getBody();
+ } finally {
+ conversationManager.endConversation();
+ }
+ }
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,15 +17,24 @@
package org.drools.grid.remote;
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import org.drools.KnowledgeBase;
import org.drools.command.Command;
+import org.drools.command.CommandFactory;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.rule.InsertObjectCommand;
import org.drools.event.process.ProcessEventListener;
import org.drools.event.rule.AgendaEventListener;
import org.drools.event.rule.WorkingMemoryEventListener;
import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CommandImpl;
import org.drools.runtime.Calendars;
import org.drools.runtime.Channel;
import org.drools.runtime.Environment;
@@ -70,7 +79,49 @@
}
public int fireAllRules() {
- throw new UnsupportedOperationException("Not supported yet.");
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ Object result = sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+
+
+
+ return (Integer) result;
+
+
+// String commandId = "ksession.fireAllRules" + this.messageSession.getNextId();
+// String kresultsId = "kresults_" + this.messageSession.getSessionId();
+//
+// Message msg = new Message( this.messageSession.getSessionId(),
+// this.messageSession.counter.incrementAndGet(),
+// false,
+// new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( commandId ),
+// null,
+// null,
+// this.instanceId,
+// kresultsId ) );
+// try {
+// this.connector.connect();
+// Object object = this.connector.write( msg ).getPayload();
+//
+// if ( object == null ) {
+// throw new RuntimeException( "Response was not correctly received" );
+// }
+// this.connector.disconnect();
+// //return (Integer) ((ExecutionResults) object).getValue(commandId);
+// return (Integer) object;
+// } catch ( Exception e ) {
+// throw new RuntimeException( "Unable to execute message",
+// e );
+// }
}
public int fireAllRules(int max) {
@@ -90,7 +141,52 @@
}
public <T> T execute(Command<T> command) {
- throw new UnsupportedOperationException("Not supported yet.");
+// String localId = UUID.randomUUID().toString();
+// String commandId = "ksession.execute" + this.gsd.getId();
+// String kresultsId = "kresults_" + this.gsd.getId();
+// CommandImpl cmd = new CommandImpl("execute",
+// Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( command ),
+// null,
+// null,
+// this.instanceId,
+// kresultsId )}));
+//
+// Object result = sendMessage(this.cm,
+// (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+// this.gsd.getServiceInterface().getName(),
+// cmd);
+//
+//
+//
+// return (T) result;
+
+// String commandId = "ksession.execute" + this.messageSession.getNextId();
+// String kresultsId = "kresults_" + this.messageSession.getSessionId();
+//
+// Message msg = new Message( this.messageSession.getSessionId(),
+// this.messageSession.counter.incrementAndGet(),
+// false,
+// new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( commandId,
+// command ),
+// null,
+// null,
+// this.instanceId,
+// kresultsId ) );
+//
+// try {
+// this.connector.connect();
+// Object object = this.connector.write( msg ).getPayload();
+// if ( object == null ) {
+// throw new RuntimeException( "Response was not correctly received" );
+// }
+// this.connector.disconnect();
+// return (ExecutionResults) ((ExecutionResults) object).getValue( commandId );
+// } catch ( Exception e ) {
+// throw new RuntimeException( "Unable to execute message",
+// e );
+// }
+ throw new UnsupportedOperationException("Not supported yet.");
+
}
public <T extends SessionClock> T getSessionClock() {
@@ -174,7 +270,24 @@
}
public FactHandle insert(Object object) {
- throw new UnsupportedOperationException("Not supported yet.");
+
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new InsertObjectCommand( object,
+ true ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ Object result = sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+
+
+
+ return (FactHandle) result;
}
public void retract(FactHandle handle) {
@@ -281,4 +394,45 @@
throw new UnsupportedOperationException("Not supported yet.");
}
+ public static Object sendMessage(ConversationManager conversationManager,
+ Serializable addr,
+ String id,
+ Object body) {
+
+ InetSocketAddress[] sockets = null;
+ if (addr instanceof InetSocketAddress[]) {
+ sockets = (InetSocketAddress[]) addr;
+ } else if (addr instanceof InetSocketAddress) {
+ sockets = new InetSocketAddress[1];
+ sockets[0] = (InetSocketAddress) addr;
+ }
+
+
+ BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+ Exception exception = null;
+ for (InetSocketAddress socket : sockets) {
+ try {
+ Conversation conv = conversationManager.startConversation(socket,
+ id);
+ conv.sendMessage(body,
+ handler);
+ exception = null;
+ } catch (Exception e) {
+ exception = e;
+ conversationManager.endConversation();
+ }
+ if (exception == null) {
+ break;
+ }
+ }
+ if (exception != null) {
+ throw new RuntimeException("Unable to send message",
+ exception);
+ }
+ try {
+ return handler.getMessage().getBody();
+ } finally {
+ conversationManager.endConversation();
+ }
+ }
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java 2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java 2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,6 +17,7 @@
package org.drools.grid;
+import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -24,7 +25,11 @@
import org.drools.KnowledgeBaseFactoryService;
import org.drools.SystemEventListenerFactory;
import org.drools.builder.KnowledgeBuilder;
+import org.drools.builder.KnowledgeBuilderError;
+import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.KnowledgeBuilderFactoryService;
+import org.drools.builder.ResourceType;
+import org.drools.command.runtime.rule.InsertObjectCommand;
import org.drools.grid.impl.GridImpl;
import org.drools.grid.impl.GridNodeLocalConfiguration;
import org.drools.grid.impl.GridNodeSocketConfiguration;
@@ -43,7 +48,9 @@
import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
import org.drools.grid.timer.impl.SchedulerLocalConfiguration;
import org.drools.grid.timer.impl.SchedulerSocketConfiguration;
+import org.drools.io.impl.ByteArrayResource;
import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -57,6 +64,8 @@
*/
public class NodeTests {
+
+
private Map<String, GridServiceDescription> coreServicesMap;
public NodeTests() {
@@ -152,10 +161,30 @@
Assert.assertNotNull(kbuilder);
+ String rule = "package test\n"
+ + "Rule \"test\""
+ + " when"
+ + " then"
+ + " System.out.println(\"Rule Fired!\");"
+ + " end";
+
+ kbuilder.add(new ByteArrayResource(rule.getBytes()), ResourceType.DRL);
+
+ KnowledgeBuilderErrors errors = kbuilder.getErrors();
+ if(errors != null && errors.size() > 0){
+ for(KnowledgeBuilderError error : errors){
+ System.out.println("Error: "+error.getMessage());
+
+ }
+ return;
+ }
+
KnowledgeBase kbase = gnode.get(KnowledgeBaseFactoryService.class).newKnowledgeBase();
Assert.assertNotNull(kbase);
+ kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+
StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
Assert.assertNotNull(session);
@@ -167,11 +196,19 @@
Assert.assertNotNull(gnode);
-
-
+
+ FactHandle handle = session.insert(new MyObject());
+ Assert.assertNotNull(handle);
+
+ int i = session.fireAllRules();
+ Assert.assertEquals(1, i);
+
+
}
+
+
private void configureGrid1(Grid grid, int port){
@@ -227,5 +264,9 @@
}
+ private static class MyObject implements Serializable{
+ public MyObject() {
+ }
+ }
}
\ No newline at end of file
More information about the jboss-svn-commits
mailing list