[jboss-cvs] JBoss Messaging SVN: r1780 - in trunk: . src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/bin tests/etc tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/tools/jmx/rmi
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 13 00:30:43 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-12-13 00:30:32 -0500 (Wed, 13 Dec 2006)
New Revision: 1780
Modified:
trunk/messaging.iml
trunk/messaging.ipr
trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
trunk/tests/bin/runtest
trunk/tests/etc/log4j.xml
trunk/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
Log:
- Fixed a cascade failover bug.
- The PostOffice that completes the failover now sends a JMX FAILOVER_COMPLETED_NOTIFICATION.
- Multiple logging improvements (watch for a logging blog, coming soon).
- Minor refactoring.
Modified: trunk/messaging.iml
===================================================================
--- trunk/messaging.iml 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/messaging.iml 2006-12-13 05:30:32 UTC (rev 1780)
@@ -41,6 +41,10 @@
<orderEntry type="library" name="jgroups" level="project" />
<orderEntry type="library" name="junit" level="project" />
<orderEntry type="library" name="jboss-system" level="project" />
+ <orderEntry type="library" name="jboss-common-jdbc-wrapper" level="project" />
+ <orderEntry type="library" name="jboss-jca" level="project" />
+ <orderEntry type="library" name="jboss-local-jdbc" level="project" />
+ <orderEntry type="library" name="jms-ra" level="project" />
<orderEntryProperties />
</component>
<component name="VcsManagerConfiguration">
Modified: trunk/messaging.ipr
===================================================================
--- trunk/messaging.ipr 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/messaging.ipr 2006-12-13 05:30:32 UTC (rev 1780)
@@ -308,6 +308,34 @@
<JAVADOC />
<SOURCES />
</library>
+ <library name="jboss-common-jdbc-wrapper">
+ <CLASSES>
+ <root url="jar://$PROJECT_DIR$/tests/lib/jboss-common-jdbc-wrapper.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="jboss-jca">
+ <CLASSES>
+ <root url="jar://$PROJECT_DIR$/tests/lib/jboss-jca.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="jboss-local-jdbc">
+ <CLASSES>
+ <root url="jar://$PROJECT_DIR$/tests/lib/jboss-local-jdbc.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
+ <library name="jms-ra">
+ <CLASSES>
+ <root url="jar://$PROJECT_DIR$/tests/lib/jms-ra.jar!/" />
+ </CLASSES>
+ <JAVADOC />
+ <SOURCES />
+ </library>
</component>
<component name="uidesigner-configuration">
<option name="INSTRUMENT_CLASSES" value="true" />
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -294,7 +294,7 @@
public String toString()
{
- return "ClientConnectionFactoryDelegate[" + id + "]";
+ return "ClientConnectionFactoryDelegate[ID=" + id + "]";
}
public String getServerLocatorURI()
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -118,7 +118,17 @@
public String toString()
{
- return "ClusteredClientConnectionFactoryDelegate[" + id + "] with delegates.length = " + (delegates==null?"null":Integer.toString(delegates.length));
+ StringBuffer sb = new StringBuffer("ClusteredConnFactoryDelegate[ID=");
+ sb.append(id).append("][");
+ if (delegates == null)
+ {
+ sb.append("0]");
+ }
+ else
+ {
+ sb.append(delegates.length).append("]");
+ }
+ return sb.toString();
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -756,8 +756,6 @@
//TODO we shouldn't have a dependency on DefaultClusteredPostOffice - where should we put the constants?
Map replicants = replicator.get(DefaultClusteredPostOffice.FAILED_OVER_FOR_KEY);
- log.info("Got replicants");
-
boolean foundEntry = false;
if (replicants != null)
@@ -1099,14 +1097,15 @@
private class FailoverListener implements ReplicationListener
{
- public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added, int originatingNodeId)
+ public void onReplicationChange(Serializable key, Map updatedReplicantMap,
+ boolean added, int originatingNodeId)
{
if (key.equals(DefaultClusteredPostOffice.FAILED_OVER_FOR_KEY))
{
- //We have a failover status change - notify anyone waiting
-
- log.info("Got replication change on failed over map, notifying those waiting on lock");
-
+ // We have a failover status change - notify anyone waiting
+
+ log.debug(ServerPeer.this + ".FailoverListener got failover event, notifying those waiting on lock");
+
synchronized (failoverStatusLock)
{
failoverStatusLock.notifyAll();
Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -67,6 +67,8 @@
private static final String CF_PREFIX = "CF_";
// Attributes ----------------------------------------------------
+
+ private boolean trace = log.isTraceEnabled();
protected Context initialContext;
protected ServerPeer serverPeer;
@@ -109,8 +111,7 @@
boolean clustered)
throws Exception
{
- log.info("Registering connection factory with name " + uniqueName + " " +
- "bindings " + jndiBindings);
+ log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
// Sanity check
if (delegates.containsKey(uniqueName))
@@ -267,7 +268,6 @@
public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap,
boolean added, int originatingNodeId)
{
- log.info("Got replication call " + key + " node=" + serverPeer.getServerPeerID() + " replicants=" + updatedReplicantMap + " added=");
try
{
if (!(key instanceof String))
@@ -279,31 +279,26 @@
if (sKey.equals(DefaultClusteredPostOffice.ADDRESS_INFO_KEY))
{
- /*
- We respond to changes in the node-address mapping
- This will be replicated whan a node joins / leaves the group
- When this happens we need to recalculate the failoverMap
- and rebind all connection factories with the new mapping
- We cannot just reference a single map since the objects are bound in JNDI
- in serialized form
- */
- log.info("responding to node - adress info change. Recalculating mapping and rebinding cfs");
-
+ log.debug(this + " received address mapping change " + updatedReplicantMap);
+
+ // We respond to changes in the node-address mapping. This will be replicated whan a
+ // node joins / leaves the group. When this happens we need to recalculate the
+ // failoverMap and rebind all connection factories with the new mapping. We cannot just
+ // reference a single map since the objects are bound in JNDI in serialized form.
+
recalculateFailoverMap(updatedReplicantMap);
- //rebind
- Iterator iter = endpoints.entrySet().iterator();
-
- while (iter.hasNext())
+ // Rebind
+
+ for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
{
- Map.Entry entry = (Map.Entry)iter.next();
-
+ Map.Entry entry = (Map.Entry)i.next();
String uniqueName = (String)entry.getKey();
-
ServerConnectionFactoryEndpoint endpoint =
(ServerConnectionFactoryEndpoint)entry.getValue();
- ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
+ ClusteredClientConnectionFactoryDelegate del =
+ (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
if (del == null)
{
@@ -311,37 +306,33 @@
}
del.setFailoverMap(failoverMap);
-
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
}
-
}
else if (sKey.startsWith(CF_PREFIX) && originatingNodeId != serverPeer.getServerPeerID())
{
- /*
- A connection factory has been deployed / undeployed - we need to update the local delegate arrays inside the clustered
- connection factories with the same name
- We don't recalculate the failover map since the number of nodes in the group hasn't changed
- We also ignore any local changes since the cf will already be bound locally with the new
- local delegate in the array
- */
-
+ // A connection factory has been deployed / undeployed - we need to update the local
+ // delegate arrays inside the clustered connection factories with the same name. We
+ // don't recalculate the failover map since the number of nodes in the group hasn't
+ // changed. We also ignore any local changes since the cf will already be bound locally
+ // with the new local delegate in the array
+
String uniqueName = sKey.substring(CF_PREFIX.length());
+
+ log.debug(this + " received '" + uniqueName + "' connection factory update " + updatedReplicantMap);
+
+ ClusteredClientConnectionFactoryDelegate del =
+ (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
- log.info("Connection factory with unique name " + uniqueName + " has been added / removed");
-
- ClusteredClientConnectionFactoryDelegate del = (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
-
if (del == null)
{
throw new IllegalStateException("Cannot find cf with name " + uniqueName);
}
ClientConnectionFactoryDelegate[] delArr =
- (ClientConnectionFactoryDelegate[])updatedReplicantMap.values().toArray(new ClientConnectionFactoryDelegate[updatedReplicantMap.size()]);
+ (ClientConnectionFactoryDelegate[])updatedReplicantMap.values().
+ toArray(new ClientConnectionFactoryDelegate[updatedReplicantMap.size()]);
- log.info("Updating delsArr with size " + delArr.length);
-
del.setDelegates(delArr);
ServerConnectionFactoryEndpoint endpoint =
@@ -353,8 +344,7 @@
}
rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
-
- }
+ }
}
catch (Exception e)
{
@@ -370,6 +360,11 @@
replicator.registerListener(this);
}
+
+ public String toString()
+ {
+ return "Server[" + serverPeer.getServerPeerID() + "].ConnFactoryJNDIMapper";
+ }
// Package protected ---------------------------------------------
@@ -385,22 +380,22 @@
private void recalculateFailoverMap(Map nodeAddressMap) throws Exception
{
FailoverMapper mapper = replicator.getFailoverMapper();
-
failoverMap = mapper.generateMapping(nodeAddressMap.keySet());
}
/**
* @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
*/
- private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates) throws Exception
+ private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
+ throws Exception
{
- //TODO: make it trace after the code is stable
- log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
+ if (trace) { log.trace(this + " updating failover delegates, map size " + localDelegates.size()); }
ClientConnectionFactoryDelegate[] delArr =
- (ClientConnectionFactoryDelegate[])localDelegates.values().toArray(new ClientConnectionFactoryDelegate[localDelegates.size()]);
+ (ClientConnectionFactoryDelegate[])localDelegates.values().
+ toArray(new ClientConnectionFactoryDelegate[localDelegates.size()]);
- //If the map is not cached - generate it now
+ // If the map is not cached - generate it now
if (failoverMap == null)
{
@@ -414,12 +409,13 @@
recalculateFailoverMap(nodeAddressMap);
}
- // The main delegated is needed for the construction of ClusteredClientConnectionFactoryDelegate
- // ClusteredClientConnectionFactoryDelegate extends ClientConnectionFactoryDelegate and it will
- // need the current server's delegate properties to be bound to ObjectId, ServerLocator and
- // other connection properties.
- //
- // The ClusteredCFDelegate will copy these properties on its contructor defined bellow after this loop
+ // The main delegated is needed for the construction of
+ // ClusteredClientConnectionFactoryDelegate. ClusteredClientConnectionFactoryDelegate extends
+ // ClientConnectionFactoryDelegate and it will need the current server's delegate properties
+ // to be bound to ObjectId, ServerLocator and other connection properties.
+ // The ClusteredCFDelegate will copy these properties on its contructor defined bellow after
+ // this loop.
+
ClientConnectionFactoryDelegate mainDelegate = null;
for(Iterator i = localDelegates.values().iterator(); i.hasNext();)
@@ -431,7 +427,8 @@
// sanity check
if (mainDelegate != null)
{
- throw new IllegalStateException("There are two servers with serverID=" + this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
+ throw new IllegalStateException("There are two servers with serverID=" +
+ this.serverPeer.getServerPeerID() + ", verify your clustering configuration");
}
mainDelegate = del;
}
@@ -453,7 +450,7 @@
for(Iterator i = jndiNames.iterator(); i.hasNext(); )
{
String jndiName = (String)i.next();
- log.info("Rebinding " + jndiName + " CF=" + cf );
+ log.debug(this + " rebinding " + cf + " as " + jndiName);
JNDIUtil.rebind(ic, jndiName, cf);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -41,6 +41,7 @@
public interface ClusteredPostOffice extends PostOffice, Peer
{
public static final String VIEW_CHANGED_NOTIFICATION = "VIEW_CHANGED";
+ public static final String FAILOVER_COMPLETED_NOTIFICATION = "FAILOVER_COMPLETED";
/**
* Bind a queue to the post office under a specific condition
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/FailoverMapper.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -37,5 +37,8 @@
*/
public interface FailoverMapper
{
+ /**
+ * @param nodes Set<Integer>.
+ */
Map generateMapping(Set nodes);
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -635,11 +635,11 @@
public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
throws Exception
{
- log.info("##########putReplicantLocally received, before lock");
synchronized (replicatedData)
{
- log.info("putReplicantLocally received, after lock");
+ log.debug(this + " puts replicant locally: " + key + "->" + replicant);
+
Map m = (Map)replicatedData.get(key);
if (m == null)
@@ -652,9 +652,9 @@
m.put(new Integer(originatorNodeID), replicant);
notifyListeners(key, m, true, originatorNodeID);
- }
- log.info("putReplicantLocally, completed");
+ if (trace) { log.trace(this + " putReplicantLocally completed"); }
+ }
}
/**
@@ -664,7 +664,7 @@
{
synchronized (replicatedData)
{
- log.info(this.currentNodeId + " removing key " + key + " from node " + originatorNodeID);
+ if (trace) { log.trace(this + " removes " + originatorNodeID + "'s replicant locally for key " + key); }
Map m = (Map)replicatedData.get(key);
@@ -1323,9 +1323,9 @@
/**
* Check for any transactions that need to be committed or rolled back
*/
- public void check(Integer nodeId) throws Throwable
+ public void checkTransactions(Integer nodeId) throws Throwable
{
- if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
+ if (trace) { log.trace(this + " checking for any stranded transactions for node " + nodeId); }
synchronized (holdingArea)
{
@@ -1343,11 +1343,11 @@
{
ClusterTransaction tx = (ClusterTransaction)entry.getValue();
- if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+ if (trace) { log.trace("found transaction " + tx + " in holding area"); }
boolean commit = tx.check(this);
- if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
+ if (trace) { log.trace("transaction " + tx + " will be " + (commit ? "COMMITTED" : "ROLLED BACK")); }
if (commit)
{
@@ -1360,11 +1360,11 @@
toRemove.add(id);
- if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
+ if (trace) { log.trace("resolved " + tx); }
}
}
- //Remove the transactions from the holding area
+ // Remove the transactions from the holding area
iter = toRemove.iterator();
@@ -1375,7 +1375,7 @@
holdingArea.remove(id);
}
}
- if (trace) { log.trace(this.currentNodeId + " check complete"); }
+ if (trace) { log.trace(this + " transaction check complete"); }
}
public int getNodeId()
@@ -1703,7 +1703,7 @@
*/
private void removeDataForNode(Integer nodeToRemove) throws Exception
{
- log.info("Node " + nodeToRemove + " requested to leave cluster");
+ log.debug(this + " cleaning local data for node " + nodeToRemove);
lock.writeLock().acquire();
@@ -1715,26 +1715,22 @@
{
List toRemove = new ArrayList();
- Iterator iter = nameMap.values().iterator();
-
- while (iter.hasNext())
+ for(Iterator i = nameMap.values().iterator(); i.hasNext(); )
{
- Binding binding = (Binding)iter.next();
+ Binding binding = (Binding)i.next();
if (!binding.getQueue().isRecoverable())
{
- //We only remove the non durable bindings - we still need to be able to handle
- //messages for a durable subscription "owned" by a node that is not active any more!
+ // We only remove the non durable bindings - we still need to be able to handle
+ // messages for a durable subscription "owned" by a node that is not active any
+ // more!
toRemove.add(binding);
}
}
- iter = toRemove.iterator();
-
- while (iter.hasNext())
+ for(Iterator i = toRemove.iterator(); i.hasNext(); )
{
- Binding binding = (Binding)iter.next();
-
+ Binding binding = (Binding)i.next();
removeBinding(nodeToRemove.intValue(), binding.getQueue().getName());
}
}
@@ -1750,7 +1746,6 @@
for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
{
Map.Entry entry = (Map.Entry)i.next();
-
String key = (String)entry.getKey();
Map replicants = (Map)entry.getValue();
@@ -1761,7 +1756,6 @@
i.remove();
}
- // Need to trigger listeners
notifyListeners(key, replicants, false, nodeToRemove.intValue());
}
}
@@ -1779,7 +1773,6 @@
for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
{
ReplicationListener listener = (ReplicationListener)i.next();
-
listener.onReplicationChange(key, updatedReplicantMap, added, originatorNodeId);
}
}
@@ -1792,37 +1785,13 @@
{
if (trace) { log.trace(this + " sending synch request " + request); }
- byte[] bytes = writeRequest(request);
+ Message message = new Message(null, null, writeRequest(request));
- Message message = new Message(null, null, bytes);
-
controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
if (trace) { log.trace(this + " request sent OK"); }
}
- //DEBUG ONLY - remove this
- private void dumpNodeIdAddressMap(Map map) throws Exception
- {
- log.info("** DUMPING NODE ADDRESS MAPPING");
-
- Iterator iter = map.entrySet().iterator();
-
- while (iter.hasNext())
- {
- Map.Entry entry = (Map.Entry)iter.next();
-
- Integer theNodeId = (Integer)entry.getKey();
-
- PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
-
- log.info("node id: " + theNodeId +" --------->(async:sync) " + info.getAsyncChannelAddress() + ":" + info.getSyncChannelAddress());
- }
-
- log.info("** END DUMP");
- }
-
-
//TODO - this is a bit tortuous - needs optimising
private Integer getNodeIdForSyncAddress(Address address) throws Exception
{
@@ -1835,33 +1804,20 @@
throw new IllegalStateException("Cannot find node id -> address mapping");
}
- this.dumpNodeIdAddressMap(map);
+ Integer nid = null;
- Iterator iter = map.entrySet().iterator();
-
- log.info("iterating, looking for " + address);
-
- Integer theNodeId = null;
- while (iter.hasNext())
+ for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
{
- Map.Entry entry = (Map.Entry)iter.next();
-
+ Map.Entry entry = (Map.Entry)i.next();
PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
- log.info("info synch channel address: " + info.getSyncChannelAddress());
-
if (info.getSyncChannelAddress().equals(address))
{
- log.info("equal");
- theNodeId = (Integer)entry.getKey();
+ nid = (Integer)entry.getKey();
break;
}
- else
- {
- log.info("Not equal");
- }
}
- return theNodeId;
+ return nid;
}
}
@@ -2061,11 +2017,9 @@
*/
private void nodeJoined(Address address) throws Exception
{
- if (trace) { log.trace(this + ": " + address + " joined"); }
+ log.debug(this + ": " + address + " joined");
- log.info(this.currentNodeId + " Node with address: " + address + " joined");
-
- //Currently does nothing
+ // Currently does nothing
}
/*
@@ -2073,277 +2027,230 @@
*/
private void nodeLeft(Address address) throws Throwable
{
- if (trace) { log.trace(this + ": " + address + " left"); }
+ log.debug(this + ": " + address + " left");
- log.info(this.currentNodeId + " Node with address: " + address + " left");
+ Integer nid = getNodeIdForSyncAddress(address);
- Integer theNodeId = getNodeIdForSyncAddress(address);
-
- if (theNodeId == null)
+ if (nid == null)
{
- throw new IllegalStateException(this.currentNodeId + " Cannot find node id for address " + address);
+ throw new IllegalStateException(this + " cannot find node ID for address " + address);
}
- boolean crashed = !this.leaveMessageReceived(theNodeId);
+ boolean crashed = !this.leaveMessageReceived(nid);
- if (trace) { log.trace("Node " + address + " id: " + theNodeId +" has left the group, crashed = " + crashed); }
+ log.debug(this + ": node " + nid + " has " + (crashed ? "crashed" : "cleanly left the group"));
- //Cleanup any hanging transactions - we do this irrespective of whether we crashed
- check(theNodeId);
+ // Cleanup any hanging transactions - we do this irrespective of whether we crashed
+ checkTransactions(nid);
synchronized (failoverMap)
{
- //Need to evaluate this before we regenerate the failover map
- Integer failoverNode = (Integer)failoverMap.get(theNodeId);
+ // Need to evaluate this before we regenerate the failover map
+ Integer failoverNode = (Integer)failoverMap.get(nid);
if (failoverNode == null)
{
- throw new IllegalStateException("Cannot find failover node for node " + theNodeId);
+ throw new IllegalStateException(this + " cannot find failover node for node " + nid);
}
- //debug dump failover map
+ // Remove any replicant data and non durable bindings for the node - again we need to do
+ // this irrespective of whether we crashed. This will notify any listeners which will
+ // recalculate the connection factory delegates and failover delegates.
- Iterator iter = failoverMap.entrySet().iterator();
+ removeDataForNode(nid);
- log.info("Dumping failover map");
- while (iter.hasNext())
+ if (currentNodeId == failoverNode.intValue() && crashed)
{
- Map.Entry entry = (Map.Entry)iter.next();
+ // The node crashed and we are the failover node so let's perform failover
- Integer nodeId = (Integer)entry.getKey();
+ log.info(this + ": I am the failover node for node " + nid + " that crashed");
- Integer failoverNodeId = (Integer)entry.getValue();
-
- log.info("node->failover node: " + nodeId + " --> " + failoverNodeId);
- }
- log.info("end dump");
-
- //end debug
-
- boolean isFailover = failoverNode.intValue() == this.currentNodeId;
-
- log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-
- log.info("Crashed: " + crashed);
-
- //Remove any replicant data and non durable bindings for the node - again we need to do this
- //irrespective of whether we crashed
- //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
- removeDataForNode(theNodeId);
-
- if (crashed && isFailover)
- {
- //The node crashed and we are the failover node
- //so let's perform failover
-
//TODO server side valve
- failOver(theNodeId.intValue());
+ performFailover(nid);
}
}
}
/**
- * Verifies changes on the View deciding if a node joined or left the cluster.
- */
- private void verifyMembership(View oldView, View newView) throws Throwable
- {
- if (oldView != null)
- {
- for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- nodeLeft(address);
- }
- }
- }
-
- for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
- {
- nodeJoined(address);
- }
- }
- }
-
- /**
- * This method fails over all the queues from node <failedNodeId> onto this node
- * It is triggered when a JGroups view change occurs due to a member leaving and
- * it's determined the member didn't leave cleanly
+ * This method fails over all the queues from node <failedNodeId> onto this node. It is triggered
+ * when a JGroups view change occurs due to a member leaving and it's determined the member
+ * didn't leave cleanly.
*
- * @param failedNodeId
+ * @param failedNodeID
* @throws Exception
*/
- private void failOver(int failedNodeId) throws Exception
+ private void performFailover(Integer failedNodeID) throws Exception
{
- //Need to lock
+ // Need to lock
lock.writeLock().acquire();
try
{
- log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
+ log.debug(this + " performing failover for failed node " + failedNodeID);
- /*
- We make sure a FailoverStatus object is put in the replicated data for the node
- The real failover node will always add this in.
- This means that each node knows which node has really started the failover for another node, and
- which node did failover for other nodes in the past
- We cannot rely on the failoverMap for this, since that will regenerated once failover is done,
- because of the change in membership.
- And clients may failover after that and need to know if they have the correct node.
- Since this is the first thing we do after detecting failover, it should be very quick that
- all nodes know, however there is still a chance that a client tries to failover before
- the information is replicated.
- */
+ // We make sure a FailoverStatus object is put in the replicated data for the node. The
+ // real failover node will always add this in. This means that each node knows which node
+ // has really started the failover for another node, and which node did failover for other
+ // nodes in the past.
+ // We cannot rely on the failoverMap for this, since that will be regenerated once failover is
+ // done, because of the change in membership. And clients may failover after that and need
+ // to know if they have the correct node. Since this is the first thing we do after
+ // detecting failover, it should be very quick that all nodes know, however there is still
+ // a chance that a client tries to failover before the information is replicated.
- Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
+ Map failoverData = (Map)get(FAILED_OVER_FOR_KEY);
+ FailoverStatus status = (FailoverStatus)failoverData.get(new Integer(currentNodeId));
- FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
-
if (status == null)
{
status = new FailoverStatus();
}
- status.startFailingOverForNode(failedNodeId);
+ status.startFailingOverForNode(failedNodeID);
- log.info("Putting state that failover is starting");
+ log.debug(this + " announcing the cluster it is starting failover procedure");
put(FAILED_OVER_FOR_KEY, status);
- log.info("Put state that failover is starting");
+ log.debug(this + " announced the cluster it is starting failover procedure");
- //Get the map of queues for the failed node
+ // Get the map of queues for the failed node
- Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
- if (subMaps==null || subMaps.size()==0)
+ Map subMaps = (Map)nameMaps.get(failedNodeID);
+
+ if (subMaps == null || subMaps.size() == 0)
{
- log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
- return;
+ log.warn(this + " couldn't find any binding to fail over from server " + failedNodeID);
}
-
- //Compile a list of the queue names to remove
- //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
- //node leave was detected, so if there are any non durable bindings left here then
- //this is an error
-
- //We iterate through twice to avoid ConcurrentModificationException
- ArrayList namesToRemove = new ArrayList();
- for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
+ else
{
- Map.Entry entry = (Map.Entry)iterNames.next();
+ // Compile a list of the queue names to remove.
+ // Note that any non durable bindings will already have been removed (in
+ // removeDataForNode()) when the node leave was detected, so if there are any non durable
+ // bindings left here then this is an error.
- Binding binding = (Binding )entry.getValue();
+ // We iterate through twice to avoid ConcurrentModificationException
- //Sanity check
- if (!binding.getQueue().isRecoverable())
- {
- throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
- }
+ ArrayList namesToRemove = new ArrayList();
- //Sanity check
- if (!binding.getQueue().isClustered())
+ for (Iterator i = subMaps.entrySet().iterator(); i.hasNext();)
{
- throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
- }
+ Map.Entry entry = (Map.Entry)i.next();
+ Binding binding = (Binding )entry.getValue();
- ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+ // Sanity check
+ if (!binding.getQueue().isRecoverable())
+ {
+ throw new IllegalStateException("Found non recoverable queue in map, " +
+ "these should have been removed!");
+ }
- //Sanity check
- if (queue.isLocal())
- {
- throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
- }
- namesToRemove.add(entry);
- }
+ // Sanity check
+ if (!binding.getQueue().isClustered())
+ {
+ throw new IllegalStateException("Queue " + binding.getQueue().getName() +
+ " is not clustered!");
+ }
- log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+ ClusteredQueue queue = (ClusteredQueue)binding.getQueue();
- for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
- {
- Map.Entry entry = (Map.Entry)iterNames.next();
+ // Sanity check
+ if (queue.isLocal())
+ {
+ throw new IllegalStateException("Queue " + binding.getQueue().getName() +
+ " is local!");
+ }
- Binding binding = (Binding)entry.getValue();
+ namesToRemove.add(entry);
+ }
- RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+ if (trace) { log.trace("deleting " + namesToRemove.size() + " bindings from old node"); }
- String queueName = (String)entry.getKey();
+ for (Iterator i = namesToRemove.iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ Binding binding = (Binding)entry.getValue();
+ RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+ String queueName = (String)entry.getKey();
- //First the binding is removed from the in memory condition and name maps
- this.removeBinding(failedNodeId, queueName);
+ // First the binding is removed from the in memory condition and name maps ...
+ removeBinding(failedNodeID.intValue(), queueName);
- //Then deleted from the database
- this.deleteBinding(failedNodeId, queueName);
+ // ... then deleted from the database
+ deleteBinding(failedNodeID.intValue(), queueName);
- log.info("deleted binding for " + queueName);
+ log.debug(this + " deleted binding for " + queueName);
- //Note we do not need to send an unbind request across the cluster - this is because
- //when the node crashes a view change will hit the other nodes and that will cause
- //all binding data for that node to be removed anyway
+ // Note we do not need to send an unbind request across the cluster - this is because
+ // when the node crashes a view change will hit the other nodes and that will cause all
+ // binding data for that node to be removed anyway.
- //If there is already a queue registered with the same name, then we set a flag "failed" on the
- //binding and then the queue will go into a special list of failed bindings
- //otherwise we treat at as a normal queue
- //This is because we cannot deal with more than one queue with the same name
- //Any new consumers will always only connect to queues in the main name map
- //This may mean that queues in the failed map have messages stranded in them if consumers
- //disconnect (since no more can reconnect)
- //However we message redistribution activated other queues will be able to consume from them.
- //TODO allow message redistribution for queues in the failed list
- boolean failed = this.internalGetBindingForQueueName(queueName) != null;
+ // If there is already a queue registered with the same name, then we set a flag
+ // "failed" on the binding and then the queue will go into a special list of failed
+ // bindings, otherwise we treat it as a normal queue.
+ // This is because we cannot deal with more than one queue with the same name. Any new
+ // consumers will always only connect to queues in the main name map. This may mean that
+ // queues in the failed map have messages stranded in them if consumers disconnect
+ // (since no more can reconnect). However, when message redistribution is activated, other
+ // queues will be able to consume from them.
- if (!failed)
- {
- log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
- }
- else
- {
- log.info("There is already a queue with that name so adding to failed map");
- }
+ //TODO allow message redistribution for queues in the failed list
- //Create a new binding
- Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
- stub.getName(), stub.getChannelID(),
- stub.getFilter(), stub.isRecoverable(), failed);
+ boolean failed = internalGetBindingForQueueName(queueName) != null;
- log.info("Created new binding");
+ if (!failed)
+ {
+ log.debug(this + " did not have a " + queueName +
+ " queue so it's assuming it as a regular queue");
+ }
+ else
+ {
+ log.info(this + " has already a " + queueName + " queue so adding to failed map");
+ }
- //Insert it into the database
- insertBinding(newBinding);
+ // Create a new binding
+ Binding newBinding = createBinding(currentNodeId, binding.getCondition(),
+ stub.getName(), stub.getChannelID(),
+ stub.getFilter(), stub.isRecoverable(), failed);
- LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+ // Insert it into the database
+ insertBinding(newBinding);
- clusteredQueue.deactivate();
- clusteredQueue.load();
- clusteredQueue.activate();
+ LocalClusteredQueue clusteredQueue = (LocalClusteredQueue)newBinding.getQueue();
- log.info("Loaded queue");
+ clusteredQueue.deactivate();
+ clusteredQueue.load();
+ clusteredQueue.activate();
- //Add the new binding in memory
- addBinding(newBinding);
+ log.debug(this + " loaded " + clusteredQueue);
- //Send a bind request so other nodes add it too
- sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+ // Add the new binding in memory
+ addBinding(newBinding);
- //FIXME there is a problem in the above code.
- //If the server crashes between deleting the binding from the database
- //and creating the new binding in the database, then the binding will be completely
- //lost from the database when the server is resurrected.
- //To remedy, both db operations need to be done in the same JBDC tx
+ // Send a bind request so other nodes add it too
+ sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+
+ //FIXME there is a problem in the above code.
+ //If the server crashes between deleting the binding from the database
+ //and creating the new binding in the database, then the binding will be completely
+ //lost from the database when the server is resurrected.
+ //To remedy, both db operations need to be done in the same JDBC tx
+ }
}
- log.info("Server side fail over is now complete");
+ log.info(this + ": server side fail over is now complete");
//TODO - should this be in a finally? I'm not sure
status.finishFailingOver();
- log.info("Putting state that failover has completed");
+ log.debug(this + " announcing the cluster that failover procedure is complete");
+
put(FAILED_OVER_FOR_KEY, status);
- log.info("Put state that failover has completed");
+
+ log.debug(this + " announced the cluster that failover procedure is complete");
+
+ sendJMXNotification(FAILOVER_COMPLETED_NOTIFICATION);
}
finally
{
@@ -2369,34 +2276,38 @@
{
Notification n = new Notification(notificationType, "", 0l);
nbSupport.sendNotification(n);
+ log.debug(this + " sent " + notificationType + " JMX notification");
}
- private void handleViewAccepted(View newView)
+ private String dumpClusterMap(Map map)
{
- //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
- //TODO: can't we do the same since this is pretty useful?
- log.info(currentNodeId + " got new view: " + newView + " postOffice:"
- + DefaultClusteredPostOffice.this.getOfficeName());
+ StringBuffer sb = new StringBuffer("\n");
- // JGroups will make sure this method is never called by more than one thread concurrently
+ for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
+ {
+ Map.Entry entry = (Map.Entry)i.next();
+ Integer nodeID = (Integer)entry.getKey();
+ PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
+ sb.append(" ").append("nodeID ").append(nodeID).append(" - ").append(info).append("\n");
+ }
+ return sb.toString();
+ }
- View oldView = currentView;
- currentView = newView;
+ private String dumpFailoverMap(Map map)
+ {
+ StringBuffer sb = new StringBuffer("\n");
- try
+ for(Iterator i = map.entrySet().iterator(); i.hasNext(); )
{
- verifyMembership(oldView, newView);
- sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ Map.Entry entry = (Map.Entry)i.next();
+ Integer primary = (Integer)entry.getKey();
+ Integer secondary = (Integer)entry.getValue();
+ sb.append(" ").append(primary).append("->").append(secondary).append("\n");
}
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
+ return sb.toString();
}
+
// Inner classes -------------------------------------------------------------------
/*
@@ -2416,8 +2327,7 @@
}
try
{
- // TODO: Make it trace
- log.info("getState:" + DefaultClusteredPostOffice.this.getOfficeName());
+ if (trace) { log.trace(this + " got state"); }
return getStateAsBytes();
}
catch (Exception e)
@@ -2451,9 +2361,8 @@
}
try
{
- // TODO: Make it trace
- log.info("setState:" + DefaultClusteredPostOffice.this.getOfficeName());
processStateBytes(bytes);
+ if (trace) { log.trace(this + " has set state"); }
}
catch (Exception e)
{
@@ -2526,7 +2435,50 @@
public void run()
{
- handleViewAccepted(newView);
+ //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+ //TODO: can't we do the same since this is pretty useful?
+ log.info(DefaultClusteredPostOffice.this + " got new view: " + newView);
+
+ // JGroups will make sure this method is never called by more than one thread concurrently
+
+ View oldView = currentView;
+ currentView = newView;
+
+ try
+ {
+ // Act on membership change, on both cases when an old member left or a new member joined
+
+ if (oldView != null)
+ {
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ // this is where the failover happens, if necessary
+ nodeLeft(address);
+ }
+ }
+ }
+
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ nodeJoined(address);
+ }
+ }
+
+ sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
}
}
@@ -2558,14 +2510,13 @@
public void receive(Message message)
{
- if (trace) { log.trace(currentNodeId + " received message " + message + " on async channel"); }
+ if (trace) { log.trace(this + " received " + message + " on the ASYNC channel"); }
try
{
byte[] bytes = message.getBuffer();
ClusterRequest request = readRequest(bytes);
-
request.execute(DefaultClusteredPostOffice.this);
}
catch (Throwable e)
@@ -2590,7 +2541,8 @@
{
public Object handle(Message message)
{
- if (trace) { log.info(currentNodeId + " received message " + message + " on sync channel"); }
+ if (trace) { log.trace(DefaultClusteredPostOffice.this + ".RequestHandler received " + message + " on the SYNC channel"); }
+
try
{
byte[] bytes = message.getBuffer();
@@ -2611,37 +2563,23 @@
/*
* We use this class to respond to node address mappings being added or removed from the cluster
- * and then recalculate the node->failover node mapping
- *
+ * and then recalculate the node->failover node mapping.
*/
private class NodeAddressMapListener implements ReplicationListener
{
-
- public void onReplicationChange(Serializable key, Map updatedReplicantMap, boolean added,
- int originatorNodeId)
+ public void onReplicationChange(Serializable key, Map updatedReplicantMap,
+ boolean added, int originatorNodeId)
{
if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
{
- log.info(currentNodeId + " got node address change");
+ log.debug("Cluster map:\n" + dumpClusterMap(updatedReplicantMap));
- try
- {
- //DEBUG only
- dumpNodeIdAddressMap(updatedReplicantMap);
- }
- catch (Exception ignore)
- {
- }
+ // A node-address mapping has been added/removed from global state, we need to update
+ // the failover map.
+ failoverMap = failoverMapper.generateMapping(updatedReplicantMap.keySet());
- //A node-address mapping has been added/removed from global state-
- //We need to update the failover map
- generateFailoverMap(updatedReplicantMap);
+ log.debug("Failover map:\n" + dumpFailoverMap(failoverMap));
}
}
-
- private void generateFailoverMap(Map nodeAddressMap)
- {
- failoverMap = failoverMapper.generateMapping(nodeAddressMap.keySet());
- }
}
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -45,22 +45,21 @@
{
private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
- /*
- * Generate a mapping given a set of nodes - nodes will be sorted by the method
+ /**
+ * Generate a mapping given a set of nodes - nodes will be sorted by the method.
+ *
* @see org.jboss.messaging.core.plugin.contract.FailoverMapper#generateMapping(java.util.Set)
*/
public Map generateMapping(Set nodes)
{
Integer[] nodesArr = (Integer[])nodes.toArray(new Integer[nodes.size()]);
- //First sort them so every node has a consistent view
+ // First sort them so every node has a consistent view
Arrays.sort(nodesArr);
int s = nodes.size();
- log.info("Generating failover mapping, node size= "+ s);
-
- //There is no need for the map to be linked
+ // There is no need for the map to be linked
Map failoverNodes = new HashMap(s);
for (int i = 0; i < s; i++)
@@ -74,7 +73,7 @@
failoverNodes.put(nodesArr[i], nodesArr[j]);
}
-
+
return failoverNodes;
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -24,7 +24,6 @@
import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashSet;
-import java.util.List;
import java.util.Set;
/**
@@ -40,59 +39,81 @@
*/
public class FailoverStatus implements Serializable
{
+ // Constants -----------------------------------------------------
+
private static final long serialVersionUID = -2668162690753929133L;
- //The set of nodes the server has completed failover for since it was last restarted
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // The set of nodes the server has completed failover for since it was last restarted
private Set failedOverForNodes;
-
- //The node the server is currently failing over for (if any)
+
+ // The node the server is currently failing over for (if any)
private int currentlyFailingOverForNode;
-
- //Is the server currently failing over?
+
+ // Is the server currently failing over?
private boolean failingOver;
-
+
+ // Constructors --------------------------------------------------
+
public FailoverStatus()
- {
+ {
failedOverForNodes = new LinkedHashSet();
}
-
- public void startFailingOverForNode(int nodeId)
+
+ // Public --------------------------------------------------------
+
+ public void startFailingOverForNode(Integer nodeID)
{
if (failingOver)
{
throw new IllegalStateException("Already failing over for node " + currentlyFailingOverForNode);
}
-
- currentlyFailingOverForNode = nodeId;
-
+
+ currentlyFailingOverForNode = nodeID.intValue();
failingOver = true;
}
-
+
public void finishFailingOver()
{
if (!failingOver)
{
throw new IllegalStateException("The node is not currently failing over");
}
-
+
failedOverForNodes.add(new Integer(currentlyFailingOverForNode));
-
+
failingOver = false;
}
-
+
public Set getFailedOverForNodes()
{
return Collections.unmodifiableSet(failedOverForNodes);
}
-
+
public boolean isFailedOverForNode(int nodeId)
{
return failedOverForNodes.contains(new Integer(nodeId));
}
-
+
public boolean isFailingOverForNode(int nodeId)
{
return failingOver && currentlyFailingOverForNode == nodeId;
}
-
+
+ public String toString()
+ {
+ return "FailoverStatus[" + currentlyFailingOverForNode + "]";
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
}
Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/tests/bin/runtest 2006-12-13 05:30:32 UTC (rev 1780)
@@ -48,8 +48,6 @@
TEST_REMOTING=$ENV_TEST_REMOTING
fi
-TEST_REMOTING=http
-
#
# We should use the same test execution classpath as the ant <junit> task, so we run ant to get
# it from there.
Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/tests/etc/log4j.xml 2006-12-13 05:30:32 UTC (rev 1780)
@@ -19,7 +19,7 @@
<param name="Append" value="true"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5r %-5p [%c] @%t %m%n"/>
+ <param name="ConversionPattern" value="%d{ABSOLUTE} %-5p @%t [%c{1}] %m%n"/>
</layout>
</appender>
@@ -27,8 +27,9 @@
<errorHandler class="org.jboss.logging.util.OnlyOnceErrorHandler"/>
<param name="Target" value="System.out"/>
<param name="Threshold" value="INFO"/>
+ <!-- <param name="Threshold" value="TRACE#org.jboss.logging.XLevel"/> -->
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
+ <param name="ConversionPattern" value="@%t %d{ABSOLUTE} %-5p [%c{1}] %m%n"/>
</layout>
</appender>
@@ -37,7 +38,7 @@
</category>
<category name="org.jgroups">
- <priority value="TRACE"/>
+ <priority value="WARN"/>
</category>
<category name="org.jboss.remoting">
@@ -66,6 +67,10 @@
<priority value="DEBUG"/>
</category>
+ <category name="org.jboss.messaging.core.plugin.JDBCSupport">
+ <priority value="INFO"/>
+ </category>
+
<category name="org.jboss.test.messaging.tools.jmx.MockJBossSecurityManager">
<priority value="DEBUG"/>
</category>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -59,7 +59,7 @@
public void testJoinNotification() throws Exception
{
- ViewChangeNotificationListener listener = new ViewChangeNotificationListener();
+ ClusterEventNotificationListener listener = new ClusterEventNotificationListener();
ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
try
@@ -257,7 +257,7 @@
public void testDirtyLeaveOneNode() throws Exception
{
- ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+ ClusterEventNotificationListener clusterEvent = new ClusterEventNotificationListener();
ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
try
@@ -273,7 +273,7 @@
assertTrue(view.contains(new Integer(0)));
assertTrue(view.contains(new Integer(1)));
- ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+ ServerManagement.addNotificationListener(0, postOfficeObjectName, clusterEvent);
// Make node 1 to "dirty" leave the cluster, by killing the VM running it.
@@ -285,7 +285,7 @@
// Wait for membership change notification
- if (!viewChange.viewChanged(120000))
+ if (!clusterEvent.viewChanged(120000))
{
fail("Did not receive view change after killing server 2!");
}
@@ -297,7 +297,7 @@
}
finally
{
- ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+ ServerManagement.removeNotificationListener(0, postOfficeObjectName, clusterEvent);
ServerManagement.stop(1);
ServerManagement.stop(0);
@@ -306,7 +306,7 @@
public void testDirtyLeaveTwoNodes() throws Exception
{
- ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+ ClusterEventNotificationListener clusterEvent = new ClusterEventNotificationListener();
ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
try
@@ -324,7 +324,7 @@
assertTrue(view.contains(new Integer(1)));
assertTrue(view.contains(new Integer(2)));
- ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+ ServerManagement.addNotificationListener(0, postOfficeObjectName, clusterEvent);
// Make node 2 to "dirty" leave the cluster, by killing the VM running it.
@@ -334,13 +334,15 @@
log.info("######## KILLED 2");
log.info("########");
- // Wait for membership change notification
+ // Wait for FAILOVER_COMPLETED notification
- if (!viewChange.viewChanged(120000))
+ if (!clusterEvent.failoverCompleted(120000))
{
- fail("Did not receive view change after killing server 2!");
+ fail("Did not receive a FAILOVER_COMPLETED event after killing server 2!");
}
+ log.info("received FAILOVER_COMPLETED");
+
view = ServerManagement.getServer(1).getNodeIDView();
assertEquals(2, view.size());
@@ -355,13 +357,15 @@
log.info("######## KILLED 1");
log.info("########");
- // Wait for membership change notification
+ // Wait for FAILOVER_COMPLETED notification
- if (!viewChange.viewChanged(120000))
+ if (!clusterEvent.failoverCompleted(120000))
{
- fail("Did not receive view change after killing server 1!");
+ fail("Did not receive a FAILOVER_COMPLETED event after killing server 1!");
}
+ log.info("received FAILOVER_COMPLETED");
+
view = ServerManagement.getServer(0).getNodeIDView();
assertEquals(1, view.size());
@@ -370,7 +374,7 @@
}
finally
{
- ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+ ServerManagement.removeNotificationListener(0, postOfficeObjectName, clusterEvent);
ServerManagement.stop(2);
ServerManagement.stop(1);
@@ -398,45 +402,71 @@
// Inner classes -------------------------------------------------
- private class ViewChangeNotificationListener implements NotificationListener
+ private class ClusterEventNotificationListener implements NotificationListener
{
- private Slot slot;
+ private Slot viewChange;
+ private Slot failoverCompleted;
- ViewChangeNotificationListener()
+ ClusterEventNotificationListener()
{
- slot = new Slot();
+ viewChange = new Slot();
+ failoverCompleted = new Slot();
}
public void handleNotification(Notification notification, Object object)
{
+ String type = notification.getType();
- if (!ClusteredPostOffice.VIEW_CHANGED_NOTIFICATION.equals(notification.getType()))
+ log.info("received " + type + " notification");
+
+ if (ClusteredPostOffice.VIEW_CHANGED_NOTIFICATION.equals(type))
{
- // ignore it
- return;
+ try
+ {
+ viewChange.put(Boolean.TRUE);
+ }
+ catch(InterruptedException e)
+ {
+ log.error(e);
+ }
}
-
- log.info("received VIEW_CHANGED notification");
-
- try
+ else if (ClusteredPostOffice.FAILOVER_COMPLETED_NOTIFICATION.equals(type))
{
- slot.put(Boolean.TRUE);
+ try
+ {
+ failoverCompleted.put(Boolean.TRUE);
+ }
+ catch(InterruptedException e)
+ {
+ log.error(e);
+ }
}
- catch(InterruptedException e)
+ else
{
- log.error(e);
+ log.info("Ignoring notification " + type);
}
}
public boolean viewChanged(long timeout) throws InterruptedException
{
- Boolean result = (Boolean)slot.poll(timeout);
+ Boolean result = (Boolean)viewChange.poll(timeout);
if (result == null)
{
return false;
}
return result.booleanValue();
}
+
+ public boolean failoverCompleted(long timeout) throws InterruptedException
+ {
+ Boolean result = (Boolean)failoverCompleted.poll(timeout);
+ if (result == null)
+ {
+ return false;
+ }
+ return result.booleanValue();
+ }
+
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -136,7 +136,7 @@
startServerPeer(serverIndex, null, null, sc.isClustered());
- log.info("server started");
+ log.info("Server " + serverIndex + " started");
}
finally
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-13 02:01:42 UTC (rev 1779)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java 2006-12-13 05:30:32 UTC (rev 1780)
@@ -56,7 +56,7 @@
public static final String RMI_SERVER_PREFIX = "messaging_rmi_server_";
public static final String NAMING_SERVER_PREFIX = "naming_rmi_server_";
- public static final int DEFAULT_REGISTRY_PORT = 22555;
+ public static final int DEFAULT_REGISTRY_PORT = 33777;
public static final int DEFAULT_SERVER_INDEX = 0;
public static final String DEFAULT_SERVER_HOST = "localhost";
More information about the jboss-cvs-commits
mailing list