[jboss-cvs] JBoss Messaging SVN: r3014 - in trunk/src/main/org/jboss: messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 15 11:25:52 EDT 2007
Author: timfox
Date: 2007-08-15 11:25:52 -0400 (Wed, 15 Aug 2007)
New Revision: 3014
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
cleanup groupmember
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-08-15 15:25:52 UTC (rev 3014)
@@ -48,7 +48,6 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.MessagingComponent;
import org.jboss.tm.TransactionManagerLocator;
-import org.jboss.tm.TxManager;
/**
*
@@ -808,11 +807,9 @@
}
//Sanity check
- if (tm instanceof TxManager)
+ if (!(tm instanceof com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple))
{
- log.warn("WARNING! The old JBoss transaction manager is being used. " +
- "This does not have XA transaction recovery functionality. " +
- "For XA transaction recovery please deploy the JBoss Transactions JTA/JTS implementation.");
+ log.warn("WARNING! The JBoss Transactions JTA transaction manager is not be used!");
}
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java 2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupListener.java 2007-08-15 15:25:52 UTC (rev 3014)
@@ -21,6 +21,8 @@
*/
package org.jboss.messaging.core.impl.postoffice;
+import java.util.List;
+
import org.jgroups.Address;
/**
@@ -33,7 +35,7 @@
*/
interface GroupListener
{
- void nodeLeft(Address address) throws Throwable;
+ void nodesLeft(List addresses) throws Throwable;
void nodeJoined(Address address) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-15 15:25:52 UTC (rev 3014)
@@ -25,7 +25,9 @@
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.JChannelFactory;
@@ -446,15 +448,19 @@
if (oldView != null)
{
+ List leftNodes = new ArrayList();
for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
{
Address address = (Address)i.next();
if (!newView.containsMember(address))
{
- // this is where the failover happens, if necessary
- groupListener.nodeLeft(address);
+ leftNodes.add(address);
}
}
+ if (!leftNodes.isEmpty())
+ {
+ groupListener.nodesLeft(leftNodes);
+ }
}
for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-15 12:08:02 UTC (rev 3013)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-15 15:25:52 UTC (rev 3014)
@@ -841,85 +841,88 @@
// Currently does nothing
}
-
- /*
- * A node has left the group
- */
- public void nodeLeft(Address address) throws Throwable
+
+ public void nodesLeft(List addresses) throws Throwable
{
- log.debug(this + ": " + address + " left");
+ if (trace) { log.trace("Nodes left " + addresses.size()); }
+
+ Map oldFailoverMap = new HashMap(this.failoverMap);
+
+ int oldFailoverNodeID = failoverNodeID;
+
+ if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+
+ calculateFailoverMap();
+
+ if (trace) { log.trace("First node is now " + firstNode); }
+
+ Iterator iter = addresses.iterator();
+
+ while (iter.hasNext())
+ {
+ Address address = (Address)iter.next();
- Integer leftNodeID = getNodeIDForSyncAddress(address);
+ log.debug(this + ": " + address + " left");
- if (leftNodeID == null)
- {
- throw new IllegalStateException(this + " cannot find node ID for address " + address);
- }
-
- boolean crashed = !leaveMessageReceived(leftNodeID);
-
- log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+ Integer leftNodeID = getNodeIDForSyncAddress(address);
+
+ if (leftNodeID == null)
+ {
+ throw new IllegalStateException(this + " cannot find node ID for address " + address);
+ }
+
+ boolean crashed = !leaveMessageReceived(leftNodeID);
+
+ log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
- // Need to evaluate this before we regenerate the failover map
+ Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
- Integer fnodeID = (Integer)failoverMap.get(leftNodeID);
-
- log.debug(this + " the failover node for the crashed node is " + fnodeID);
+ log.debug(this + " the failover node for the crashed node is " + fnodeID);
- //Recalculate the failover map
+ boolean doneFailover = false;
+
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+ clusterNotifier.sendNotification(notification);
- int oldFailoverNodeID = failoverNodeID;
+ if (crashed && isSupportsFailover())
+ {
+ if (fnodeID == null)
+ {
+ throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ }
+
+ if (fnodeID.intValue() == thisNodeID)
+ {
+ // The node crashed and we are the failover node so let's perform failover
+
+ log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+ performFailover(leftNodeID);
+
+ doneFailover = true;
+ }
+ }
- if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
-
- calculateFailoverMap();
-
- if (trace) { log.trace("First node is now " + firstNode); }
-
- boolean doneFailover = false;
-
- ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-
- clusterNotifier.sendNotification(notification);
-
- if (crashed && isSupportsFailover())
- {
-
- if (fnodeID == null)
+ if (!doneFailover)
{
- throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+ // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
+ // recalculate the connection factory delegates and failover delegates.
+
+ cleanDataForNode(leftNodeID);
}
+
+ if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
- if (fnodeID.intValue() == thisNodeID)
+ if (oldFailoverNodeID != failoverNodeID)
{
- // The node crashed and we are the failover node so let's perform failover
-
- log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-
- performFailover(leftNodeID);
-
- doneFailover = true;
+ //Failover node for this node has changed
+
+ failoverNodeChanged(oldFailoverNodeID, firstNode, false);
}
- }
-
- if (!doneFailover)
- {
- // Remove any replicant data and non durable bindings for the node - This will notify any listeners which will
- // recalculate the connection factory delegates and failover delegates.
-
- cleanDataForNode(leftNodeID);
- }
-
- if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-
- if (oldFailoverNodeID != failoverNodeID)
- {
- //Failover node for this node has changed
-
- failoverNodeChanged(oldFailoverNodeID, firstNode, false);
- }
-
- sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ }
+
+ sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
}
// RequestTarget implementation ------------------------------------------------------------
@@ -1855,66 +1858,63 @@
return removed;
}
- private void calculateFailoverMap()
+ private synchronized void calculateFailoverMap()
{
- synchronized (failoverMap)
+ failoverMap.clear();
+
+ View view = groupMember.getCurrentView();
+
+ Vector members = view.getMembers();
+
+ for (int i = 0; i < members.size(); i++)
{
- failoverMap.clear();
-
- View view = groupMember.getCurrentView();
-
- Vector members = view.getMembers();
-
- for (int i = 0; i < members.size(); i++)
- {
- Address address = (Address)members.get(i);
-
- Integer theNodeID = findNodeIDForAddress(address);
-
- if (theNodeID == null)
- {
- throw new IllegalStateException("Cannot find node id for address " + address);
- }
-
- int j;
-
- if (i != members.size() - 1)
- {
- j = i + 1;
- }
- else
- {
- j = 0;
- }
-
- Address failoverAddress = (Address)members.get(j);
-
- Integer failoverNodeID = this.findNodeIDForAddress(failoverAddress);
-
- if (failoverNodeID == null)
- {
- throw new IllegalStateException("Cannot find node id for address " + failoverAddress);
- }
-
- failoverMap.put(theNodeID, failoverNodeID);
- }
-
- int fid = ((Integer)failoverMap.get(new Integer(thisNodeID))).intValue();
-
- //if we are the first node in the cluster we don't want to be our own failover node!
-
- if (fid == thisNodeID)
- {
- firstNode = true;
- failoverNodeID = -1;
- }
- else
- {
- failoverNodeID = fid;
- firstNode = false;
- }
- }
+ Address address = (Address)members.get(i);
+
+ Integer theNodeID = findNodeIDForAddress(address);
+
+ if (theNodeID == null)
+ {
+ throw new IllegalStateException("Cannot find node id for address " + address);
+ }
+
+ int j;
+
+ if (i != members.size() - 1)
+ {
+ j = i + 1;
+ }
+ else
+ {
+ j = 0;
+ }
+
+ Address failoverAddress = (Address)members.get(j);
+
+ Integer failoverNodeID = this.findNodeIDForAddress(failoverAddress);
+
+ if (failoverNodeID == null)
+ {
+ throw new IllegalStateException("Cannot find node id for address " + failoverAddress);
+ }
+
+ failoverMap.put(theNodeID, failoverNodeID);
+ }
+ int fid = ((Integer)failoverMap.get(new Integer(thisNodeID))).intValue();
+
+ //if we are the first node in the cluster we don't want to be our own failover node!
+
+ if (fid == thisNodeID)
+ {
+ firstNode = true;
+ failoverNodeID = -1;
+ }
+ else
+ {
+ failoverNodeID = fid;
+ firstNode = false;
+ }
+
log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
}
More information about the jboss-cvs-commits
mailing list