[jboss-svn-commits] JBL Code SVN: r35938 - in labs/jbossrules/trunk/drools-grid/drools-grid-impl/src: main/java/org/drools/grid/remote/command and 1 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Nov 10 11:41:48 EST 2010


Author: salaboy21
Date: 2010-11-10 11:41:47 -0500 (Wed, 10 Nov 2010)
New Revision: 35938

Added:
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java
Modified:
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java
Log:
JBRULES-2772: Drools Grid Impl2 remoting features (socket)
	- NodeTests, Add ConversationUtil with send message
	- Adding more remoting implementations

Added: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.remote;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
+import org.drools.grid.io.ConversationManager;
+
+/**
+ *
+ * @author salaboy
+ */
+public class ConversationUtil {
+    public static Object sendMessage(ConversationManager conversationManager,
+                                     Serializable addr,
+                                     String id,
+                                     Object body) {
+
+        InetSocketAddress[] sockets = null;
+        if ( addr instanceof InetSocketAddress[] ) {
+            sockets = (InetSocketAddress[]) addr;
+        } else if ( addr instanceof InetSocketAddress ) {
+            sockets = new InetSocketAddress[ 1 ];
+            sockets[0] = (InetSocketAddress) addr;
+        }
+
+        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+        Exception exception = null;
+        for ( InetSocketAddress socket : sockets ) {
+            try {
+                Conversation conv = conversationManager.startConversation( socket,
+                                                                           id );
+                conv.sendMessage( body,
+                                  handler );
+                exception = null;
+            } catch ( Exception e ) {
+                exception = e;
+                conversationManager.endConversation();
+            }
+            if ( exception == null ) {
+                break;
+            }
+        }
+        if ( exception != null ) {
+            throw new RuntimeException( "Unable to send message",
+                                        exception );
+        }
+        try {
+            return handler.getMessage().getBody();
+        } finally {
+            conversationManager.endConversation();
+        }
+    }
+}

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -41,7 +41,8 @@
     private GridServiceDescription    gsd;
     private final Map<String, Object> localContext    = new ConcurrentHashMap<String, Object>();
     private final ServiceRegistry     serviceRegistry = ServiceRegistryImpl.getInstance();
-
+    private MinaConnector connector = new MinaConnector();
+    
     public GridNodeRemoteClient(GridServiceDescription gsd) {
         this.gsd = gsd;
         init( this.localContext );
@@ -73,7 +74,7 @@
 
     public void init(Object context) {
 
-        MinaConnector connector = new MinaConnector();
+        
         ConversationManager cm = new ConversationManagerImpl( this.gsd.getId(),
                                                               connector,
                                                               SystemEventListenerFactory.getSystemEventListener() );
@@ -87,7 +88,7 @@
     }
 
     public void dispose() {
-        throw new UnsupportedOperationException( "Not supported yet." );
+         connector.close();
     }
 
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -78,7 +78,7 @@
                                                                                                 localId,
                                                                                                 new NewKnowledgeBaseCommand( null ) ) } ) );
 
-        sendMessage( this.cm,
+        ConversationUtil.sendMessage( this.cm,
                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                      this.gsd.getId(),
                      cmd );
@@ -106,44 +106,5 @@
         throw new UnsupportedOperationException( "Not supported yet." );
     }
 
-    public static Object sendMessage(ConversationManager conversationManager,
-                                     Serializable addr,
-                                     String id,
-                                     Object body) {
-
-        InetSocketAddress[] sockets = null;
-        if ( addr instanceof InetSocketAddress[] ) {
-            sockets = (InetSocketAddress[]) addr;
-        } else if ( addr instanceof InetSocketAddress ) {
-            sockets = new InetSocketAddress[ 1 ];
-            sockets[0] = (InetSocketAddress) addr;
-        }
-
-        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
-        Exception exception = null;
-        for ( InetSocketAddress socket : sockets ) {
-            try {
-                Conversation conv = conversationManager.startConversation( socket,
-                                                                           id );
-                conv.sendMessage( body,
-                                  handler );
-                exception = null;
-            } catch ( Exception e ) {
-                exception = e;
-                conversationManager.endConversation();
-            }
-            if ( exception == null ) {
-                break;
-            }
-        }
-        if ( exception != null ) {
-            throw new RuntimeException( "Unable to send message",
-                                        exception );
-        }
-        try {
-            return handler.getMessage().getBody();
-        } finally {
-            conversationManager.endConversation();
-        }
-    }
+   
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -77,7 +77,7 @@
                                                                                                                        null,
                                                                                                                        kresultsId ) } ) );
 
-        sendMessage( this.cm,
+        ConversationUtil.sendMessage( this.cm,
                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                      this.gsd.getId(),
                      cmd );
@@ -152,7 +152,7 @@
                                                                                                                                                null,
                                                                                                                                                kresultsId ) ) } ) );
 
-        sendMessage( this.cm,
+        ConversationUtil.sendMessage( this.cm,
                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                      this.gsd.getId(),
                      cmd );
@@ -192,45 +192,5 @@
         throw new UnsupportedOperationException( "Not supported yet." );
     }
 
-    public static Object sendMessage(ConversationManager conversationManager,
-                                     Serializable addr,
-                                     String id,
-                                     Object body) {
-
-        InetSocketAddress[] sockets = null;
-        if ( addr instanceof InetSocketAddress[] ) {
-            sockets = (InetSocketAddress[]) addr;
-        } else if ( addr instanceof InetSocketAddress ) {
-            sockets = new InetSocketAddress[ 1 ];
-            sockets[0] = (InetSocketAddress) addr;
-        }
-
-        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
-        Exception exception = null;
-        for ( InetSocketAddress socket : sockets ) {
-            try {
-                Conversation conv = conversationManager.startConversation( socket,
-                                                                           id );
-                conv.sendMessage( body,
-                                  handler );
-                exception = null;
-            } catch ( Exception e ) {
-                exception = e;
-                conversationManager.endConversation();
-            }
-            if ( exception == null ) {
-                break;
-            }
-        }
-        if ( exception != null ) {
-            throw new RuntimeException( "Unable to send message",
-                                        exception );
-        }
-        try {
-            return handler.getMessage().getBody();
-        } finally {
-            conversationManager.endConversation();
-        }
-    }
-
+   
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,7 +17,6 @@
 package org.drools.grid.remote;
 
 import com.sun.tools.xjc.Options;
-import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Properties;
@@ -32,8 +31,6 @@
 import org.drools.command.builder.NewKnowledgeBuilderCommand;
 import org.drools.grid.GridNode;
 import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
 import org.drools.grid.io.impl.CommandImpl;
 
@@ -76,7 +73,7 @@
                                                                                                 localId,
                                                                                                 new NewKnowledgeBuilderCommand( null ) ) } ) );
 
-        sendMessage( this.cm,
+        ConversationUtil.sendMessage( this.cm,
                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                      this.gsd.getId(),
                      cmd );
@@ -105,44 +102,5 @@
         throw new UnsupportedOperationException( "Not supported yet." );
     }
 
-    public static Object sendMessage(ConversationManager conversationManager,
-                                     Serializable addr,
-                                     String recipientId,
-                                     Object body) {
-
-        InetSocketAddress[] sockets = null;
-        if ( addr instanceof InetSocketAddress[] ) {
-            sockets = (InetSocketAddress[]) addr;
-        } else if ( addr instanceof InetSocketAddress ) {
-            sockets = new InetSocketAddress[ 1 ];
-            sockets[0] = (InetSocketAddress) addr;
-        }
-
-        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
-        Exception exception = null;
-        for ( InetSocketAddress socket : sockets ) {
-            try {
-                Conversation conv = conversationManager.startConversation( socket,
-                                                                           recipientId );
-                conv.sendMessage( body,
-                                  handler );
-                exception = null;
-            } catch ( Exception e ) {
-                exception = e;
-                conversationManager.endConversation();
-            }
-            if ( exception == null ) {
-                break;
-            }
-        }
-        if ( exception != null ) {
-            throw new RuntimeException( "Unable to send message",
-                                        exception );
-        }
-        try {
-            return handler.getMessage().getBody();
-        } finally {
-            conversationManager.endConversation();
-        }
-    }
+    
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,12 +17,9 @@
 
 package org.drools.grid.remote;
 
-import java.io.Serializable;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import java.util.UUID;
 import org.drools.KnowledgeBase;
 import org.drools.builder.KnowledgeBuilder;
@@ -32,18 +29,13 @@
 import org.drools.command.KnowledgeContextResolveFromContextCommand;
 import org.drools.command.builder.KnowledgeBuilderAddCommand;
 import org.drools.command.builder.KnowledgeBuilderGetErrorsCommand;
-import org.drools.command.impl.GenericCommand;
-import org.drools.command.runtime.BatchExecutionCommandImpl;
 import org.drools.definition.KnowledgePackage;
 import org.drools.grid.GridNode;
 import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
 import org.drools.grid.io.impl.CollectionClient;
 import org.drools.grid.io.impl.CommandImpl;
 import org.drools.io.Resource;
-import org.drools.runtime.ExecutionResults;
 
 /**
  *
@@ -88,7 +80,7 @@
                                                                                                                        null,
                                                                                                                        null ) } ) );
 
-        sendMessage( this.cm,
+        ConversationUtil.sendMessage( this.cm,
                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                      this.gsd.getId(),
                      cmd );
@@ -119,7 +111,7 @@
                                                                                                                        null,
                                                                                                                        kresultsId ) } ) );
 
-        Object result = sendMessage( this.cm,
+        Object result = ConversationUtil.sendMessage( this.cm,
                                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                                      this.gsd.getId(),
                                      cmd );
@@ -127,46 +119,5 @@
         return (KnowledgeBuilderErrors) result;
 
     }
-
-    public static Object sendMessage(ConversationManager conversationManager,
-                                     Serializable addr,
-                                     String id,
-                                     Object body) {
-
-        InetSocketAddress[] sockets = null;
-        if ( addr instanceof InetSocketAddress[] ) {
-            sockets = (InetSocketAddress[]) addr;
-        } else if ( addr instanceof InetSocketAddress ) {
-            sockets = new InetSocketAddress[ 1 ];
-            sockets[0] = (InetSocketAddress) addr;
-        }
-
-        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
-        Exception exception = null;
-        for ( InetSocketAddress socket : sockets ) {
-            try {
-                Conversation conv = conversationManager.startConversation( socket,
-                                                                           id );
-                conv.sendMessage( body,
-                                  handler );
-                exception = null;
-            } catch ( Exception e ) {
-                exception = e;
-                conversationManager.endConversation();
-            }
-            if ( exception == null ) {
-                break;
-            }
-        }
-        if ( exception != null ) {
-            throw new RuntimeException( "Unable to send message",
-                                        exception );
-        }
-        try {
-            return handler.getMessage().getBody();
-        } finally {
-            conversationManager.endConversation();
-        }
-    }
-
+ 
 }

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,7 +17,6 @@
 
 package org.drools.grid.remote;
 
-import java.io.Serializable;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,16 +25,19 @@
 import org.drools.command.Command;
 import org.drools.command.CommandFactory;
 import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.process.SignalEventCommand;
+import org.drools.command.runtime.process.StartProcessCommand;
+import org.drools.command.runtime.rule.GetFactHandleCommand;
 import org.drools.command.runtime.rule.InsertObjectCommand;
+import org.drools.command.runtime.rule.UpdateCommand;
 import org.drools.event.process.ProcessEventListener;
 import org.drools.event.rule.AgendaEventListener;
 import org.drools.event.rule.WorkingMemoryEventListener;
 import org.drools.grid.GridNode;
 import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
 import org.drools.grid.io.ConversationManager;
 import org.drools.grid.io.impl.CommandImpl;
+import org.drools.grid.remote.command.GetWorkingMemoryEntryPointRemoteCommand;
 import org.drools.runtime.Calendars;
 import org.drools.runtime.Channel;
 import org.drools.runtime.Environment;
@@ -91,38 +93,14 @@
                                                                                                                        this.instanceId,
                                                                                                                        kresultsId ) } ) );
 
-        Object result = sendMessage( this.cm,
+        Object result = ConversationUtil.sendMessage( this.cm,
                                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                                      this.gsd.getId(),
                                      cmd );
 
         return (Integer) result;
 
-        //         String commandId = "ksession.fireAllRules" + this.messageSession.getNextId();
-        //        String kresultsId = "kresults_" + this.messageSession.getSessionId();
-        //
-        //        Message msg = new Message( this.messageSession.getSessionId(),
-        //                                   this.messageSession.counter.incrementAndGet(),
-        //                                   false,
-        //                                   new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( commandId ),
-        //                                                                                  null,
-        //                                                                                  null,
-        //                                                                                  this.instanceId,
-        //                                                                                  kresultsId ) );
-        //        try {
-        //            this.connector.connect();
-        //            Object object = this.connector.write( msg ).getPayload();
-        //
-        //            if ( object == null ) {
-        //                throw new RuntimeException( "Response was not correctly received" );
-        //            }
-        //            this.connector.disconnect();
-        //            //return (Integer) ((ExecutionResults) object).getValue(commandId);
-        //            return (Integer) object;
-        //        } catch ( Exception e ) {
-        //            throw new RuntimeException( "Unable to execute message",
-        //                                        e );
-        //        }
+       
     }
 
     public int fireAllRules(int max) {
@@ -142,50 +120,7 @@
     }
 
     public <T> T execute(Command<T> command) {
-        //        String localId = UUID.randomUUID().toString();
-        //        String commandId = "ksession.execute" + this.gsd.getId();
-        //        String kresultsId = "kresults_" + this.gsd.getId();
-        //        CommandImpl cmd = new CommandImpl("execute",
-        //                Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( command ),
-        //                                                                                  null,
-        //                                                                                  null,
-        //                                                                                  this.instanceId,
-        //                                                                                  kresultsId )}));
-        //        
-        //        Object result = sendMessage(this.cm,
-        //                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
-        //                this.gsd.getServiceInterface().getName(),
-        //                cmd);
-        //        
-        //        
-        //        
-        //        return (T) result;
-
-        //        String commandId = "ksession.execute" + this.messageSession.getNextId();
-        //        String kresultsId = "kresults_" + this.messageSession.getSessionId();
-        //
-        //        Message msg = new Message( this.messageSession.getSessionId(),
-        //                                   this.messageSession.counter.incrementAndGet(),
-        //                                   false,
-        //                                   new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( commandId,
-        //                                                                                                      command ),
-        //                                                                                  null,
-        //                                                                                  null,
-        //                                                                                  this.instanceId,
-        //                                                                                  kresultsId ) );
-        //
-        //        try {
-        //            this.connector.connect();
-        //            Object object = this.connector.write( msg ).getPayload();
-        //            if ( object == null ) {
-        //                throw new RuntimeException( "Response was not correctly received" );
-        //            }
-        //            this.connector.disconnect();
-        //            return (ExecutionResults) ((ExecutionResults) object).getValue( commandId );
-        //        } catch ( Exception e ) {
-        //            throw new RuntimeException( "Unable to execute message",
-        //                                        e );
-        //        }
+        
         throw new UnsupportedOperationException( "Not supported yet." );
 
     }
@@ -196,11 +131,38 @@
 
     public void setGlobal(String identifier,
                           Object object) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+           String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newSetGlobal( identifier,
+                                                                                                       object ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
     }
 
     public Object getGlobal(String identifier) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newGetGlobal( identifier ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )})); 
+        
+        Object result = ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
+        
+        
+        
+        return result;
     }
 
     public Globals getGlobals() {
@@ -254,7 +216,26 @@
     }
 
     public WorkingMemoryEntryPoint getWorkingMemoryEntryPoint(String name) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new GetWorkingMemoryEntryPointRemoteCommand( name ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  name,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
+        
+        
+        
+        return new WorkingMemoryEntryPointRemoteClient( this.instanceId, 
+                                                            name,
+                                                            this.gsd,
+                                                            this.cm );
     }
 
     public Collection< ? extends WorkingMemoryEntryPoint> getWorkingMemoryEntryPoints() {
@@ -287,7 +268,7 @@
                                                                                                                        this.instanceId,
                                                                                                                        kresultsId ) } ) );
 
-        Object result = sendMessage( this.cm,
+        Object result = ConversationUtil.sendMessage( this.cm,
                                      (InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
                                      this.gsd.getId(),
                                      cmd );
@@ -296,16 +277,51 @@
     }
 
     public void retract(FactHandle handle) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+         String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newRetract( handle ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
     }
 
     public void update(FactHandle handle,
                        Object object) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new UpdateCommand( handle, object ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
     }
 
     public FactHandle getFactHandle(Object object) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new GetFactHandleCommand( object, true ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        Object result = ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
+        
+        return (FactHandle) result;
     }
 
     public Object getObject(FactHandle factHandle) {
@@ -333,23 +349,60 @@
     }
 
     public ProcessInstance startProcess(String processId) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        return startProcess(processId, null);
     }
 
     public ProcessInstance startProcess(String processId,
                                         Map<String, Object> parameters) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new StartProcessCommand( processId, parameters ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        Object result = ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
+        
+        
+        
+        return (ProcessInstance) result;
     }
 
     public void signalEvent(String type,
                             Object event) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new SignalEventCommand( type, event ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
     }
 
     public void signalEvent(String type,
                             Object event,
                             long processInstanceId) {
-        throw new UnsupportedOperationException( "Not supported yet." );
+        String kresultsId = "kresults_" + this.gsd.getId();
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new SignalEventCommand( type, event ),
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  kresultsId )}));
+        
+        ConversationUtil.sendMessage(this.cm,
+                (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+                this.gsd.getId(),
+                cmd);
     }
 
     public Collection<ProcessInstance> getProcessInstances() {
@@ -404,44 +457,5 @@
         throw new UnsupportedOperationException( "Not supported yet." );
     }
 
-    public static Object sendMessage(ConversationManager conversationManager,
-                                     Serializable addr,
-                                     String id,
-                                     Object body) {
-
-        InetSocketAddress[] sockets = null;
-        if ( addr instanceof InetSocketAddress[] ) {
-            sockets = (InetSocketAddress[]) addr;
-        } else if ( addr instanceof InetSocketAddress ) {
-            sockets = new InetSocketAddress[ 1 ];
-            sockets[0] = (InetSocketAddress) addr;
-        }
-
-        BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
-        Exception exception = null;
-        for ( InetSocketAddress socket : sockets ) {
-            try {
-                Conversation conv = conversationManager.startConversation( socket,
-                                                                           id );
-                conv.sendMessage( body,
-                                  handler );
-                exception = null;
-            } catch ( Exception e ) {
-                exception = e;
-                conversationManager.endConversation();
-            }
-            if ( exception == null ) {
-                break;
-            }
-        }
-        if ( exception != null ) {
-            throw new RuntimeException( "Unable to send message",
-                                        exception );
-        }
-        try {
-            return handler.getMessage().getBody();
-        } finally {
-            conversationManager.endConversation();
-        }
-    }
+   
 }

Copied: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java (from rev 35787, labs/jbossrules/trunk/drools-grid/drools-grid-remote-api/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java)
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,137 @@
+/*
+ *  Copyright 2010 salaboy.
+ * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ * 
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  under the License.
+ */
+
+package org.drools.grid.remote;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.drools.FactException;
+import org.drools.FactHandle;
+import org.drools.WorkingMemoryEntryPoint;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.rule.InsertObjectCommand;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
+import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CommandImpl;
+import org.drools.grid.service.directory.Address;
+import org.drools.runtime.ObjectFilter;
+
+/**
+ *
+ * @author salaboy
+ */
+public class WorkingMemoryEntryPointRemoteClient
+    implements
+    WorkingMemoryEntryPoint {
+
+    private String               instanceId;
+    private String               name;
+    private GridServiceDescription gsd;
+    private ConversationManager cm;
+    
+
+    public WorkingMemoryEntryPointRemoteClient(String instanceId, String name,
+                                               GridServiceDescription gsd,
+                                               ConversationManager cm) {
+        this.instanceId = instanceId;
+        this.name = name;
+        this.gsd = gsd;
+        this.cm = cm;
+        
+    }
+
+    public FactHandle insert(Object object) throws FactException {
+
+        
+        String kresultsId = "kresults_" + this.gsd.getId();
+        
+        InsertObjectCommand insertCmd = new InsertObjectCommand( object, true );
+        insertCmd.setEntryPoint(name);
+        CommandImpl cmd = new CommandImpl("execute",
+                Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( insertCmd,
+                                                                                  null,
+                                                                                  null,
+                                                                                  this.instanceId,
+                                                                                  this.name,
+                                                                                  kresultsId )}));
+        
+        Object result = ConversationUtil.sendMessage(this.cm,
+                ((InetSocketAddress[])((Address) this.gsd.getAddresses().get("socket")).getObject()),
+                this.gsd.getServiceInterface().getName(),
+                cmd);
+        return ((FactHandle)result);
+       
+        
+    }
+
+    public FactHandle insert(Object object,
+                             boolean dynamic) throws FactException {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public void retract(org.drools.runtime.rule.FactHandle handle) throws FactException {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public void update(org.drools.runtime.rule.FactHandle handle,
+                       Object object) throws FactException {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public WorkingMemoryEntryPoint getWorkingMemoryEntryPoint(String name) {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public String getEntryPointId() {
+        return this.name;
+    }
+
+    public org.drools.runtime.rule.FactHandle getFactHandle(Object object) {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public Object getObject(org.drools.runtime.rule.FactHandle factHandle) {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public Collection<Object> getObjects() {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public Collection<Object> getObjects(ObjectFilter filter) {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public <T extends org.drools.runtime.rule.FactHandle> Collection<T> getFactHandles() {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public <T extends org.drools.runtime.rule.FactHandle> Collection<T> getFactHandles(ObjectFilter filter) {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+
+    public long getFactCount() {
+        throw new UnsupportedOperationException( "Not supported yet." );
+    }
+    
+    
+}

Copied: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java (from rev 35787, labs/jbossrules/trunk/drools-grid/drools-grid-remote-api/src/main/java/org/drools/grid/remote/internal/commands/GetWorkingMemoryEntryPointRemoteCommand.java)
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java	                        (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,55 @@
+/*
+ *  Copyright 2010 salaboy.
+ * 
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ * 
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  under the License.
+ */
+
+package org.drools.grid.remote.command;
+
+/**
+ *
+ * @author salaboy
+ * this class should not exist!
+ */
+
+import org.drools.command.Context;
+import org.drools.command.impl.GenericCommand;
+import org.drools.command.impl.KnowledgeCommandContext;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.WorkingMemoryEntryPoint;
+
+public class GetWorkingMemoryEntryPointRemoteCommand
+    implements
+    GenericCommand<WorkingMemoryEntryPoint> {
+
+    private String name;
+
+    public GetWorkingMemoryEntryPointRemoteCommand(String name) {
+        this.name = name;
+    }
+
+    public WorkingMemoryEntryPoint execute(Context context) {
+        StatefulKnowledgeSession ksession = ((KnowledgeCommandContext) context).getStatefulKnowledgesession();
+        WorkingMemoryEntryPoint ep = ksession.getWorkingMemoryEntryPoint( this.name );
+        context.getContextManager().getDefaultContext().set( this.name,
+                                                             ep );
+        // If I return the command I need to create a serializable version of NamedEntryPoint
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return "session.getWorkingMemoryEntryPoint( " + this.name + " );";
+    }
+}

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java	2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java	2010-11-10 16:41:47 UTC (rev 35938)
@@ -18,7 +18,6 @@
 package org.drools.grid;
 
 import java.io.Serializable;
-import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 import org.drools.KnowledgeBase;
@@ -29,32 +28,19 @@
 import org.drools.builder.KnowledgeBuilderErrors;
 import org.drools.builder.KnowledgeBuilderFactoryService;
 import org.drools.builder.ResourceType;
-import org.drools.command.assertion.AssertEquals;
-import org.drools.command.runtime.rule.InsertObjectCommand;
 import org.drools.grid.conf.GridPeerServiceConfiguration;
-import org.drools.grid.conf.impl.GridNodeLocalConfiguration;
-import org.drools.grid.conf.impl.GridNodeSocketConfiguration;
 import org.drools.grid.conf.impl.GridPeerConfiguration;
 import org.drools.grid.impl.GridImpl;
 import org.drools.grid.impl.MultiplexSocketServerImpl;
 import org.drools.grid.io.impl.MultiplexSocketServiceCongifuration;
-import org.drools.grid.remote.GridNodeRemoteClient;
 import org.drools.grid.remote.mina.MinaAcceptorFactoryService;
-import org.drools.grid.service.directory.Address;
 import org.drools.grid.service.directory.WhitePages;
 import org.drools.grid.service.directory.impl.CoreServicesLookupConfiguration;
-import org.drools.grid.service.directory.impl.GridServiceDescriptionImpl;
 import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
-import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
 import org.drools.grid.timer.impl.CoreServicesSchedulerConfiguration;
-import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
-import org.drools.grid.timer.impl.SchedulerLocalConfiguration;
-import org.drools.grid.timer.impl.SchedulerSocketConfiguration;
 import org.drools.io.impl.ByteArrayResource;
 import org.drools.runtime.StatefulKnowledgeSession;
 import org.drools.runtime.rule.FactHandle;
-import org.drools.time.Scheduler;
-import org.drools.time.SchedulerService;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -152,7 +138,7 @@
         Assert.assertNotNull( kbuilder );
 
         String rule = "package test\n"
-                      + "Rule \"test\""
+                      + "rule \"test\""
                       + "  when"
                       + "  then"
                       + "      System.out.println(\"Rule Fired!\");"
@@ -180,15 +166,96 @@
 
         Assert.assertNotNull( session );
 
-        FactHandle handle = session.insert( new MyObject() );
+        FactHandle handle = session.insert( new MyObject("myObj1") );
         Assert.assertNotNull( handle );
 
         int i = session.fireAllRules();
         Assert.assertEquals( 1,
                              i );
+        
+        remoteN1.dispose(); 
+         grid1.get(SocketService.class).close();
 
     }
+    
+     @Test
+    public void remoteNodeRetractUpdateGlobalsTest() {
+        Grid grid1 = new GridImpl( new HashMap<String, Object>() );
+        configureGrid1( grid1,
+                        8000,
+                        null );
 
+        Grid grid2 = new GridImpl( new HashMap<String, Object>() );
+        configureGrid1( grid2,
+                        -1,
+                        grid1.get( WhitePages.class ) );
+
+        GridNode n1 = grid1.createGridNode( "n1" );
+        grid1.get( SocketService.class ).addService( "n1", 8000, n1 );
+               
+        GridServiceDescription<GridNode> n1Gsd = grid2.get( WhitePages.class ).lookup( "n1" );
+        GridConnection<GridNode> conn = grid2.get( ConnectionFactoryService.class ).createConnection( n1Gsd );
+        GridNode remoteN1 = conn.connect();
+
+        KnowledgeBuilder kbuilder = remoteN1.get( KnowledgeBuilderFactoryService.class ).newKnowledgeBuilder();
+
+        Assert.assertNotNull( kbuilder );
+
+         String rule = "package test\n"
+                 + "import org.drools.grid.NodeTests.MyObject;\n"
+                 + "global MyObject myGlobalObj;\n"
+                 + "rule \"test\""
+                 + "  when"
+                 + "       $o: MyObject()"
+                 + "  then"
+                 + "      System.out.println(\"My Global Object -> \"+myGlobalObj.getName());"
+                 + "      System.out.println(\"Rule Fired! ->\"+$o.getName());"
+                 + " end";
+
+        kbuilder.add( new ByteArrayResource( rule.getBytes() ),
+                      ResourceType.DRL );
+
+        KnowledgeBuilderErrors errors = kbuilder.getErrors();
+        if ( errors != null && errors.size() > 0 ) {
+            for ( KnowledgeBuilderError error : errors ) {
+                System.out.println( "Error: " + error.getMessage() );
+
+            }
+            fail("KnowledgeBase did not build");
+        }
+
+        KnowledgeBase kbase = remoteN1.get( KnowledgeBaseFactoryService.class ).newKnowledgeBase();
+
+        Assert.assertNotNull( kbase );
+
+        kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
+
+        StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
+
+        Assert.assertNotNull( session );
+        session.setGlobal("myGlobalObj", new MyObject("myGlobalObj"));
+
+        FactHandle handle = session.insert( new MyObject("myObj1") );
+        Assert.assertNotNull( handle );
+
+        int fired = session.fireAllRules();
+        Assert.assertEquals( 1,
+                             fired );
+        
+         session.retract(handle);
+         
+         
+         handle = session.insert(new MyObject("myObj2"));
+         
+         session.update(handle, new MyObject("myObj3"));
+         
+         fired = session.fireAllRules();
+         
+         remoteN1.dispose(); 
+         grid1.get(SocketService.class).close();
+
+    }
+
     private void configureGrid1(Grid grid,
                                 int port,
                                 WhitePages wp) {
@@ -228,11 +295,21 @@
 
     }
 
-    private static class MyObject
+    public static class MyObject
         implements
         Serializable {
+        private String name;
+        public MyObject(String name) {
+            this.name = name;
+        }
 
-        public MyObject() {
+        public String getName() {
+            return name;
         }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+        
     }
 }
\ No newline at end of file



More information about the jboss-svn-commits mailing list