[jboss-svn-commits] JBL Code SVN: r34589 - in labs/jbossrules/trunk/drools-grid: drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina and 4 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Sun Aug 8 13:06:48 EDT 2010
Author: salaboy21
Date: 2010-08-08 13:06:46 -0400 (Sun, 08 Aug 2010)
New Revision: 34589
Modified:
labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java
labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java
labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java
labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java
labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java
labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java
labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java
labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java
Log:
JBRULES-2615: Drools Grid Services remove unnecessary dependencies
- now the test should pass always. Adding some comments + documentation. Now I need to clean the solution
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/DirectoryNodeRemoteClient.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -21,14 +21,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.drools.KnowledgeBase;
import org.drools.grid.ConnectorException;
-import org.drools.grid.ConnectorType;
import org.drools.grid.DirectoryNodeService;
-import org.drools.grid.GenericConnection;
import org.drools.grid.GenericNodeConnector;
import org.drools.grid.NodeConnectionType;
import org.drools.grid.internal.Message;
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-dir-mina/src/main/java/org/drools/grid/remote/directory/RemoteMinaDirectoryConnector.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -14,7 +14,6 @@
* limitations under the License.
* under the License.
*/
-
package org.drools.grid.remote.directory;
import java.net.InetSocketAddress;
@@ -24,7 +23,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -46,7 +48,8 @@
*
* @author salaboy
*/
-public class RemoteMinaDirectoryConnector implements GenericNodeConnector{
+public class RemoteMinaDirectoryConnector implements GenericNodeConnector {
+
private GridConnection connection;
private String providerName;
private SystemEventListener eventListener;
@@ -54,13 +57,11 @@
private AtomicInteger counter;
private SocketConnector connector;
private SocketAddress address;
-
public RemoteMinaDirectoryConnector(String providerName, String providerAddress,
- Integer providerPort, SystemEventListener systemEventListener) {
- SocketConnector minaconnector = new NioSocketConnector();
- minaconnector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+ Integer providerPort, SystemEventListener systemEventListener) {
+
if (providerName == null) {
throw new IllegalArgumentException("Name can not be null");
}
@@ -68,19 +69,19 @@
this.providerName = providerName;
this.eventListener = systemEventListener;
this.address = new InetSocketAddress(providerAddress, providerPort);
- this.connector = minaconnector;
this.connection = new GridConnection();
}
-
public void connect() throws ConnectorException {
if (session != null && session.isConnected()) {
throw new IllegalStateException("Already connected. Disconnect first.");
}
try {
- this.connector.getFilterChain().addLast("codec"+UUID.randomUUID().toString(),
+ this.connector = new NioSocketConnector();
+ this.connector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+ this.connector.getFilterChain().addLast("codec" + UUID.randomUUID().toString(),
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
ConnectFuture future1 = this.connector.connect(this.address);
@@ -92,7 +93,7 @@
}
eventListener.info("connected : " + address);
this.session = future1.getSession();
-
+
} catch (Exception e) {
throw new ConnectorException(e);
}
@@ -101,21 +102,34 @@
public void disconnect() throws ConnectorException {
if (session != null && session.isConnected()) {
- session.close();
- session.getCloseFuture().join();
+
+ CloseFuture future = session.getCloseFuture();
+ future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+ public void operationComplete(IoFuture future) {
+ System.out.println("The remote directory session is now closed");
+ }
+ });
+ session.close(false);
+
+ future.awaitUninterruptibly();
+
+ connector.dispose();
}
+
}
public String getId() {
- String hostName = ((InetSocketAddress)this.address).getHostName();
- int hostPort = ((InetSocketAddress)this.address).getPort();
- return "Remote:Mina:Directory:"+hostName+":"+hostPort;
+ String hostName = ((InetSocketAddress) this.address).getHostName();
+ int hostPort = ((InetSocketAddress) this.address).getPort();
+ return "Remote:Mina:Directory:" + hostName + ":" + hostPort;
}
public int getSessionId() {
return (int) session.getId();
}
- public IoSession getSession(){
+
+ public IoSession getSession() {
return session;
}
@@ -123,7 +137,7 @@
return this.connection;
}
- public ConnectorType getConnectorType() {
+ public ConnectorType getConnectorType() {
return ConnectorType.REMOTE;
}
@@ -133,7 +147,7 @@
public NodeConnectionType getNodeConnectionType() throws ConnectorException, RemoteException {
return new RemoteMinaConnectionDirectory();
-
+
}
public void write(Message msg, MessageResponseHandler responseHandler) {
@@ -143,8 +157,4 @@
public AtomicInteger getCounter() {
return counter;
}
-
-
}
-
-
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-remote-mina/src/main/java/org/drools/grid/remote/mina/RemoteMinaNodeConnector.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -2,11 +2,13 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.rmi.RemoteException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -41,27 +43,27 @@
String providerAddress, Integer providerPort,
SystemEventListener eventListener) {
- SocketConnector minaconnector = new NioSocketConnector();
- minaconnector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+
if (name == null) {
throw new IllegalArgumentException("Name can not be null");
}
this.name = name;
this.counter = new AtomicInteger();
this.address = new InetSocketAddress(providerAddress, providerPort);
- this.connector = minaconnector;
+
this.eventListener = eventListener;
this.connection = new GridConnection();
}
public void connect() throws ConnectorException {
if (session != null && session.isConnected()) {
- return;
- //throw new IllegalStateException("Already connected. Disconnect first.");
+ throw new IllegalStateException("Already connected. Disconnect first.");
}
try {
- this.connector.getFilterChain().addLast(this.name+"codec",
+ this.connector = new NioSocketConnector();
+ this.connector.setHandler(new MinaIoHandler(SystemEventListenerFactory.getSystemEventListener()));
+ this.connector.getFilterChain().addLast(this.name + "codec",
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
ConnectFuture future1 = this.connector.connect(this.address);
@@ -74,17 +76,30 @@
eventListener.info("connected : " + address);
this.session = future1.getSession();
} catch (Exception e) {
- throw new ConnectorException(e);
+ throw new ConnectorException(e);
}
}
public void disconnect() throws ConnectorException {
- this.connector.getFilterChain().clear();
+
if (session != null && session.isConnected()) {
+
+ CloseFuture future = session.getCloseFuture();
+
+ future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+ public void operationComplete(IoFuture future) {
+ System.out.println("The remote node session is now closed");
+ }
+ });
+
session.close(false);
- session.getCloseFuture().join();
+ future.awaitUninterruptibly();
+
+ connector.dispose();
}
- //this.connector.dispose();
+
+
}
private void addResponseHandler(int id,
@@ -125,13 +140,11 @@
return "Remote:Mina:Node:" + hostName + ":" + hostPort;
}
-
public GenericConnection getConnection() {
return this.connection;
}
-
- public NodeConnectionType getNodeConnectionType() throws ConnectorException{
+ public NodeConnectionType getNodeConnectionType() throws ConnectorException {
return new RemoteConnectionNode();
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/main/java/org/drools/grid/services/GridTopology.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -1,8 +1,8 @@
package org.drools.grid.services;
import java.rmi.RemoteException;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -23,31 +23,72 @@
import org.drools.grid.services.strategies.TaskServerInstanceSelectionStrategy;
/**
- * @author salaboy
+ * @author salaboy
+ *
+ * This class will represent a Grid Topology where we can execute and manage our
+ * knowledge sessions.
+ * A Grid Topology can be composed of different types of nodes/resources depending on
+ * your business requirements and architecture.
+ * The grid topology can contain a set of the following types of nodes:
+ * - Execution Environments: we will use them to host knowledge sessions (runtime)
+ * - Directory Instances: we will use them to keep track of the current nodes in our topology
+ * - Task Servers Instance: we will use them to execute and manage Human Tasks for business processes
+ *
+ * From the user perspective the GridTopology will be normally constructed with a HelperClass called: GridTopologyFactory
+ * This helper class will be in charge of reading the GridTopology configuration and based on that
+ * create the GridTopology object with all the nodes registered.
+ *
+ * It's important to understand that the GridTopologyConfiguration will represent a static description/configuration of a
+ * runtime environment. Based on this static description/configuration the GridTopology object will represent a
+ * live status of this runtime environment.
+ *
+ *
+ *
*/
public class GridTopology {
private String topologyName;
- private Map<String, ExecutionEnvironment> executionEnvironments = new HashMap<String, ExecutionEnvironment>();
- private Map<String, String> executionEnvironmentsByConnectorId = new HashMap<String, String>();
- private Map<String, DirectoryInstance> directories = new HashMap<String, DirectoryInstance>();
- private Map<String, TaskServerInstance> taskServerInstance = new HashMap<String, TaskServerInstance>();
-
+ private Map<String, ExecutionEnvironment> executionEnvironments = new ConcurrentHashMap<String, ExecutionEnvironment>();
+ private Map<String, String> executionEnvironmentsByConnectorId = new ConcurrentHashMap<String, String>();
+ private Map<String, DirectoryInstance> directoryInstances = new ConcurrentHashMap<String, DirectoryInstance>();
+ private Map<String, TaskServerInstance> taskServerInstances = new ConcurrentHashMap<String, TaskServerInstance>();
private final ExecutionEnvironmentSelectionStrategy DEFAULT_EXECTUTION_STRATEGY = new ExecutionEnvByPrioritySelectionStrategy();
private final DirectoryInstanceSelectionStrategy DEFAULT_DIRECTORY_STRATEGY = new DirectoryInstanceByPrioritySelectionStrategy();
private final TaskServerInstanceSelectionStrategy DEFAULT_TASK_STRATEGY = new TaskServerInstanceByPrioritySelectionStrategy();
-
+
+ /*
+ * Create a new Grid Topology
+ * @param topologyName
+ */
public GridTopology(String topologyName) {
this.topologyName = topologyName;
}
+ /*
+ * Get the GridTopology name
+ */
public String getTopologyName() {
return topologyName;
}
- //Execution Environments Methods
+ /*
+ * This method will register a new Execution Environment based on the configured Provider.
+ * The provider will contain all the information to be able to establish a connection with it.
+ * The following steps are executed inside this method:
+ * 1) Create the new ExecutionEnvironment object that will represent a remote host for our knowledge sessions.
+ * 2) Each ExecutionEnvironment will have an underlying connection to support remote/distributed interactions
+ * 3) for each Execution Environment registered in this topology
+ * 3.1) We need to inject the reference from the newly created Execution Environment to the existing ones
+ * 3.2) We need to inject the reference from all the existing Execution Environments to the newly created
+ * 4) for each Directory Instance registered in this topology
+ * 4.1) We need to inject the reference from the newly created Execution Environment in each existing Directory
+ * 4.2) We need to inject a reference from each existing directory to the newly created Execution Environment
+ * 5) Add the newly created Execution Environment to the topology maps. We keep two maps for Execution Environments
+ * to be able to look up based on the underlying connector and based on the defined Execution Environment name.
+ * 6) Register the Execution Environment inside all the currently available Directory Instances
+ */
public void registerExecutionEnvironment(String name, GenericProvider provider) {
//Create the executionEnvironment using the provider
@@ -65,13 +106,14 @@
e.getConnector().getConnection().addExecutionNode(connector);
}
//We need to add all the other directory connectors inside this connection
- for (DirectoryInstance d : directories.values()) {
+ for (DirectoryInstance d : directoryInstances.values()) {
connection.addDirectoryNode(d.getConnector());
//I need to add this execution node to all the other DI
d.getConnector().getConnection().addExecutionNode(connector);
}
//Adding the env to the local cache
executionEnvironments.put(name, environment);
+
try {
executionEnvironmentsByConnectorId.put(connector.getId(), environment.getName());
} catch (ConnectorException ex) {
@@ -80,28 +122,59 @@
Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
}
//Register all the Execution Environments into the current directories
- registerResourceInCurrentDirectories(name, provider.getId());
+ registerResourceInDirectories(name, provider.getId());
}
+ /*
+ * This method unregisters the Execution Environment from this running instance of the grid topology
+ * based on the name. The following steps are executed in order to unregister
+ * an ExecutionEnvironment from the GridTopology:
+ * 1) Get the ExecutionEnvironment from the executionEnvironmentMap
+ * 2) Get the ExecutionEnvironment connector
+ * 2.1) Remove the ExecutionEnvironment from the executionEnvironmentByConnectorId map
+ * 2.2) Disconnect the connector
+ * 3) Remove the executionEnvironment from the executionEnvironmentMap
+ * 4) Unregister the ExecutionEnvironment from the running Directory Instances
+ *
+ */
public void unregisterExecutionEnvironment(String name) {
+ ExecutionEnvironment ee = executionEnvironments.get(name);
+ try {
+
+ GenericNodeConnector connector = ee.getConnector();
+ connector.getConnection().dispose();
+ executionEnvironmentsByConnectorId.remove(connector.getId());
+ connector.disconnect();
+ } catch (ConnectorException ex) {
+ Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+ } catch (RemoteException ex) {
+ Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+ }
+
+
//Remove Execution Environment
executionEnvironments.remove(name);
//UnRegister EE from current Directories
- unregisterResourceInCurrentDirectories(name);
+ unregisterResourceFromDirectories(name);
-
}
-
+ /*
+ * Get Execution Environment by Name
+ * @param name: String Execution Environment Name
+ */
public ExecutionEnvironment getExecutionEnvironment(String name) {
return executionEnvironments.get(name);
}
- public ExecutionEnvironment getExecutionEnvironment(GenericNodeConnector connector){
+ /*
+ * Get ExecutionEnvironment by connector
+ */
+ public ExecutionEnvironment getExecutionEnvironment(GenericNodeConnector connector) {
ExecutionEnvironment ee = null;
try {
- String eeName = executionEnvironmentsByConnectorId.get(connector.getId());
- ee = executionEnvironments.get(eeName);
+ String eeName = executionEnvironmentsByConnectorId.get(connector.getId());
+ ee = executionEnvironments.get(eeName);
} catch (ConnectorException ex) {
Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
} catch (RemoteException ex) {
@@ -110,122 +183,201 @@
return ee;
}
+ /*
+ * Get the Best ExecutionEnvironment available based on a ExecutionEnvironmentSelectionStrategy
+ * @param strategy: it's an implementation of the ExecutionEnvironmentSelectionStrategy interface
+ */
public ExecutionEnvironment getBestExecutionEnvironment(ExecutionEnvironmentSelectionStrategy strategy) {
return strategy.getBestExecutionEnvironment(executionEnvironments);
}
+ /*
+ * Get the Best ExecutionEnvironment available based on the default ExecutionEnvironmentSelectionStrategy
+ */
public ExecutionEnvironment getExecutionEnvironment() {
return DEFAULT_EXECTUTION_STRATEGY.getBestExecutionEnvironment(executionEnvironments);
}
- // DirectoryInstances Methods
+ /*
+ * This method registers a new Directory instance based on the information provided by the GenericProvider
+ * The provider will contain all the information to be able to establish a connection with it.
+ * The following steps are executed inside this method:
+ * 1) Create the new DirectoryInstance object that will represent a map that will store information about
+ * the resources that are currently running in our topology.
+ * 2) Each DirectoryInstance will have an underlying connection to support remote/distributed interactions
+ * 3) for each Execution Environment registered in this topology
+ * 3.1) We need to inject the reference from the newly created Directory to the existing ExecutionEnvironments
+ * 3.2) We need to inject the reference from all the existing Execution Environments to the newly created Directory Instance
+ * 4) for each Directory Instance registered in this topology
+ * 4.1) We need to inject the reference from the newly created Directory Instance in each existing Directory
+ * 4.2) We need to inject a reference from each existing directory to the newly created DirectoryInstance
+ * 5) Add the newly created Directory Instance to the topology map. We keep a map for Directory Instances
+ * to be able to lookup based on the Directory Instance name.
+ * 6) Register the Directory Instance inside all the currently available Directory Instances
+ */
public void registerDirectoryInstance(String name, GenericProvider provider) {
DirectoryInstance directory = DirectoryInstanceFactory.newDirectoryInstance(name, provider);
-
GenericNodeConnector connector = directory.getConnector();
GenericConnection connection = connector.getConnection();
-
-
connection.addDirectoryNode(connector);
for (ExecutionEnvironment e : executionEnvironments.values()) {
+ e.getConnector().getConnection().addDirectoryNode(connector);
connection.addExecutionNode(e.getConnector());
- //I need to add this directory instance to all the other EE
- e.getConnector().getConnection().addDirectoryNode(connector);
}
- for (DirectoryInstance d : directories.values()) {
+ for (DirectoryInstance d : directoryInstances.values()) {
connection.addDirectoryNode(d.getConnector());
- //I need to add this directory Instance to all the other DI
d.getConnector().getConnection().addDirectoryNode(connector);
}
+
- directories.put(name, directory);
- registerResourceInCurrentDirectories(name, provider.getId());
+ directoryInstances.put(name, directory);
+
+ registerResourceInDirectories(name, provider.getId());
+
}
-
+ /*
+ * Get the Best DirectoryInstance based on a DirectoryInstanceSelectionStrategy
+ * @param strategy it's the strategy used to choose the best DirectoryInstance available
+ */
public DirectoryInstance getBestDirectoryInstance(DirectoryInstanceSelectionStrategy strategy) {
- return (DirectoryInstance) strategy.getBestDirectoryInstance(directories);
+ return (DirectoryInstance) strategy.getBestDirectoryInstance(directoryInstances);
}
+ /*
+ * Get the Directory Instance based on a default strategy
+ */
public DirectoryInstance getDirectoryInstance() {
- return DEFAULT_DIRECTORY_STRATEGY.getBestDirectoryInstance(directories);
+ return DEFAULT_DIRECTORY_STRATEGY.getBestDirectoryInstance(directoryInstances);
}
+ /*
+ * Get a Directory Instance by Name
+ */
public DirectoryInstance getDirectoryInstance(String name) {
- return directories.get(name);
+ return directoryInstances.get(name);
}
- public void unregisterDirectoryInstance(String name) {
- //Remove Directory Instance
- directories.remove(name);
- //UnRegister Directory from current Directories
- unregisterResourceInCurrentDirectories(name);
+ /*
+ * Unregister a Directory Instance from this running GridTopology
+ * This method unregisters the Directory Instance from this running instance of the grid topology
+ * based on the name. The following steps are executed in order to unregister
+ * a DirectoryInstance from the GridTopology:
+ * 1) Get the Directory Instance from the directoryInstances Map
+ * 2) Get the DirectoryInstance connector
+ * 2.1) Disconnect the connector
+ * 3) Unregister the DirectoryInstance from the running Directory Instances
+ * 4) Remove the DirectoryInstance from the directoryInstances Map
+ */
+ public void unregisterDirectoryInstance(String name) {
+ DirectoryInstance dir = directoryInstances.get(name);
+ GenericNodeConnector connector = dir.getConnector();
+
+ try {
+ connector.getConnection().dispose();
+ connector.disconnect();
+ } catch (ConnectorException ex) {
+ Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+ } catch (RemoteException ex) {
+ Logger.getLogger(GridTopology.class.getName()).log(Level.SEVERE, null, ex);
+ }
+ unregisterResourceFromDirectories(name);
+ directoryInstances.remove(name);
+
}
-
- // Task Server Instance Methods
-
+ /*
+ * This method will register a new Task Server based on the configured Provider.
+ * The provider will contain all the information to be able to establish a connection with the Task Server.
+ * The following steps are executed inside this method:
+ * 1) Create the new ExecutionEnvironment object that will represent a remote host for our knowledge sessions.
+ * 2) Each ExecutionEnvironment will have an underlying connection to support remote/distributed interactions
+ * 3) for each Execution Environment registered in this topology
+ * 3.1) We need to inject the reference from the newly created Execution Environment to the existing ones
+ * 3.2) We need to inject the reference from all the existing Execution Environments to the newly created
+ * 4) for each Directory Instance registered in this topology
+ * 4.1) We need to inject the reference from the newly created Execution Environment in each existing Directory
+ * 4.2) We need to inject a reference from each existing directory to the newly created Execution Environment
+ * 5) Add the newly created Execution Environment to the topology maps. We keep two maps for Execution Environments
+ * to be able to look up based on the underlying connector and based on the defined Execution Environment name.
+ * 6) Register the Execution Environment inside all the currently available Directory Instances
+ */
public void registerTaskServerInstance(String name, GenericProvider provider) {
TaskServerInstance taskServer = TaskServerInstanceFactory.newTaskServerInstance(name, provider);
GenericHumanTaskConnector connector = taskServer.getConnector();
GenericConnection connection = connector.getConnection();
connection.addHumanTaskNode(connector);
-
- taskServerInstance.put(name, taskServer);
- registerResourceInCurrentDirectories(name, provider.getId());
+ taskServerInstances.put(name, taskServer);
+ registerResourceInDirectories(name, provider.getId());
+
}
-
-
-
public TaskServerInstance getTaskServerInstance(String name) {
- return taskServerInstance.get(name);
+ return taskServerInstances.get(name);
}
public TaskServerInstance getBestTaskServerInstance(TaskServerInstanceSelectionStrategy strategy) {
- return (TaskServerInstance) strategy.getBestTaskServerInstance(taskServerInstance);
+ return (TaskServerInstance) strategy.getBestTaskServerInstance(taskServerInstances);
}
public TaskServerInstance getBestTaskServerInstance() {
- return DEFAULT_TASK_STRATEGY.getBestTaskServerInstance(taskServerInstance);
+ return DEFAULT_TASK_STRATEGY.getBestTaskServerInstance(taskServerInstances);
}
public void unregisterTaskServerInstance(String name) {
//Remove Task Server Instance
- taskServerInstance.remove(name);
+ taskServerInstances.remove(name);
//UnRegister task Server instance from current Directories
- unregisterResourceInCurrentDirectories(name);
+ unregisterResourceFromDirectories(name);
}
-
+ public void disconnect() throws ConnectorException, RemoteException {
+ for (String key : executionEnvironments.keySet()) {
+ ExecutionEnvironment ee = executionEnvironments.get(key);
+ GenericNodeConnector connector = ee.getConnector();
+ connector.getConnection().dispose();
+ connector.disconnect();
+ }
+ for (String key : directoryInstances.keySet()) {
+ DirectoryInstance dir = directoryInstances.get(key);
+ GenericNodeConnector connector = dir.getConnector();
+ connector.getConnection().dispose();
+ connector.disconnect();
+ }
+ for (String key : taskServerInstances.keySet()) {
+ TaskServerInstance taskServer = taskServerInstances.get(key);
+ GenericHumanTaskConnector connector = taskServer.getConnector();
+ connector.getConnection().dispose();
+ connector.disconnect();
+ }
+ }
+
+
public void dispose() throws ConnectorException, RemoteException {
-
+
for (String key : executionEnvironments.keySet()) {
- executionEnvironments.get(key).getConnector().disconnect();
- executionEnvironments.get(key).getConnector().getConnection().dispose();
+ unregisterExecutionEnvironment(key);
}
- for (String key : directories.keySet()) {
- directories.get(key).getConnector().disconnect();
- directories.get(key).getConnector().getConnection().dispose();
+ for (String key : directoryInstances.keySet()) {
+ unregisterDirectoryInstance(key);
}
- for (String key : taskServerInstance.keySet()) {
- taskServerInstance.get(key).getConnector().disconnect();
- taskServerInstance.get(key).getConnector().getConnection().dispose();
+ for (String key : taskServerInstances.keySet()) {
+ unregisterTaskServerInstance(key);
}
}
- private void registerResourceInCurrentDirectories(String name, String resourceId) {
- for (DirectoryInstance directory : directories.values()) {
+ private void registerResourceInDirectories(String name, String resourceId) {
+ for (DirectoryInstance directory : directoryInstances.values()) {
try {
DirectoryNodeService directoryNode = directory.getDirectoryService().get(DirectoryNodeService.class);
@@ -247,8 +399,8 @@
}
}
- private void unregisterResourceInCurrentDirectories(String name) {
- for (DirectoryInstance directory : directories.values()) {
+ private void unregisterResourceFromDirectories(String name) {
+ for (DirectoryInstance directory : directoryInstances.values()) {
try {
DirectoryNodeService directoryNode = directory.getDirectoryService().get(DirectoryNodeService.class);
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterDirectoryTest.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -23,6 +23,7 @@
import org.drools.builder.KnowledgeBuilderFactoryService;
import org.drools.builder.ResourceType;
import org.drools.grid.ConnectorException;
+import org.drools.grid.DirectoryNode;
import org.drools.grid.DirectoryNodeService;
import org.drools.grid.ExecutionNode;
import org.drools.grid.services.DirectoryInstance;
@@ -30,10 +31,8 @@
import org.drools.grid.services.GridTopology;
import org.drools.grid.services.configuration.DirectoryInstanceConfiguration;
import org.drools.grid.services.configuration.ExecutionEnvironmentConfiguration;
-import org.drools.grid.services.configuration.GenericProvider;
import org.drools.grid.services.configuration.GridTopologyConfiguration;
import org.drools.grid.services.configuration.LocalProvider;
-import org.drools.grid.services.configuration.MinaProvider;
import org.drools.grid.services.factory.GridTopologyFactory;
import org.drools.grid.services.strategies.DirectoryInstanceByPrioritySelectionStrategy;
import org.drools.grid.services.strategies.ExecutionEnvByPrioritySelectionStrategy;
@@ -87,9 +86,10 @@
Assert.assertNotNull("Directory Instance null", directory);
DirectoryNodeService dir = directory.getDirectoryService().get(DirectoryNodeService.class);
+ directory.getConnector().disconnect();
Assert.assertNotNull("Dir Null", dir);
-
+ System.out.println("Dir = " + dir.getExecutorsMap());
Assert.assertEquals(2, dir.getExecutorsMap().size());
grid.dispose();
@@ -212,7 +212,7 @@
kbase = dirService.lookupKBase("DoctorsKBase");
Assert.assertNotNull(kbase);
-
+ directory.getConnector().disconnect();
grid.dispose();
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterMinaDirectoryTest.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -20,6 +20,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.rmi.RemoteException;
+import java.util.Map;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.drools.KnowledgeBase;
@@ -93,7 +94,7 @@
this.serverDir = new MinaAcceptor(dirAcceptor, dirAddress);
this.serverDir.start();
System.out.println("Dir Server 1 Started! at = " + dirAddress.toString());
- Thread.sleep(5000);
+
// End Execution Server
//Execution Node related stuff
@@ -111,31 +112,28 @@
serverNode.start();
System.out.println("Exec Server 1 Started! at = " + address.toString());
- Thread.sleep(5000);
+
}
@After
public void tearDown() throws InterruptedException, ConnectorException, RemoteException {
- grid.dispose();
-
- Thread.sleep(5000);
-
- Assert.assertEquals(0, serverDir.getCurrentSessions());
- serverDir.stop();
- System.out.println("Dir Server Stopped!");
+
Assert.assertEquals(0, serverNode.getCurrentSessions());
serverNode.stop();
System.out.println("Execution Server Stopped!");
+ Assert.assertEquals(0, serverDir.getCurrentSessions());
+ serverDir.stop();
+ System.out.println("Dir Server Stopped!");
+
}
@Test
public void directoryRemoteTest() throws ConnectorException, RemoteException {
-
GridTopologyConfiguration gridTopologyConfiguration = new GridTopologyConfiguration("MyTopology");
gridTopologyConfiguration.addDirectoryInstance(new DirectoryInstanceConfiguration("MyMinaDir", new MinaProvider("127.0.0.1", 9123)));
gridTopologyConfiguration.addExecutionEnvironment(new ExecutionEnvironmentConfiguration("MyLocalEnv", new LocalProvider()));
@@ -144,7 +142,6 @@
grid = GridTopologyFactory.build(gridTopologyConfiguration);
-
Assert.assertNotNull(grid);
@@ -154,24 +151,24 @@
DirectoryNodeService dir = directory.getDirectoryService().get(DirectoryNodeService.class);
Assert.assertNotNull(dir);
+ Map<String, String> dirMap = dir.getExecutorsMap();
+
+ directory.getConnector().disconnect();
- Assert.assertNotNull("Dir Null", dir.getExecutorsMap());
- Assert.assertEquals(3, dir.getExecutorsMap().size());
+ Assert.assertNotNull("Dir Null", dirMap);
- System.out.println("dir.getDirectoryMap() = "+dir.getExecutorsMap());
+ Assert.assertEquals(3, dirMap.size());
- Assert.assertEquals(3, dir.getExecutorsMap().size());
+ System.out.println("dir.getDirectoryMap() = "+dirMap);
- directory.getConnector().disconnect();
+ Assert.assertEquals(3, dirMap.size());
+ Assert.assertEquals(0, serverNode.getCurrentSessions());
+ //Then we can get the registered Execution Environments by Name
-
-
- //Then we can get the registered Execution Environments by Name
-
ExecutionEnvironment ee = grid.getExecutionEnvironment("MyRemoteEnv");
Assert.assertNotNull(ee);
-
+
// Give me an ExecutionNode in the selected environment
// For the Mina we have just one Execution Node per server instance
ExecutionNode node = ee.getExecutionNode();
@@ -220,12 +217,15 @@
ksession = (StatefulKnowledgeSession) node.get(DirectoryLookupFactoryService.class).lookup("ksession1");
int fired = ksession.fireAllRules();
+
Assert.assertEquals(2, fired);
+
+ grid.dispose();
+
-
}
@Test
@@ -293,13 +293,16 @@
Assert.assertNotNull(ksession);
node.get(DirectoryLookupFactoryService.class).register("sessionName", ksession);
+
+ grid.disconnect();
+
DirectoryInstance directoryInstance = grid.getDirectoryInstance();
DirectoryNodeService directory = directoryInstance.getDirectoryService().get(DirectoryNodeService.class);
GenericNodeConnector connector = directory.lookup("sessionName");
directoryInstance.getConnector().disconnect();
- grid.dispose();
+
//System.out.println("Connector -->"+connector.getId());
node = grid.getExecutionEnvironment(connector).getExecutionNode();
@@ -307,6 +310,8 @@
ksession = (StatefulKnowledgeSession) node.get(DirectoryLookupFactoryService.class).lookup("sessionName");
Assert.assertNotNull(ksession);
+ grid.dispose();
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-services/src/test/java/org/drools/services/RegisterTaskTest.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -194,13 +194,17 @@
handler.dispose();
+
Assert.assertEquals(0, serverNode.getCurrentSessions());
serverNode.stop();
System.out.println("Execution Server Stopped!");
+
Assert.assertEquals(0, serverTask.getCurrentSessions());
serverTask.stop();
System.out.println("Task Server Stopped!");
+
+
taskSession.dispose();
emf.close();
@@ -296,7 +300,7 @@
Assert.assertTrue(manager.waitTillCompleted(DEFAULT_WAIT_TIME));
Thread.sleep(500);
-
+ taskServer.getConnector().disconnect();
}
private Object eval(Reader reader, Map<String, Object> vars) {
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-task/src/main/java/org/drools/grid/task/RemoteMinaHumanTaskConnector.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -2,11 +2,13 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.rmi.RemoteException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.future.ConnectFuture;
+import org.apache.mina.core.future.IoFuture;
+import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
@@ -57,8 +59,8 @@
public void connect() throws ConnectorException {
if (session != null && session.isConnected()) {
- //throw new IllegalStateException("Already connected. Disconnect first.");
- return;
+ throw new IllegalStateException("Already connected. Disconnect first.");
+
}
try {
@@ -82,9 +84,22 @@
public void disconnect() throws ConnectorException {
if (session != null && session.isConnected()) {
- session.close();
- session.getCloseFuture().join();
+
+ CloseFuture future = session.getCloseFuture();
+
+ future.addListener((IoFutureListener<?>) new IoFutureListener<IoFuture>() {
+
+ public void operationComplete(IoFuture future) {
+ System.out.println("The human task connector session is now closed");
+ }
+ });
+
+ session.close(false);
+ future.awaitUninterruptibly();
+
+ connector.dispose();
}
+
}
private void addResponseHandler(int id,
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java 2010-08-07 03:14:31 UTC (rev 34588)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-task/src/test/java/org/drools/grid/task/CommandBasedServicesWSHumanTaskHandlerTest.java 2010-08-08 17:06:46 UTC (rev 34589)
@@ -151,7 +151,6 @@
9124,
SystemEventListenerFactory.getSystemEventListener());
- htMinaClient.connect();
// setup RemoteService client
More information about the jboss-svn-commits mailing list