[jboss-svn-commits] JBL Code SVN: r35888 - in labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src: main/java/org/drools/grid/remote and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Fri Nov 5 20:02:24 EDT 2010


Author: salaboy21
Date: 2010-11-05 20:02:23 -0400 (Fri, 05 Nov 2010)
New Revision: 35888

Modified:
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java
Log:
JBRULES-2772: Drools Grid Impl2 remoting features (socket)
	- NodeTests, now insert and firingAllRules 
            

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/impl/GridNodeServer.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -72,8 +72,7 @@
                                                         localSessionContext.set( "kresults_" + cmd.getName(),
                                                                                  localKresults );
 
-                                                        //for ( GenericCommand cmd : commands ) {
-                                                        // evaluate the commands
+                                                       
 
                                                         Object result = command.execute( localSessionContext );
                                                          

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.UUID;
 import org.drools.KnowledgeBase;
+import org.drools.command.KnowledgeBaseAddKnowledgePackagesCommand;
 import org.drools.command.KnowledgeContextResolveFromContextCommand;
 import org.drools.command.NewStatefulKnowledgeSessionCommand;
 import org.drools.command.SetVariableCommand;
@@ -36,6 +37,7 @@
 import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
 import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CollectionClient;
 import org.drools.grid.io.impl.CommandImpl;
 import org.drools.runtime.Environment;
 import org.drools.runtime.KnowledgeSessionConfiguration;
@@ -59,7 +61,22 @@
     }
 
     public void addKnowledgePackages(Collection<KnowledgePackage> kpackages) {
-        throw new UnsupportedOperationException("Not supported yet.");
+        String kuilderInstanceId = ((CollectionClient<KnowledgePackage>) kpackages).getParentInstanceId();
+        String kresultsId = "kresults_" + this.cm.toString();
+        String localId = UUID.randomUUID().toString();
+        
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBaseAddKnowledgePackagesCommand(),
+                                                                                  kuilderInstanceId,
+                                                                                  this.instanceId,
+                                                                                  null,
+                                                                                  kresultsId )}));
+        
+        sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        
     }
 
     public Collection<KnowledgePackage> getKnowledgePackages() {
@@ -133,37 +150,7 @@
                                                          this.cm );
         
         
-//        String kresultsId = "kresults_" + this.messageSession.getSessionId();
-//
-//        String localId = UUID.randomUUID().toString();
-//
-//        Message msg = new Message( this.messageSession.getSessionId(),
-//                                   this.messageSession.counter.incrementAndGet(),
-//                                   false,
-//                                   new SetVariableCommand( "__TEMP__",
-//                                                           localId,
-//                                                           new KnowledgeContextResolveFromContextCommand( new NewStatefulKnowledgeSessionCommand( conf ),
-//                                                                                                          null,
-//                                                                                                          this.instanceId,
-//                                                                                                          null,
-//                                                                                                          kresultsId ) ) );
-//
-//        try {
-//            this.connector.connect();
-//            Object object = this.connector.write( msg ).getPayload();
-//
-//            //            if (!(object instanceof FinishedCommand)) {
-//            //                throw new RuntimeException("Response was not correctly ended");
-//            //            }
-//            this.connector.disconnect();
-//        } catch ( Exception e ) {
-//            throw new RuntimeException( "Unable to execute message",
-//                                        e );
-//        }
-//
-//        return new StatefulKnowledgeSessionRemoteClient( localId,
-//                                                         this.connector,
-//                                                         this.messageSession );
+
     }
 
     public StatefulKnowledgeSession newStatefulKnowledgeSession() {

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -78,7 +78,7 @@
                 this.gsd.getServiceInterface().getName(),
                 cmd);
         
-        return new KnowledgeBuilderRemoteClient( localId, this.cm );
+        return new KnowledgeBuilderRemoteClient( localId,this.gsd, this.cm );
 
     }
 

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,15 +17,32 @@
 
 package org.drools.grid.remote;
 
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
 import org.drools.KnowledgeBase;
 import org.drools.builder.KnowledgeBuilder;
 import org.drools.builder.KnowledgeBuilderErrors;
 import org.drools.builder.ResourceConfiguration;
 import org.drools.builder.ResourceType;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.builder.KnowledgeBuilderAddCommand;
+import org.drools.command.builder.KnowledgeBuilderGetErrorsCommand;
+import org.drools.command.impl.GenericCommand;
+import org.drools.command.runtime.BatchExecutionCommandImpl;
 import org.drools.definition.KnowledgePackage;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CollectionClient;
+import org.drools.grid.io.impl.CommandImpl;
 import org.drools.io.Resource;
+import org.drools.runtime.ExecutionResults;
 
 /**
  *
@@ -33,26 +50,48 @@
  */
 public class KnowledgeBuilderRemoteClient implements KnowledgeBuilder{
 
-    private String localId;
+    private String instanceId;
     private ConversationManager cm;
+    private GridServiceDescription gsd;
     
-    public KnowledgeBuilderRemoteClient(String localId, ConversationManager cm) {
-        this.localId = localId;
+    public KnowledgeBuilderRemoteClient(String localId, GridServiceDescription gsd, ConversationManager cm) {
+        this.instanceId = localId;
+        this.gsd = gsd;
         this.cm = cm;
     }
 
     
     
     public void add(Resource resource, ResourceType type) {
-        throw new UnsupportedOperationException("Not supported yet.");
+       add(resource, type, null);
+        
     }
 
     public void add(Resource resource, ResourceType type, ResourceConfiguration configuration) {
-        throw new UnsupportedOperationException("Not supported yet.");
+         
+        String localId = UUID.randomUUID().toString();
+        
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBuilderAddCommand( resource,
+                                                                                                                  type,
+                                                                                                                  configuration ),
+                                                                                            this.instanceId,
+                                                                                            null,
+                                                                                            null,
+                                                                                            null )}));
+        
+        sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        
+       
+        
+
     }
 
     public Collection<KnowledgePackage> getKnowledgePackages() {
-        throw new UnsupportedOperationException("Not supported yet.");
+        return new CollectionClient<KnowledgePackage>( this.instanceId );
     }
 
     public KnowledgeBase newKnowledgeBase() {
@@ -64,7 +103,69 @@
     }
 
     public KnowledgeBuilderErrors getErrors() {
-        throw new UnsupportedOperationException("Not supported yet.");
+        String commandId = "kbuilder.getErrors_" + this.gsd.getId();
+        String kresultsId = "kresults_" + this.gsd.getId();
+        String localId = UUID.randomUUID().toString();
+        
+        
+        
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new KnowledgeBuilderGetErrorsCommand( ),
+                                                                                  this.instanceId,
+                                                                                  null,
+                                                                                  null,
+                                                                                  kresultsId ) }));
+        
+        Object result = sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        
+         return (KnowledgeBuilderErrors) result;
+
+
     }
+    
+     public static Object sendMessage(ConversationManager conversationManager,
+            Serializable addr,
+            String id,
+            Object body) {
 
+        InetSocketAddress[] sockets = null;
+        if (addr instanceof InetSocketAddress[]) {
+            sockets = (InetSocketAddress[]) addr;
+        } else if (addr instanceof InetSocketAddress) {
+            sockets = new InetSocketAddress[1];
+            sockets[0] = (InetSocketAddress) addr;
+        }
+
+
+        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+        Exception exception = null;
+        for (InetSocketAddress socket : sockets) {
+            try {
+                Conversation conv = conversationManager.startConversation(socket,
+                        id);
+                conv.sendMessage(body,
+                        handler);
+                exception = null;
+            } catch (Exception e) {
+                exception = e;
+                conversationManager.endConversation();
+            }
+            if (exception == null) {
+                break;
+            }
+        }
+        if (exception != null) {
+            throw new RuntimeException("Unable to send message",
+                    exception);
+        }
+        try {
+            return handler.getMessage().getBody();
+        } finally {
+            conversationManager.endConversation();
+        }
+    }
+
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,15 +17,24 @@
 
 package org.drools.grid.remote;
 
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import org.drools.KnowledgeBase;
 import org.drools.command.Command;
+import org.drools.command.CommandFactory;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.rule.InsertObjectCommand;
 import org.drools.event.process.ProcessEventListener;
 import org.drools.event.rule.AgendaEventListener;
 import org.drools.event.rule.WorkingMemoryEventListener;
 import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CommandImpl;
 import org.drools.runtime.Calendars;
 import org.drools.runtime.Channel;
 import org.drools.runtime.Environment;
@@ -70,7 +79,49 @@
     }
 
     public int fireAllRules() {
-        throw new UnsupportedOperationException("Not supported yet.");
+         String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        Object result = sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        
+        
+        
+        return (Integer) result;
+        
+        
+//         String commandId = "ksession.fireAllRules" + this.messageSession.getNextId();
+//        String kresultsId = "kresults_" + this.messageSession.getSessionId();
+//
+//        Message msg = new Message( this.messageSession.getSessionId(),
+//                                   this.messageSession.counter.incrementAndGet(),
+//                                   false,
+//                                   new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( commandId ),
+//                                                                                  null,
+//                                                                                  null,
+//                                                                                  this.instanceId,
+//                                                                                  kresultsId ) );
+//        try {
+//            this.connector.connect();
+//            Object object = this.connector.write( msg ).getPayload();
+//
+//            if ( object == null ) {
+//                throw new RuntimeException( "Response was not correctly received" );
+//            }
+//            this.connector.disconnect();
+//            //return (Integer) ((ExecutionResults) object).getValue(commandId);
+//            return (Integer) object;
+//        } catch ( Exception e ) {
+//            throw new RuntimeException( "Unable to execute message",
+//                                        e );
+//        }
     }
 
     public int fireAllRules(int max) {
@@ -90,7 +141,52 @@
     }
 
     public <T> T execute(Command<T> command) {
-        throw new UnsupportedOperationException("Not supported yet.");
+//        String localId = UUID.randomUUID().toString();
+//        String commandId = "ksession.execute" + this.gsd.getId();
+//        String kresultsId = "kresults_" + this.gsd.getId();
+//        CommandImpl cmd = new CommandImpl("execute",
+//                Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( command ),
+//                                                                                  null,
+//                                                                                  null,
+//                                                                                  this.instanceId,
+//                                                                                  kresultsId )}));
+//        
+//        Object result = sendMessage(this.cm,
+//                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+//                this.gsd.getServiceInterface().getName(),
+//                cmd);
+//        
+//        
+//        
+//        return (T) result;
+        
+//        String commandId = "ksession.execute" + this.messageSession.getNextId();
+//        String kresultsId = "kresults_" + this.messageSession.getSessionId();
+//
+//        Message msg = new Message( this.messageSession.getSessionId(),
+//                                   this.messageSession.counter.incrementAndGet(),
+//                                   false,
+//                                   new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( commandId,
+//                                                                                                      command ),
+//                                                                                  null,
+//                                                                                  null,
+//                                                                                  this.instanceId,
+//                                                                                  kresultsId ) );
+//
+//        try {
+//            this.connector.connect();
+//            Object object = this.connector.write( msg ).getPayload();
+//            if ( object == null ) {
+//                throw new RuntimeException( "Response was not correctly received" );
+//            }
+//            this.connector.disconnect();
+//            return (ExecutionResults) ((ExecutionResults) object).getValue( commandId );
+//        } catch ( Exception e ) {
+//            throw new RuntimeException( "Unable to execute message",
+//                                        e );
+//        }
+    throw new UnsupportedOperationException("Not supported yet.");    
+        
     }
 
     public <T extends SessionClock> T getSessionClock() {
@@ -174,7 +270,24 @@
     }
 
     public FactHandle insert(Object object) {
-        throw new UnsupportedOperationException("Not supported yet.");
+        
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new InsertObjectCommand( object,
+                                                                                                           true ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        Object result = sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        
+        
+        
+        return (FactHandle) result;
     }
 
     public void retract(FactHandle handle) {
@@ -281,4 +394,45 @@
         throw new UnsupportedOperationException("Not supported yet.");
     }
 
+    public static Object sendMessage(ConversationManager conversationManager,
+            Serializable addr,
+            String id,
+            Object body) {
+
+        InetSocketAddress[] sockets = null;
+        if (addr instanceof InetSocketAddress[]) {
+            sockets = (InetSocketAddress[]) addr;
+        } else if (addr instanceof InetSocketAddress) {
+            sockets = new InetSocketAddress[1];
+            sockets[0] = (InetSocketAddress) addr;
+        }
+
+
+        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+        Exception exception = null;
+        for (InetSocketAddress socket : sockets) {
+            try {
+                Conversation conv = conversationManager.startConversation(socket,
+                        id);
+                conv.sendMessage(body,
+                        handler);
+                exception = null;
+            } catch (Exception e) {
+                exception = e;
+                conversationManager.endConversation();
+            }
+            if (exception == null) {
+                break;
+            }
+        }
+        if (exception != null) {
+            throw new RuntimeException("Unable to send message",
+                    exception);
+        }
+        try {
+            return handler.getMessage().getBody();
+        } finally {
+            conversationManager.endConversation();
+        }
+    }
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java	2010-11-05 20:59:36 UTC (rev 35887)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl2/src/test/java/org/drools/grid/NodeTests.java	2010-11-06 00:02:23 UTC (rev 35888)
@@ -17,6 +17,7 @@
 
 package org.drools.grid;
 
+import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -24,7 +25,11 @@
 import org.drools.KnowledgeBaseFactoryService;
 import org.drools.SystemEventListenerFactory;
 import org.drools.builder.KnowledgeBuilder;
+import org.drools.builder.KnowledgeBuilderError;
+import org.drools.builder.KnowledgeBuilderErrors;
 import org.drools.builder.KnowledgeBuilderFactoryService;
+import org.drools.builder.ResourceType;
+import org.drools.command.runtime.rule.InsertObjectCommand;
 import org.drools.grid.impl.GridImpl;
 import org.drools.grid.impl.GridNodeLocalConfiguration;
 import org.drools.grid.impl.GridNodeSocketConfiguration;
@@ -43,7 +48,9 @@
 import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
 import org.drools.grid.timer.impl.SchedulerLocalConfiguration;
 import org.drools.grid.timer.impl.SchedulerSocketConfiguration;
+import org.drools.io.impl.ByteArrayResource;
 import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -57,6 +64,8 @@
  */
 public class NodeTests {
 
+    
+
     private Map<String, GridServiceDescription> coreServicesMap;
     
     public NodeTests() {
@@ -152,10 +161,30 @@
          
          Assert.assertNotNull(kbuilder);
          
+         String rule = "package test\n"
+                 + "Rule \"test\""
+                 + "  when"
+                 + "  then"
+                 + "      System.out.println(\"Rule Fired!\");"
+                 + " end";
+         
+         kbuilder.add(new ByteArrayResource(rule.getBytes()), ResourceType.DRL);
+         
+         KnowledgeBuilderErrors errors = kbuilder.getErrors();
+         if(errors != null && errors.size() > 0){
+             for(KnowledgeBuilderError error : errors){
+                 System.out.println("Error: "+error.getMessage());
+
+             }
+             return;
+         }
+         
          KnowledgeBase kbase = gnode.get(KnowledgeBaseFactoryService.class).newKnowledgeBase();
          
          Assert.assertNotNull(kbase);
          
+         kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+         
          StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
          
          Assert.assertNotNull(session);
@@ -167,11 +196,19 @@
          
          
          Assert.assertNotNull(gnode);
-                 
-                 
+         
+         FactHandle handle = session.insert(new MyObject());
+         Assert.assertNotNull(handle);
+         
+         int i = session.fireAllRules();
+         Assert.assertEquals(1, i);        
+        
+         
      
      }
      
+      
+      
      
      private void configureGrid1(Grid grid, int port){
     
@@ -227,5 +264,9 @@
     
     }
      
+    private static class MyObject implements Serializable{
 
+        public MyObject() {
+        }
+    }
 }
\ No newline at end of file



More information about the jboss-svn-commits mailing list