[jboss-cvs] JBoss Messaging SVN: r1809 - in trunk: src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/tools/jmx tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 17 22:53:12 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-17 22:53:08 -0500 (Sun, 17 Dec 2006)
New Revision: 1809
Modified:
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java
Log:
Eliminated redundancies, other minor refactoring
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-17 04:41:26 UTC (rev 1808)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-18 03:53:08 UTC (rev 1809)
@@ -46,9 +46,9 @@
import org.jboss.jms.server.remoting.JMSDispatcher;
import org.jboss.jms.util.JNDIUtil;
import org.jboss.logging.Logger;
-import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.contract.ReplicationListener;
import org.jboss.messaging.core.plugin.contract.Replicator;
+import org.jboss.messaging.core.plugin.contract.FailoverMapper;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
/**
@@ -63,17 +63,17 @@
implements ConnectionFactoryManager, ReplicationListener
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(ConnectionFactoryJNDIMapper.class);
-
+
// Static --------------------------------------------------------
-
+
private static final String CF_PREFIX = "CF_";
-
+
// Attributes ----------------------------------------------------
private boolean trace = log.isTraceEnabled();
-
+
protected Context initialContext;
protected ServerPeer serverPeer;
@@ -82,25 +82,25 @@
// Map<uniqueName<String> - ClientConnectionFactoryDelegate> (not just clustered delegates)
protected Map delegates;
-
- protected Replicator replicator;
-
+
+ private Replicator replicator;
+
/*
We cache the map of node->failover node in here.
This is then updated when node joins or leaves the cluster via the replicationListener
When new cfs are deployed we use the cached map
*/
protected Map failoverMap;
-
+
// Constructors --------------------------------------------------
-
+
public ConnectionFactoryJNDIMapper(ServerPeer serverPeer) throws Exception
{
this.serverPeer = serverPeer;
endpoints = new HashMap();
delegates = new HashMap();
}
-
+
// ConnectionFactoryManager implementation -----------------------
public synchronized void registerConnectionFactory(String uniqueName,
@@ -116,15 +116,14 @@
throws Exception
{
log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
-
+
// Sanity check
if (delegates.containsKey(uniqueName))
{
throw new IllegalArgumentException("There's already a connection factory registered with name " + uniqueName);
}
-
+
int id = serverPeer.getNextObjectID();
-
Version version = serverPeer.getVersion();
ServerConnectionFactoryEndpoint endpoint =
@@ -134,7 +133,7 @@
defaultTempQueuePageSize,
defaultTempQueueDownCacheSize);
endpoints.put(uniqueName, endpoint);
-
+
ClientConnectionFactoryDelegate delegate = null;
if (clustered)
@@ -145,65 +144,64 @@
{
log.info("ConnectionFactoryJNDIMapper is non clustered");
}
-
+
boolean creatingClustered = clustered && replicator != null;
-
+
ClientConnectionFactoryDelegate localDelegate =
new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
locatorURI, version, clientPing);
-
- /*
- * When registering a new clustered connection factory i should first create it with the available delegates
- * then send the replication message.
- * We then listen for connection factories added to global state using the replication listener
- * and then update their connection factory list.
- * This will happen locally too, so we will get the replication message locally - to avoid updating it again
- * we can ignore any "add" replication updates that originate from the current node.
- */
-
+
+ // When registering a new clustered connection factory I should first create it with the
+ // available delegates then send the replication message. We then listen for connection
+ // factories added to global state using the replication listener and then update their
+ // connection factory list. This will happen locally too, so we will get the replication
+ // message locally - to avoid updating it again we can ignore any "add" replication updates
+ // that originate from the current node.
+
if (creatingClustered)
{
- //Replicate the change - we will ignore this locally
-
+ // Replicate the change - we will ignore this locally
+
replicator.put(CF_PREFIX + uniqueName, localDelegate);
-
- //Create a clustered delegate
-
+
+ // Create a clustered delegate
+
Map localDelegates = replicator.get(CF_PREFIX + uniqueName);
-
- delegate = createClusteredDelegate(localDelegates);
-
+ delegate = createClusteredDelegate(localDelegates.values());
+
}
else
{
delegate = localDelegate;
}
-
- log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
-
+
+ log.trace(this + " adding delegates factory " + uniqueName + " pointing to " + delegate);
+
delegates.put(uniqueName, delegate);
- //Now bind it in JNDI
+ // Now bind it in JNDI
rebindConnectionFactory(initialContext, jndiBindings, delegate);
-
- //Registering with the dispatcher should always be the last thing otherwise a client could use
- //a partially initialised object
- JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
+
+ // Registering with the dispatcher should always be the last thing otherwise a client could
+ // use a partially initialised object
+ JMSDispatcher.instance.
+ registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
}
-
- public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
+
+ public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered)
+ throws Exception
{
log.trace("ConnectionFactory " + uniqueName + " being unregistered");
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)endpoints.remove(uniqueName);
-
+
if (endpoint == null)
{
throw new IllegalArgumentException("Cannot find endpoint with name " + uniqueName);
}
-
+
JNDIBindings jndiBindings = endpoint.getJNDIBindings();
-
+
if (jndiBindings != null)
{
List jndiNames = jndiBindings.getNames();
@@ -220,65 +218,70 @@
ClientConnectionFactoryDelegate delegate =
(ClientConnectionFactoryDelegate)delegates.remove(uniqueName);
-
+
if (delegate == null)
{
throw new IllegalArgumentException("Cannot find factory with name " + uniqueName);
}
-
+
if (clustered)
{
setupReplicator();
-
- // Remove from replicants
+ // Remove from replicants
if (replicator != null)
- {
+ {
//There may be no clustered post office deployed
if (!replicator.remove(CF_PREFIX + uniqueName))
{
throw new IllegalStateException("Cannot find replicant to remove: " + CF_PREFIX + uniqueName);
}
}
-
+
}
-
+
JMSDispatcher.instance.unregisterTarget(new Integer(endpoint.getID()));
}
-
+
// MessagingComponent implementation -----------------------------
-
+
public void start() throws Exception
{
initialContext = new InitialContext();
-
+
log.debug("started");
}
-
+
public void stop() throws Exception
{
initialContext.close();
-
+
if (replicator != null)
{
replicator.unregisterListener(this);
}
-
+
log.debug("stopped");
}
-
+
// ReplicationListener interface ----------------------------------
-
+
+ /**
+ * @param updatedReplicantMap Map<Integer(nodeID)-Map<>>
+ */
public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap,
- boolean added, int originatingNodeId)
+ boolean added, int originatorNodeID)
{
+ log.debug(this + " received " + key + " replication change from node " + originatorNodeID +
+ ": " + updatedReplicantMap);
+
try
- {
+ {
if (!(key instanceof String))
{
return;
}
-
+
String sKey = (String)key;
if (sKey.equals(DefaultClusteredPostOffice.ADDRESS_INFO_KEY))
@@ -291,7 +294,7 @@
// reference a single map since the objects are bound in JNDI in serialized form.
recalculateFailoverMap(updatedReplicantMap);
-
+
// Rebind
for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
@@ -300,20 +303,20 @@
String uniqueName = (String)entry.getKey();
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)entry.getValue();
-
+
ClusteredClientConnectionFactoryDelegate del =
(ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
-
+
if (del == null)
{
throw new IllegalStateException("Cannot find cf with name " + uniqueName);
}
-
+
del.setFailoverMap(failoverMap);
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
}
}
- else if (sKey.startsWith(CF_PREFIX) && originatingNodeId != serverPeer.getServerPeerID())
+ else if (sKey.startsWith(CF_PREFIX) && originatorNodeID != serverPeer.getServerPeerID())
{
// A connection factory has been deployed / undeployed - we need to update the local
// delegate arrays inside the clustered connection factories with the same name. We
@@ -327,28 +330,28 @@
ClusteredClientConnectionFactoryDelegate del =
(ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
-
+
if (del == null)
{
throw new IllegalStateException("Cannot find cf with name " + uniqueName);
}
-
+
List newDels = sortCFS(updatedReplicantMap.values());
-
- ClientConnectionFactoryDelegate[] delArr =
+
+ ClientConnectionFactoryDelegate[] delArr =
(ClientConnectionFactoryDelegate[])newDels.
toArray(new ClientConnectionFactoryDelegate[newDels.size()]);
del.setDelegates(delArr);
-
+
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-
+
if (endpoint == null)
{
throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
}
-
+
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
}
}
@@ -357,13 +360,12 @@
log.error("Failed to rebind connection factory", e);
}
}
-
+
// Public --------------------------------------------------------
-
+
public void injectReplicator(Replicator replicator)
{
this.replicator = replicator;
-
replicator.registerListener(this);
}
@@ -371,61 +373,50 @@
{
return "Server[" + serverPeer.getServerPeerID() + "].ConnFactoryJNDIMapper";
}
-
+
// Package protected ---------------------------------------------
-
+
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
-
+
private void setupReplicator() throws Exception
{
this.serverPeer.getPostOfficeInstance();
}
private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
- {
+ {
FailoverMapper mapper = replicator.getFailoverMapper();
failoverMap = mapper.generateMapping(nodeAddressMap.keySet());
- }
-
+ }
+
/**
* @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
*/
- private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
- throws Exception
+ private ClusteredClientConnectionFactoryDelegate
+ createClusteredDelegate(Collection localDelegates) throws Exception
{
- if (trace) { log.trace(this + " updating failover delegates, map size " + localDelegates.size()); }
-
- //First sort the local delegates in order of server id
- List localDels = sortCFS(localDelegates.values());
-
- Collections.sort(localDels,
- new Comparator()
- {
- public int compare(Object obj1, Object obj2)
- {
- ClientConnectionFactoryDelegate del1 = (ClientConnectionFactoryDelegate)obj1;
- ClientConnectionFactoryDelegate del2 = (ClientConnectionFactoryDelegate)obj2;
- return del1.getServerID() - del2.getServerID();
- }
- });
+ if (trace) { log.trace(this + " updating failover delegates with " + localDelegates); }
- ClientConnectionFactoryDelegate[] delArr =
- (ClientConnectionFactoryDelegate[])localDels.
- toArray(new ClientConnectionFactoryDelegate[localDelegates.size()]);
+ // First sort the local delegates in order of server id
+ List sortedLocalDelegates = sortCFS(localDelegates);
+ ClientConnectionFactoryDelegate[] delegates =
+ (ClientConnectionFactoryDelegate[])sortedLocalDelegates.
+ toArray(new ClientConnectionFactoryDelegate[sortedLocalDelegates.size()]);
+
// If the map is not cached - generate it now
-
+
if (failoverMap == null)
{
Map nodeAddressMap = replicator.get(DefaultClusteredPostOffice.ADDRESS_INFO_KEY);
-
+
if (nodeAddressMap == null)
{
throw new IllegalStateException("Cannot find address node mapping!");
}
-
+
recalculateFailoverMap(nodeAddressMap);
}
@@ -437,12 +428,12 @@
// this loop.
ClientConnectionFactoryDelegate mainDelegate = null;
-
- for(Iterator i = localDels.iterator(); i.hasNext();)
+
+ for(Iterator i = localDelegates.iterator(); i.hasNext();)
{
ClientConnectionFactoryDelegate del = (ClientConnectionFactoryDelegate)i.next();
-
- if (del.getServerID() == this.serverPeer.getServerPeerID())
+
+ if (del.getServerID() == serverPeer.getServerPeerID())
{
// sanity check
if (mainDelegate != null)
@@ -453,12 +444,11 @@
mainDelegate = del;
}
}
-
- return new ClusteredClientConnectionFactoryDelegate(mainDelegate, delArr, failoverMap);
+
+ return new ClusteredClientConnectionFactoryDelegate(mainDelegate, delegates, failoverMap);
}
-
- private void rebindConnectionFactory(Context ic,
- JNDIBindings jndiBindings,
+
+ private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,
ClientConnectionFactoryDelegate delegate)
throws NamingException
{
@@ -475,25 +465,25 @@
}
}
}
-
- /*
+
+ /**
* Sort the collection of delegates in order of server id
*/
private List sortCFS(Collection delegates)
{
List localDels = new ArrayList(delegates);
-
+
Collections.sort(localDels,
- new Comparator()
- {
- public int compare(Object obj1, Object obj2)
- {
- ClientConnectionFactoryDelegate del1 = (ClientConnectionFactoryDelegate)obj1;
- ClientConnectionFactoryDelegate del2 = (ClientConnectionFactoryDelegate)obj2;
- return del1.getServerID() - del2.getServerID();
- }
- });
-
+ new Comparator()
+ {
+ public int compare(Object obj1, Object obj2)
+ {
+ ClientConnectionFactoryDelegate del1 = (ClientConnectionFactoryDelegate)obj1;
+ ClientConnectionFactoryDelegate del2 = (ClientConnectionFactoryDelegate)obj2;
+ return del1.getServerID() - del2.getServerID();
+ }
+ });
+
return localDels;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-17 04:41:26 UTC (rev 1808)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-18 03:53:08 UTC (rev 1809)
@@ -106,13 +106,47 @@
private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
// Key for looking up node id -> address info mapping from replicated data
- public static final String ADDRESS_INFO_KEY = "address_info";
+ public static final String ADDRESS_INFO_KEY = "ADDRESS_INFO";
// Key for looking up node id -> failed over for node id mapping from replicated data
- public static final String FAILED_OVER_FOR_KEY = "failed_over_for";
+ public static final String FAILED_OVER_FOR_KEY = "FAILED_OVER_FOR";
// Static --------------------------------------------------------
+ /**
+ * @param map - Map<Integer(nodeID)-Integer(failoverNodeID)>
+ */
+ public static String dumpFailoverMap(Map map)
+ {
+ StringBuffer sb = new StringBuffer("\n");
+
+ for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ Integer primary = (Integer)entry.getKey();
+ Integer secondary = (Integer)entry.getValue();
+ sb.append(" ").append(primary).append("->").append(secondary).append("\n");
+ }
+ return sb.toString();
+ }
+
+ /**
+ * @param map - Map<Integer(nodeID)-PostOfficeAddressInfo>
+ */
+ public static String dumpClusterMap(Map map)
+ {
+ StringBuffer sb = new StringBuffer("\n");
+
+ for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ Integer nodeID = (Integer)entry.getKey();
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
+ sb.append(" ").append(nodeID).append("->").append(info).append("\n");
+ }
+ return sb.toString();
+ }
+
// Attributes ----------------------------------------------------
// Used for failure testing
@@ -151,7 +185,7 @@
private Map holdingArea;
- //Map < node id , failover node id>
+ // Map <Integer(nodeID)->Integer(failoverNodeID)>
private Map failoverMap;
private Set leftSet;
@@ -2260,35 +2294,6 @@
log.debug(this + " sent " + notificationType + " JMX notification");
}
- private String dumpClusterMap(Map map)
- {
- StringBuffer sb = new StringBuffer("\n");
-
- for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
- {
- Map.Entry entry = (Map.Entry)i.next();
- Integer nodeID = (Integer)entry.getKey();
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
- sb.append(" ").append("nodeID ").append(nodeID).append(" - ").append(info).append("\n");
- }
- return sb.toString();
- }
-
- private String dumpFailoverMap(Map map)
- {
- StringBuffer sb = new StringBuffer("\n");
-
- for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
- {
- Map.Entry entry = (Map.Entry)i.next();
- Integer primary = (Integer)entry.getKey();
- Integer secondary = (Integer)entry.getValue();
- sb.append(" ").append(primary).append("->").append(secondary).append("\n");
- }
- return sb.toString();
- }
-
-
// Inner classes -------------------------------------------------------------------
/*
@@ -2308,7 +2313,7 @@
}
try
{
- if (trace) { log.trace(this + " got state"); }
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener got state"); }
return getStateAsBytes();
}
catch (Exception e)
@@ -2343,7 +2348,7 @@
try
{
processStateBytes(bytes);
- if (trace) { log.trace(this + " has set state"); }
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".ControlMessageListener has set state"); }
}
catch (Exception e)
{
@@ -2549,18 +2554,20 @@
private class NodeAddressMapListener implements ReplicationListener
{
public void onReplicationChange(Serializable key, Map updatedReplicantMap,
- boolean added, int originatorNodeId)
+ boolean added, int originatorNodeID)
{
+ log.debug(DefaultClusteredPostOffice.this + " received " + key +
+ " replication change from node " + originatorNodeID + ": " + updatedReplicantMap);
+
if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
{
log.debug("Cluster map:\n" + dumpClusterMap(updatedReplicantMap));
- // A node-address mapping has been added/removed from global state, we need to update
- // the failover map.
- failoverMap = failoverMapper.generateMapping(updatedReplicantMap.keySet());
-
- log.debug("Failover map:\n" + dumpFailoverMap(failoverMap));
+ // A node-address mapping has been added/removed from global state, we need to update
+ // the failover map.
+ failoverMap = failoverMapper.generateMapping(updatedReplicantMap.keySet());
+ log.debug("Failover map:\n" + dumpFailoverMap(failoverMap));
+ }
}
}
- }
}
\ No newline at end of file
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-17 04:41:26 UTC (rev 1808)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java 2006-12-18 03:53:08 UTC (rev 1809)
@@ -64,7 +64,6 @@
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.ServerInvocationHandler;
import org.jboss.remoting.transport.PortUtil;
-import org.jboss.remoting.serialization.SerializationStreamFactory;
import org.jboss.resource.adapter.jdbc.local.LocalManagedConnectionFactory;
import org.jboss.resource.adapter.jdbc.remote.WrapperDataSourceService;
import org.jboss.resource.adapter.jms.JmsManagedConnectionFactory;
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-17 04:41:26 UTC (rev 1808)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-18 03:53:08 UTC (rev 1809)
@@ -37,7 +37,6 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.remoting.ServerInvocationHandler;
/**
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java 2006-12-17 04:41:26 UTC (rev 1808)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RemoteTestServer.java 2006-12-18 03:53:08 UTC (rev 1809)
@@ -24,7 +24,6 @@
import org.jboss.jms.server.DestinationManager;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
/**
*
More information about the jboss-cvs-commits
mailing list