[jboss-svn-commits] JBL Code SVN: r34589 - in labs/jbossrules/trunk/drools-grid: drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina and 4 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Sun Aug 8 13:06:48 EDT 2010


Author: salaboy21
Date: 2010-08-08 13:06:46 -0400 (Sun, 08 Aug 2010)
New Revision: 34589

Modified:
   labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java
   labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java
   labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java
   labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java
   labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java
   labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java
   labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java
   labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java
   labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java
Log:
JBRULES-2615: Drools Grid Services remove unnessary dependencies
	- now the test should pass always. Adding some comments + documentation. Now I need to clean the solution

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -21,14 +21,11 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import org.drools.KnowledgeBase;
 import org.drools.grid.ConnectorException;
-import org.drools.grid.ConnectorType;
 import org.drools.grid.DirectoryNodeService;
-import org.drools.grid.GenericConnection;
 import org.drools.grid.GenericNodeConnector;
 import org.drools.grid.NodeConnectionType;
 import org.drools.grid.internal.Message;

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -14,7 +14,6 @@
  *  limitations under the License.
  *  under the License.
  */
-
 package org.drools.grid.remote.directory;
 
 import java.net.InetSocketAddress;
@@ -24,7 +23,10 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
 import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -46,7 +48,8 @@
  *
  * @author salaboy
  */
-public class RemoteMinaDirectoryConnector implements GenericNodeConnector{
+public class RemoteMinaDirectoryConnector implements GenericNodeConnector {
+
     private GridConnection connection;
     private String providerName;
     private SystemEventListener eventListener;
@@ -54,13 +57,11 @@
     private AtomicInteger counter;
     private SocketConnector connector;
     private SocketAddress address;
-
     
 
     public RemoteMinaDirectoryConnector(String providerName, String providerAddress,
-                            Integer providerPort, SystemEventListener systemEventListener) {
-        SocketConnector minaconnector = new NioSocketConnector();
-        minaconnector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+            Integer providerPort, SystemEventListener systemEventListener) {
+
         if (providerName == null) {
             throw new IllegalArgumentException("Name can not be null");
         }
@@ -68,19 +69,19 @@
         this.providerName = providerName;
         this.eventListener = systemEventListener;
         this.address = new InetSocketAddress(providerAddress, providerPort);
-        this.connector = minaconnector;
         this.connection = new GridConnection();
 
     }
 
-
     public void connect() throws ConnectorException {
         if (session != null && session.isConnected()) {
             throw new IllegalStateException("Already connected. Disconnect first.");
         }
 
         try {
-            this.connector.getFilterChain().addLast("codec"+UUID.randomUUID().toString(),
+            this.connector = new NioSocketConnector();
+            this.connector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+            this.connector.getFilterChain().addLast("codec" + UUID.randomUUID().toString(),
                     new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
             
             ConnectFuture future1 = this.connector.connect(this.address);
@@ -92,7 +93,7 @@
             }
             eventListener.info("connected : " + address);
             this.session = future1.getSession();
-            
+
         } catch (Exception e) {
             throw new ConnectorException(e);
         }
@@ -101,21 +102,34 @@
     public void disconnect() throws ConnectorException {
         
         if (session != null && session.isConnected()) {
-            session.close();
-            session.getCloseFuture().join();
+            
+            CloseFuture future = session.getCloseFuture();
+            future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+                public void operationComplete(IoFuture future) {
+                    System.out.println("The remote directory session is now closed");
+                }
+            });
+            session.close(false);
+            
+            future.awaitUninterruptibly();
+
+            connector.dispose();
         }
+        
     }
 
     public String getId() {
-        String hostName = ((InetSocketAddress)this.address).getHostName();
-        int hostPort = ((InetSocketAddress)this.address).getPort();
-        return "Remote:Mina:Directory:"+hostName+":"+hostPort;
+        String hostName = ((InetSocketAddress) this.address).getHostName();
+        int hostPort = ((InetSocketAddress) this.address).getPort();
+        return "Remote:Mina:Directory:" + hostName + ":" + hostPort;
     }
 
     public int getSessionId() {
         return (int) session.getId();
     }
-    public IoSession getSession(){
+
+    public IoSession getSession() {
         return session;
     }
 
@@ -123,7 +137,7 @@
         return this.connection;
     }
 
-    public ConnectorType getConnectorType()  {
+    public ConnectorType getConnectorType() {
         return ConnectorType.REMOTE;
     }
 
@@ -133,7 +147,7 @@
 
     public NodeConnectionType getNodeConnectionType() throws ConnectorException, RemoteException {
         return new RemoteMinaConnectionDirectory();
-        
+
     }
 
     public void write(Message msg, MessageResponseHandler responseHandler) {
@@ -143,8 +157,4 @@
     public AtomicInteger getCounter() {
         return counter;
     }
-
-    
 }
-
-

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -2,11 +2,13 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.rmi.RemoteException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
 import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -41,27 +43,27 @@
             String providerAddress, Integer providerPort,
             SystemEventListener eventListener) {
 
-        SocketConnector minaconnector = new NioSocketConnector();
-        minaconnector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+
         if (name == null) {
             throw new IllegalArgumentException("Name can not be null");
         }
         this.name = name;
         this.counter = new AtomicInteger();
         this.address = new InetSocketAddress(providerAddress, providerPort);
-        this.connector = minaconnector;
+
         this.eventListener = eventListener;
         this.connection = new GridConnection();
     }
 
     public void connect() throws ConnectorException {
         if (session != null && session.isConnected()) {
-            return;
-            //throw new IllegalStateException("Already connected. Disconnect first.");
+            throw new IllegalStateException("Already connected. Disconnect first.");
         }
 
         try {
-            this.connector.getFilterChain().addLast(this.name+"codec",
+            this.connector = new NioSocketConnector();
+            this.connector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+            this.connector.getFilterChain().addLast(this.name + "codec",
                     new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
 
             ConnectFuture future1 = this.connector.connect(this.address);
@@ -74,17 +76,30 @@
             eventListener.info("connected : " + address);
             this.session = future1.getSession();
         } catch (Exception e) {
-             throw new ConnectorException(e);
+            throw new ConnectorException(e);
         }
     }
 
     public void disconnect() throws ConnectorException {
-        this.connector.getFilterChain().clear();
+        
         if (session != null && session.isConnected()) {
+            
+            CloseFuture future = session.getCloseFuture();
+
+            future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+                public void operationComplete(IoFuture future) {
+                    System.out.println("The remote node session is now closed");
+                }
+            });
+
             session.close(false);
-            session.getCloseFuture().join();
+            future.awaitUninterruptibly();
+
+            connector.dispose();
         }
-        //this.connector.dispose();
+
+        
     }
 
     private void addResponseHandler(int id,
@@ -125,13 +140,11 @@
         return "Remote:Mina:Node:" + hostName + ":" + hostPort;
     }
 
-
     public GenericConnection getConnection() {
         return this.connection;
     }
 
-
-    public NodeConnectionType getNodeConnectionType() throws ConnectorException{
+    public NodeConnectionType getNodeConnectionType() throws ConnectorException {
         return new RemoteConnectionNode();
     }
 

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -1,8 +1,8 @@
 package org.drools.grid.services;
 
 import java.rmi.RemoteException;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -23,31 +23,72 @@
 import org.drools.grid.services.strategies.TaskServerInstanceSelectionStrategy;
 
 /**
- * @author salaboy  
+ * @author salaboy 
+ *
+ * This class will represent a Grid Topology where we can execute and manager our
+ * knowledge sessions.
+ * A Grid Topology can be conformed using different types of nodes/resources depending on
+ * your business requirements and architecture.
+ * The grid topology can contains a set of the following type of nodes:
+ *   - Execution Environments: we will use them to host knowledge sessions (runtime)
+ *   - Directory Instances: we will use them to keep track of the current nodes in our topology
+ *   - Task Servers Instance: we will use them to execute and manage Human Tasks for business processes
+ *
+ * From the user perspective the GridTopology will be normally constructed with a HelperClass called: GridTopologyFactory
+ * This helper class will be in charge of reading the GridTopology configuration and based on that
+ * create the GridTopology object with all the nodes registered.
+ *
+ * It's important to understand that the GridTopologyConfiguration will represent a static description/configuration of a
+ * runtime environment. Based on this static description/configuration the GridTopology object will represent a
+ * live status of this runtime environment.
+ *
+ *
+ *
  */
 public class GridTopology {
 
     private String topologyName;
-    private Map<String, ExecutionEnvironment> executionEnvironments = new HashMap<String, ExecutionEnvironment>();
-    private Map<String, String> executionEnvironmentsByConnectorId = new HashMap<String, String>();
-    private Map<String, DirectoryInstance> directories = new HashMap<String, DirectoryInstance>();
-    private Map<String, TaskServerInstance> taskServerInstance = new HashMap<String, TaskServerInstance>();
-
+    private Map<String, ExecutionEnvironment> executionEnvironments = new ConcurrentHashMap<String, ExecutionEnvironment>();
+    private Map<String, String> executionEnvironmentsByConnectorId = new ConcurrentHashMap<String, String>();
+    private Map<String, DirectoryInstance> directoryInstances = new ConcurrentHashMap<String, DirectoryInstance>();
+    private Map<String, TaskServerInstance> taskServerInstances = new ConcurrentHashMap<String, TaskServerInstance>();
     private final ExecutionEnvironmentSelectionStrategy DEFAULT_EXECTUTION_STRATEGY = new ExecutionEnvByPrioritySelectionStrategy();
     private final DirectoryInstanceSelectionStrategy DEFAULT_DIRECTORY_STRATEGY = new DirectoryInstanceByPrioritySelectionStrategy();
     private final TaskServerInstanceSelectionStrategy DEFAULT_TASK_STRATEGY = new TaskServerInstanceByPrioritySelectionStrategy();
-   
 
+
+    /*
+     * Create a new Grid Topology
+     * @param topologyName
+     */
     public GridTopology(String topologyName) {
         this.topologyName = topologyName;
 
     }
 
+    /*
+     * Get the GridTopology name
+     */
     public String getTopologyName() {
         return topologyName;
     }
 
-    //Execution Environments Methods
+    /*
+     * This method will register a new Execution Environment based on the configured Provider.
+     * The provider will contain all the information to be able to establish a connection with it.
+     * The following steps are executed inside this method:
+     *  1) Create the new ExecutionEnvironment object that will represent a remote host for our knowledge sessions.
+     *  2) Each ExecutionEnvironment will have an underlaying connection to support remote/distribtued interactions
+     *  3) for each Execution Environment registered in this topology
+     *     3.1) We need to inject the reference from the newly created Execution Enviroment to the existing ones
+     *     3.2) We need to inject the reference from all the existing Execution Environments to the newly created
+     *  4) for each Directory Instance registered in this topology
+     *     4.1) We need to inject the reference from the newly created Execution Environment in each exisiting Directory
+     *     4.2) We need to inject a reference from each existing directory to the newly created Execution Environment
+     *  5) Add the newly created Execution Environment to the topology maps. We keep to maps for Execution Environments
+     *     to be able to look based on the underlaying connector and based on the defined Execution Environment name.
+     *  6) Register the Execution Environment inside all the currently available Directory Instances
+     */
     public void registerExecutionEnvironment(String name, GenericProvider provider) {
         //Create the executionEnvironment using the provider
 
@@ -65,13 +106,14 @@
             e.getConnector().getConnection().addExecutionNode(connector);
         }
         //We need to add all the other directory connectors inside this connection
-        for (DirectoryInstance d : directories.values()) {
+        for (DirectoryInstance d : directoryInstances.values()) {
             connection.addDirectoryNode(d.getConnector());
             //I need to add this execution node to all the other DI
             d.getConnector().getConnection().addExecutionNode(connector);
         }
         //Adding the env to the local cache
         executionEnvironments.put(name, environment);
+
         try {
             executionEnvironmentsByConnectorId.put(connector.getId(), environment.getName());
         } catch (ConnectorException ex) {
@@ -80,28 +122,59 @@
             Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
         }
         //Register all the Execution Environments into the current directories
-        registerResourceInCurrentDirectories(name, provider.getId());
+        registerResourceInDirectories(name, provider.getId());
 
     }
 
+    /*
+     * This method unregister the Execution Environment from this running instance of the grid topology
+     * based on the name. The following steps are executed in order to unregister
+     * an ExecutionEnvironment from the GridTopology:
+     *  1) Get the ExecutionEnvironment from the executionEnvironmentMap
+     *  2) Get the ExecutionEnvironment connector
+     *     2.1) Remove the ExecutionEnvironment from the executionEnvironmentByConnectorId map
+     *     2.2) Disconnect the connector
+     *  3) Remove the executionEnvironment from the executionEnvironmentMap
+     *  4) Unregister the ExecutionEnvironment from the running Directory Instances
+     *
+     */
     public void unregisterExecutionEnvironment(String name) {
+        ExecutionEnvironment ee = executionEnvironments.get(name);
+        try {
+
+            GenericNodeConnector connector = ee.getConnector();
+            connector.getConnection().dispose();
+            executionEnvironmentsByConnectorId.remove(connector.getId());
+            connector.disconnect();
+        } catch (ConnectorException ex) {
+            Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+        } catch (RemoteException ex) {
+            Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+        }
+
+
         //Remove Execution Environment
         executionEnvironments.remove(name);
         //UnRegister EE from current Directories
-        unregisterResourceInCurrentDirectories(name);
+        unregisterResourceFromDirectories(name);
 
-
     }
-
+    /*
+     * Get Execution Environment by Name
+     * @param name: String Execution Environment Name
+     */
     public ExecutionEnvironment getExecutionEnvironment(String name) {
         return executionEnvironments.get(name);
     }
 
-    public ExecutionEnvironment getExecutionEnvironment(GenericNodeConnector connector){
+    /*
+     * Get ExecutionEnvironment by connector
+     */
+    public ExecutionEnvironment getExecutionEnvironment(GenericNodeConnector connector) {
         ExecutionEnvironment ee = null;
         try {
-             String eeName = executionEnvironmentsByConnectorId.get(connector.getId());
-             ee = executionEnvironments.get(eeName);
+            String eeName = executionEnvironmentsByConnectorId.get(connector.getId());
+            ee = executionEnvironments.get(eeName);
         } catch (ConnectorException ex) {
             Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
         } catch (RemoteException ex) {
@@ -110,122 +183,201 @@
         return ee;
     }
 
+    /*
+     * Get the Best ExecutionEnvironment available based on a ExecutionEnvironmentSelectionStrategy
+     * @param strategy: it's an implementation of the ExecutionEnvironmentSelectionStrategy interface
+     */
     public ExecutionEnvironment getBestExecutionEnvironment(ExecutionEnvironmentSelectionStrategy strategy) {
         return strategy.getBestExecutionEnvironment(executionEnvironments);
     }
 
+    /*
+     * Get the Best ExecutionEnvironment available based on the default ExecutionEnvironmentSelectionStrategy
+     */
     public ExecutionEnvironment getExecutionEnvironment() {
         return DEFAULT_EXECTUTION_STRATEGY.getBestExecutionEnvironment(executionEnvironments);
     }
 
-    // DirectoryInstances Methods
+    /*
+     * This method register a new Directory instance based on the information provided by the GenericProvider
+     * The provider will contain all the information to be able to establish a connection with it.
+     * The following steps are executed inside this method:
+     *  1) Create the new DirectoryInstance object that will represent map that will store information about
+     *     the resources that are currently running in our topology.
+     *  2) Each DirectoryInstance will have an underlaying connection to support remote/distribtued interactions
+     *  3) for each Execution Environment registered in this topology
+     *     3.1) We need to inject the reference from the newly created Directory to the existing ExecutionEnvironments
+     *     3.2) We need to inject the reference from all the existing Execution Environments to the newly created Directory Instance
+     *  4) for each Directory Instance registered in this topology
+     *     4.1) We need to inject the reference from the newly created Directory Instance in each exisiting Directory
+     *     4.2) We need to inject a reference from each existing directory to the newly created DirectoryInstance
+     *  5) Add the newly created Directory Instance to the topology map. We keep a map for Directory Instances
+     *     to be able to lookup based on the Directory Instance name.
+     *  6) Register the Directory Instance inside all the currently available Directory Instances
+     */
     public void registerDirectoryInstance(String name, GenericProvider provider) {
 
         DirectoryInstance directory = DirectoryInstanceFactory.newDirectoryInstance(name, provider);
-
         GenericNodeConnector connector = directory.getConnector();
         GenericConnection connection = connector.getConnection();
-
-
         connection.addDirectoryNode(connector);
 
         for (ExecutionEnvironment e : executionEnvironments.values()) {
+            e.getConnector().getConnection().addDirectoryNode(connector);
             connection.addExecutionNode(e.getConnector());
-            //I need to add this directory instance to all the other EE
-            e.getConnector().getConnection().addDirectoryNode(connector);
         }
-        for (DirectoryInstance d : directories.values()) {
+        for (DirectoryInstance d : directoryInstances.values()) {
             connection.addDirectoryNode(d.getConnector());
-            //I need to add this directory Instance to all the other DI
             d.getConnector().getConnection().addDirectoryNode(connector);
         }
 
+
         
-        directories.put(name, directory);
-        registerResourceInCurrentDirectories(name, provider.getId());
 
+        directoryInstances.put(name, directory);
+
+        registerResourceInDirectories(name, provider.getId());
+        
     }
-
+    /*
+     * Get the Best DirectoryInstance based on a DirectoryInstanceSelectionStrategy
+     * @param strategy it's the strategy used to choose the best DirectoryInstance available
+     */
     public DirectoryInstance getBestDirectoryInstance(DirectoryInstanceSelectionStrategy strategy) {
-        return (DirectoryInstance) strategy.getBestDirectoryInstance(directories);
+        return (DirectoryInstance) strategy.getBestDirectoryInstance(directoryInstances);
     }
 
+    /*
+     * Get the Directory Instance based on a default strategy
+     */
     public DirectoryInstance getDirectoryInstance() {
-        return DEFAULT_DIRECTORY_STRATEGY.getBestDirectoryInstance(directories);
+        return DEFAULT_DIRECTORY_STRATEGY.getBestDirectoryInstance(directoryInstances);
     }
 
+    /*
+     * Get a Directory Instance by Name
+     */
     public DirectoryInstance getDirectoryInstance(String name) {
-        return directories.get(name);
+        return directoryInstances.get(name);
     }
 
-     public void unregisterDirectoryInstance(String name) {
-        //Remove Directory Instance
-        directories.remove(name);
-        //UnRegister Directory from current Directories
-        unregisterResourceInCurrentDirectories(name);
+    /*
+     * Unregister a Directroy Instance from this running GridTopology
+     * This method unregister the Directory Instance from this running instance of the grid topology
+     * based on the name. The following steps are executed in order to unregister
+     * a DirectoryInstance from the GridTopology:
+     *  1) Get the Directory Instance from the directoryInstances Map
+     *  2) Get the DirectoryInstance connector
+     *     2.1) Disconnect the connector
+     *  3) Unregister the DirectoryInstance from the running Directory Instances
+     *  4) Remove the DirectoryInstance from the directoryInstances Map
+     */
+    public void unregisterDirectoryInstance(String name) {
+        DirectoryInstance dir = directoryInstances.get(name);
+        GenericNodeConnector connector = dir.getConnector();
+        
+        try {
+            connector.getConnection().dispose();
+            connector.disconnect();
+        } catch (ConnectorException ex) {
+            Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+        } catch (RemoteException ex) {
+            Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+        }
 
+        unregisterResourceFromDirectories(name);
 
+        directoryInstances.remove(name);
+
     }
 
-
-    // Task Server Instance Methods
-
+     /*
+     * This method will register a new Task Server  based on the configured Provider.
+     * The provider will contain all the information to be able to establish a connection with the Task Server.
+     * The following steps are executed inside this method:
+     *  1) Create the new ExecutionEnvironment object that will represent a remote host for our knowledge sessions.
+     *  2) Each ExecutionEnvironment will have an underlaying connection to support remote/distribtued interactions
+     *  3) for each Execution Environment registered in this topology
+     *     3.1) We need to inject the reference from the newly created Execution Enviroment to the existing ones
+     *     3.2) We need to inject the reference from all the existing Execution Environments to the newly created
+     *  4) for each Directory Instance registered in this topology
+     *     4.1) We need to inject the reference from the newly created Execution Environment in each exisiting Directory
+     *     4.2) We need to inject a reference from each existing directory to the newly created Execution Environment
+     *  5) Add the newly created Execution Environment to the topology maps. We keep to maps for Execution Environments
+     *     to be able to look based on the underlaying connector and based on the defined Execution Environment name.
+     *  6) Register the Execution Environment inside all the currently available Directory Instances
+     */
     public void registerTaskServerInstance(String name, GenericProvider provider) {
 
         TaskServerInstance taskServer = TaskServerInstanceFactory.newTaskServerInstance(name, provider);
         GenericHumanTaskConnector connector = taskServer.getConnector();
         GenericConnection connection = connector.getConnection();
         connection.addHumanTaskNode(connector);
-        
-        taskServerInstance.put(name, taskServer);
-        registerResourceInCurrentDirectories(name, provider.getId());
 
+        taskServerInstances.put(name, taskServer);
+        registerResourceInDirectories(name, provider.getId());
+
     }
 
-    
-   
-
     public TaskServerInstance getTaskServerInstance(String name) {
-        return taskServerInstance.get(name);
+        return taskServerInstances.get(name);
     }
 
     public TaskServerInstance getBestTaskServerInstance(TaskServerInstanceSelectionStrategy strategy) {
-        return (TaskServerInstance) strategy.getBestTaskServerInstance(taskServerInstance);
+        return (TaskServerInstance) strategy.getBestTaskServerInstance(taskServerInstances);
     }
 
     public TaskServerInstance getBestTaskServerInstance() {
-        return DEFAULT_TASK_STRATEGY.getBestTaskServerInstance(taskServerInstance);
+        return DEFAULT_TASK_STRATEGY.getBestTaskServerInstance(taskServerInstances);
     }
 
     public void unregisterTaskServerInstance(String name) {
         //Remove Task Server Instance
-        taskServerInstance.remove(name);
+        taskServerInstances.remove(name);
         //UnRegister task Server instance from current Directories
-        unregisterResourceInCurrentDirectories(name);
+        unregisterResourceFromDirectories(name);
 
 
     }
 
-    
+    public void disconnect() throws ConnectorException, RemoteException {
 
+        for (String key : executionEnvironments.keySet()) {
+            ExecutionEnvironment ee = executionEnvironments.get(key);
+            GenericNodeConnector connector = ee.getConnector();
+            connector.getConnection().dispose();
+            connector.disconnect();
+        }
+        for (String key : directoryInstances.keySet()) {
+            DirectoryInstance dir = directoryInstances.get(key);
+            GenericNodeConnector connector = dir.getConnector();
+            connector.getConnection().dispose();
+            connector.disconnect();
+        }
+        for (String key : taskServerInstances.keySet()) {
+            TaskServerInstance taskServer = taskServerInstances.get(key);
+            GenericHumanTaskConnector connector = taskServer.getConnector();
+            connector.getConnection().dispose();
+            connector.disconnect();
+        }
+    }
+
+
     public void dispose() throws ConnectorException, RemoteException {
-        
+
         for (String key : executionEnvironments.keySet()) {
-            executionEnvironments.get(key).getConnector().disconnect();
-            executionEnvironments.get(key).getConnector().getConnection().dispose();
+            unregisterExecutionEnvironment(key);
         }
-        for (String key : directories.keySet()) {
-            directories.get(key).getConnector().disconnect();
-            directories.get(key).getConnector().getConnection().dispose();
+        for (String key : directoryInstances.keySet()) {
+            unregisterDirectoryInstance(key);
         }
-        for (String key : taskServerInstance.keySet()) {
-            taskServerInstance.get(key).getConnector().disconnect();
-            taskServerInstance.get(key).getConnector().getConnection().dispose();
+        for (String key : taskServerInstances.keySet()) {
+            unregisterTaskServerInstance(key);
         }
     }
 
-    private void registerResourceInCurrentDirectories(String name, String resourceId) {
-        for (DirectoryInstance directory : directories.values()) {
+    private void registerResourceInDirectories(String name, String resourceId) {
+        for (DirectoryInstance directory : directoryInstances.values()) {
 
             try {
                 DirectoryNodeService directoryNode = directory.getDirectoryService().get(DirectoryNodeService.class);
@@ -247,8 +399,8 @@
         }
     }
 
-    private void unregisterResourceInCurrentDirectories(String name) {
-        for (DirectoryInstance directory : directories.values()) {
+    private void unregisterResourceFromDirectories(String name) {
+        for (DirectoryInstance directory : directoryInstances.values()) {
 
             try {
                 DirectoryNodeService directoryNode = directory.getDirectoryService().get(DirectoryNodeService.class);

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -23,6 +23,7 @@
 import org.drools.builder.KnowledgeBuilderFactoryService;
 import org.drools.builder.ResourceType;
 import org.drools.grid.ConnectorException;
+import org.drools.grid.DirectoryNode;
 import org.drools.grid.DirectoryNodeService;
 import org.drools.grid.ExecutionNode;
 import org.drools.grid.services.DirectoryInstance;
@@ -30,10 +31,8 @@
 import org.drools.grid.services.GridTopology;
 import org.drools.grid.services.configuration.DirectoryInstanceConfiguration;
 import org.drools.grid.services.configuration.ExecutionEnvironmentConfiguration;
-import org.drools.grid.services.configuration.GenericProvider;
 import org.drools.grid.services.configuration.GridTopologyConfiguration;
 import org.drools.grid.services.configuration.LocalProvider;
-import org.drools.grid.services.configuration.MinaProvider;
 import org.drools.grid.services.factory.GridTopologyFactory;
 import org.drools.grid.services.strategies.DirectoryInstanceByPrioritySelectionStrategy;
 import org.drools.grid.services.strategies.ExecutionEnvByPrioritySelectionStrategy;
@@ -87,9 +86,10 @@
         Assert.assertNotNull("Directory Instance null", directory);
 
         DirectoryNodeService dir = directory.getDirectoryService().get(DirectoryNodeService.class);
+        directory.getConnector().disconnect();
 
         Assert.assertNotNull("Dir Null", dir);
-
+        System.out.println("Dir = " + dir.getExecutorsMap());
         Assert.assertEquals(2, dir.getExecutorsMap().size());
 
         grid.dispose();
@@ -212,7 +212,7 @@
 
         kbase = dirService.lookupKBase("DoctorsKBase");
         Assert.assertNotNull(kbase);
-
+        directory.getConnector().disconnect();
         grid.dispose();
 
 

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -20,6 +20,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.rmi.RemoteException;
+import java.util.Map;
 import org.apache.mina.transport.socket.SocketAcceptor;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.drools.KnowledgeBase;
@@ -93,7 +94,7 @@
         this.serverDir = new MinaAcceptor(dirAcceptor, dirAddress);
         this.serverDir.start();
         System.out.println("Dir Server 1 Started! at = " + dirAddress.toString());
-        Thread.sleep(5000);
+
         // End Execution Server
 
         //Execution Node related stuff
@@ -111,31 +112,28 @@
         serverNode.start();
         System.out.println("Exec Server 1 Started! at = " + address.toString());
 
-        Thread.sleep(5000);
+        
 
     }
 
     @After
     public void tearDown() throws InterruptedException, ConnectorException, RemoteException {
 
-        grid.dispose();
-
-        Thread.sleep(5000);
-
-        Assert.assertEquals(0, serverDir.getCurrentSessions());
-        serverDir.stop();
-        System.out.println("Dir Server Stopped!");
         
+        
         Assert.assertEquals(0, serverNode.getCurrentSessions());
         serverNode.stop();
         System.out.println("Execution Server Stopped!");
 
+        Assert.assertEquals(0, serverDir.getCurrentSessions());
+        serverDir.stop();
+        System.out.println("Dir Server Stopped!");
 
+
     }
 
     @Test
      public void directoryRemoteTest() throws ConnectorException, RemoteException {
-
         GridTopologyConfiguration gridTopologyConfiguration = new GridTopologyConfiguration("MyTopology");
         gridTopologyConfiguration.addDirectoryInstance(new DirectoryInstanceConfiguration("MyMinaDir", new MinaProvider("127.0.0.1", 9123)));
         gridTopologyConfiguration.addExecutionEnvironment(new ExecutionEnvironmentConfiguration("MyLocalEnv", new LocalProvider()));
@@ -144,7 +142,6 @@
 
         grid = GridTopologyFactory.build(gridTopologyConfiguration);
 
-
         Assert.assertNotNull(grid);
        
 
@@ -154,24 +151,24 @@
 
         DirectoryNodeService dir = directory.getDirectoryService().get(DirectoryNodeService.class);
         Assert.assertNotNull(dir);
+        Map<String, String> dirMap = dir.getExecutorsMap();
+        
+        directory.getConnector().disconnect();
 
-        Assert.assertNotNull("Dir Null", dir.getExecutorsMap());
 
-        Assert.assertEquals(3, dir.getExecutorsMap().size());
+        Assert.assertNotNull("Dir Null", dirMap);
 
-        System.out.println("dir.getDirectoryMap() = "+dir.getExecutorsMap());
+        Assert.assertEquals(3, dirMap.size());
 
-        Assert.assertEquals(3, dir.getExecutorsMap().size());
+        System.out.println("dir.getDirectoryMap() = "+dirMap);
 
-        directory.getConnector().disconnect();
+        Assert.assertEquals(3, dirMap.size());
+        Assert.assertEquals(0, serverNode.getCurrentSessions());
+        //Then we can get the registered Execution Environments by Name
 
-
-
-                //Then we can get the registered Execution Environments by Name
-
         ExecutionEnvironment ee = grid.getExecutionEnvironment("MyRemoteEnv");
         Assert.assertNotNull(ee);
-
+        
         // Give me an ExecutionNode in the selected environment
         // For the Mina we have just one Execution Node per server instance
         ExecutionNode node = ee.getExecutionNode();
@@ -220,12 +217,15 @@
         ksession = (StatefulKnowledgeSession) node.get(DirectoryLookupFactoryService.class).lookup("ksession1");
 
         int fired = ksession.fireAllRules();
+
         Assert.assertEquals(2, fired);
 
+        
 
+        grid.dispose();
 
+        
 
-
      }
 
      @Test
@@ -293,13 +293,16 @@
         Assert.assertNotNull(ksession);
 
         node.get(DirectoryLookupFactoryService.class).register("sessionName", ksession);
+       
+        grid.disconnect();
+
         DirectoryInstance directoryInstance = grid.getDirectoryInstance();
         DirectoryNodeService directory = directoryInstance.getDirectoryService().get(DirectoryNodeService.class);
         GenericNodeConnector connector = directory.lookup("sessionName");
         
         directoryInstance.getConnector().disconnect();
 
-        grid.dispose();
+        
         //System.out.println("Connector -->"+connector.getId());
 
        node = grid.getExecutionEnvironment(connector).getExecutionNode();
@@ -307,6 +310,8 @@
        ksession = (StatefulKnowledgeSession) node.get(DirectoryLookupFactoryService.class).lookup("sessionName");
        Assert.assertNotNull(ksession);
 
+       grid.dispose();
+
     }
 
 

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -194,13 +194,17 @@
 
 
         handler.dispose();
+
         Assert.assertEquals(0, serverNode.getCurrentSessions());
         serverNode.stop();
         System.out.println("Execution Server Stopped!");
+
         Assert.assertEquals(0, serverTask.getCurrentSessions());
         serverTask.stop();
         System.out.println("Task Server Stopped!");
+        
 
+
         taskSession.dispose();
         emf.close();
 
@@ -296,7 +300,7 @@
         Assert.assertTrue(manager.waitTillCompleted(DEFAULT_WAIT_TIME));
         Thread.sleep(500);
 
-
+        taskServer.getConnector().disconnect();
     }
 
     private Object eval(Reader reader, Map<String, Object> vars) {

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -2,11 +2,13 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.rmi.RemoteException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
 import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
 import org.apache.mina.core.session.IoSession;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -57,8 +59,8 @@
 
     public void connect() throws ConnectorException {
         if (session != null && session.isConnected()) {
-            //throw new IllegalStateException("Already connected. Disconnect first.");
-            return;
+            throw new IllegalStateException("Already connected. Disconnect first.");
+            
         }
 
         try {
@@ -82,9 +84,22 @@
     public void disconnect() throws ConnectorException {
 
         if (session != null && session.isConnected()) {
-            session.close();
-            session.getCloseFuture().join();
+
+            CloseFuture future = session.getCloseFuture();
+
+            future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+                public void operationComplete(IoFuture future) {
+                    System.out.println("The human task connector session is now closed");
+                }
+            });
+
+            session.close(false);
+            future.awaitUninterruptibly();
+
+            connector.dispose();
         }
+
     }
 
     private void addResponseHandler(int id,

Modified: labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java	2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java	2010-08-08 17:06:46 UTC (rev 34589)
@@ -151,7 +151,6 @@
                 9124,
                 SystemEventListenerFactory.getSystemEventListener());
 
-         htMinaClient.connect();
          
 
         // setup RemoteService client



More information about the jboss-svn-commits mailing list