[jboss-cvs] JBoss Messaging SVN: r8611 - branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 18 03:48:10 EDT 2013
Author: raggz
Date: 2013-09-18 03:48:09 -0400 (Wed, 18 Sep 2013)
New Revision: 8611
Modified:
branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Back port of JBMessaging-1896, JBMessaging-1901,JBMessaging-1931
Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2013-09-13 06:40:04 UTC (rev 8610)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP14_JBMESSAGING-1896_JBMESSAGING-1901_JBMESSAGING-1931/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2013-09-18 07:48:09 UTC (rev 8611)
@@ -257,6 +257,7 @@
private Object viewUpdateLock;
private boolean stopUpdate = false;
private boolean updateInProcess = false;
+ private Thread updateThread;
private StateMonitor stateMonitor = null;
@@ -277,6 +278,7 @@
private Set<Condition> inactiveConditions = new HashSet<Condition>();
+ private int initMembers;
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -458,7 +460,7 @@
started = true;
- log.debug(this + " started");
+ log.debug(this + " started with initMember " + initMembers);
}
//this method will trigger a dedicated thread to write time stamp
@@ -497,6 +499,7 @@
if (row == 0)
{
+ log.trace("inserting this node " + thisNodeID + " to the DB table");
ps1 = conn.prepareStatement(getSQLStatement("INSERT_NODE_STATE"));
ps1.setInt(1, thisNodeID);
ps1.setInt(2, STATE_CLUSTERED);
@@ -735,6 +738,7 @@
if (result > 0)
{
+ timestampDone = true;
log.debug("Successfully updated cluster health timestamp for node " + thisNodeID);
}
else if (result == 0)
@@ -742,8 +746,6 @@
log.debug("Cluster health timestamp update for node " + thisNodeID + " failed!");
}
- timestampDone = true;
-
synchronized (clusterState)
{
clusterState.clear();
@@ -839,8 +841,12 @@
{
if (stopUpdate) return;
stopUpdate = true;
- while (updateInProcess)
+ if (trace)
{
+ log.trace("updateThread is " + updateThread);
+ }
+ while (updateInProcess && (updateThread != Thread.currentThread()))
+ {
try
{
log.info("Waiting for view update finish before stop post office " + this);
@@ -860,15 +866,31 @@
{
if (stopUpdate) return false;
updateInProcess = true;
+ updateThread = Thread.currentThread();
}
return true;
}
+ //force view update stop, don't wait
+ public void disableViewUpdate()
+ {
+ if (!clustered) return;
+ synchronized (viewUpdateLock)
+ {
+ if (stopUpdate) return;
+ stopUpdate = true;
+ updateThread = null;
+ viewUpdateLock.notify();
+ }
+ log.trace("View update disabled");
+ }
+
public void endProcessView()
{
synchronized (viewUpdateLock)
{
updateInProcess = false;
+ updateThread = null;
viewUpdateLock.notify();
}
}
@@ -1740,7 +1762,7 @@
catch (Exception e)
{
log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.");
- this.serverPeer.stopJBMNodeForRecovery();
+ stateMonitor.stopJBMNodeForRecovery();
return;
}
}
@@ -2906,8 +2928,17 @@
//This is ok - it wil be shortly followed by another calculation of the map
}
+ initMembers = initMembers < failoverMap.size() ? failoverMap.size() : initMembers;
+
+ log.trace("initMembers updated to: " + initMembers);
log.debug("Updated failover map:\n" + dumpFailoverMap(failoverMap));
}
+
+ private boolean iAmAlone()
+ {
+ if (trace) { log.trace("If I am alone in failover Map: " + failoverMap.size() + " initMembers: " + initMembers); }
+ return (failoverMap.size() == 1) && (initMembers > 1);
+ }
private Integer findNodeIDForAddress(Address address)
{
@@ -4228,11 +4259,16 @@
}
}
+ public synchronized void stopJBMNodeForRecovery()
+ {
+ disableViewUpdate();
+ serverPeer.stopJBMNodeForRecovery();
+ }
+
public synchronized void run()
{
do
{
- ClusterState newState;
boolean timeStampDone = false;
try
{
@@ -4242,10 +4278,10 @@
{
processClusterState();
}
- else if (clusterState.isQuarantined(thisNodeID))
+ else if (clusterState.isQuarantined(thisNodeID) || iAmAlone())
{
log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
- serverPeer.stopJBMNodeForRecovery();
+ stopJBMNodeForRecovery();
working = false;
nodeStateRefreshInterval = 1; //let the thread quite quickly.
}
More information about the jboss-cvs-commits
mailing list