[jboss-cvs] JBoss Messaging SVN: r8220 - in branches/port1842: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 14 11:29:07 EST 2011
Author: gaohoward
Date: 2011-02-14 11:29:07 -0500 (Mon, 14 Feb 2011)
New Revision: 8220
Modified:
branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix regression, NPE from past patch, refactor a bit
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -137,7 +137,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -135,7 +135,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = NDBCLUSTER
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -137,7 +137,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATE, STATE INTEGER, PRIMARY KEY(NODE_ID))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP TIMESTAMP, STATE INTEGER, PRIMARY KEY(NODE_ID))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2011-02-14 16:29:07 UTC (rev 8220)
@@ -138,7 +138,7 @@
CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
]]></attribute>
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2011-02-14 16:29:07 UTC (rev 8220)
@@ -204,7 +204,21 @@
//Now connect the data channel.
future.get();
}
+
+ public boolean requestState() throws Exception
+ {
+ boolean stateGot = controlChannel.getState(null, stateTimeout);
+ waitForState();
+ return stateGot;
+ }
+ public boolean requestState(Address addr) throws Exception
+ {
+ boolean stateGot = controlChannel.getState(addr, stateTimeout);
+ waitForState();
+ return stateGot;
+ }
+
private Future<String> connectDataChannel()
{
Callable<String> dataRunner = new Callable<String> () {
Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2011-02-14 16:29:07 UTC (rev 8220)
@@ -262,13 +262,9 @@
private boolean keepOldFailoverModel = true;
private Object jgroupsLock = new Object();
-
- private boolean autoRestarted = false;
-
- private boolean jgroupsBack = false;
-
- private Address justJoined;
+ private List<Address> newNodes;
+
// Constructors ---------------------------------------------------------------------------------
public boolean isFailoverOnNodeLeave()
@@ -418,20 +414,9 @@
//Sanity check - we check there aren't any other nodes already in the cluster with the same node id
if (knowAboutNodeId(thisNodeID))
{
- if (this.keepOldFailoverModel)
- {
- throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+ throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
"cluster with the same node id (" + thisNodeID + "). " +
"Are you sure you have given each node a unique node id during installation?");
- }
- else
- {
- log.info("The node id already in the state. This could happen in case of an auto-restart where JGroups is already" + " normal when post office restarts.");
- synchronized (jgroupsLock)
- {
- jgroupsBack = true;
- }
- }
}
PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
@@ -746,8 +731,10 @@
Timestamp t = res.getTimestamp(2);
long timestamp = t.getTime();
int nodeState = res.getInt(3);
+ t = res.getTimestamp(4);
+ long checkPoint = t.getTime();
- clusterState.addNode(nodeID, timestamp, nodeState);
+ clusterState.addNode(nodeID, timestamp, nodeState, checkPoint);
log.debug("Added cluster node state: nodeID = " + nodeID + ", timestamp = " + timestamp + ", nodeState = " + nodeState);
}
}
@@ -765,7 +752,7 @@
{
r.executeOnlyOnce();
}
- catch (Exception e)
+ catch (Throwable e)
{
// ignore;
}
@@ -1429,73 +1416,51 @@
return;
Integer newNode = findNodeIDForAddress(address);
- synchronized (jgroupsLock)
+ if (newNode == null)
{
- if (autoRestarted)
+ //node not already joined. This could happen when a node is auto restarted
+ //when the jgroups is not working properly
+ synchronized (jgroupsLock)
{
- justJoined = address;
- autoRestarted = false;
+ newNodes.add(address);
+ jgroupsLock.notify();
}
- }
-
- if (newNode == null)
- {
- // newly Joined node not added yet.
return;
}
-
- QuarantinedNode qNode = suspectedNodes.remove(newNode);
-
- if (qNode == null)
- return;
-
- log.debug("A quarantined node " + qNode + " re-joined cluster.");
-
- // if I am the quarantined node, I do rejoin.
- if (clusterState.isQuarantined(thisNodeID))
+ else
{
- new Thread()
+ QuarantinedNode qNode = suspectedNodes.remove(newNode);
+
+ if (qNode == null)
+ return;
+
+ log.debug("A quarantined node " + qNode + " re-joined cluster.");
+
+ // if I am the quarantined node, I do rejoin.
+ if (clusterState.isQuarantined(thisNodeID))
{
- public void run()
+ new Thread()
{
- PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
- groupMember.getDataChannelAddress());
- try
+ public void run()
{
- groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
- updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+ PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+ groupMember.getDataChannelAddress());
+ try
+ {
+ groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+ calculateFailoverMap();
+ updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+ }
+ catch (Exception e)
+ {
+ log.error("error sending node join request!", e);
+ }
}
- catch (Exception e)
- {
- log.error("error sending node join request!", e);
- }
- }
- }.start();
+ }.start();
+ }
}
}
- private void requestState(final Address address) throws Exception
- {
- Thread requester = new Thread()
- {
- public void run()
- {
- StateRequest request = new StateRequest();
- try
- {
- byte[] state = (byte[])groupMember.unicastRequest(request, address);
- setState(state);
- }
- catch (Exception e)
- {
- log.error("Error getting state from " + address, e);
- }
- }
- };
- requester.start();
- requester.join();
- }
-
public void nodesLeft(List addresses) throws Throwable
{
if (trace)
@@ -1558,6 +1523,10 @@
log.debug(this + " the failover node for the crashed node is " + fnodeID);
boolean doneFailover = false;
+
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+
+ clusterNotifier.sendNotification(notification);
if (crashed && isSupportsFailover())
{
@@ -1687,7 +1656,7 @@
suspectedNodes.put(leftNodeID, qNode);
- if (isFirstNode())
+ if (isFirstNode() && (clusterState.liveNodeNum() > 1))
{
try
{
@@ -1843,52 +1812,56 @@
public void handleNodeJoined(int nodeId, PostOfficeAddressInfo info) throws Exception
{
- nodeIDAddressMap.put(new Integer(nodeId), info);
-
- log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
-
- final int oldFailoverNodeID = this.failoverNodeID;
-
- boolean wasFirstNode = this.firstNode;
-
- calculateFailoverMap();
-
- if (wasFirstNode && useJGroupsWorkaround)
- {
- //If we were the first node but now another node has joined - we need to re-enable the semaphore
- replicateSemaphore.enable();
- }
-
- //Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's queues
- //the data is requested by the new node when it deploys its queues
-
- if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
- {
- //Need to execute this on it's own thread since it uses the MessageDispatcher
-
- new Thread(
- new Runnable() {
- public void run()
- {
- try
- {
- failoverNodeChanged(oldFailoverNodeID, firstNode, true);
- }
- catch (Exception e)
- {
- log.error("Failed to process failover node changed", e);
- }
- }
- }).start();
- }
-
- // Send a notification
-
- ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
-
- clusterNotifier.sendNotification(notification);
-
- sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ synchronized (jgroupsLock)
+ {
+ nodeIDAddressMap.put(new Integer(nodeId), info);
+
+ log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
+
+ final int oldFailoverNodeID = this.failoverNodeID;
+
+ boolean wasFirstNode = this.firstNode;
+
+ calculateFailoverMap();
+
+ if (wasFirstNode && useJGroupsWorkaround)
+ {
+ // If we were the first node but now another node has joined - we need to re-enable the semaphore
+ replicateSemaphore.enable();
+ }
+
+ // Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's
+ // queues
+ // the data is requested by the new node when it deploys its queues
+
+ if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
+ {
+ // Need to execute this on it's own thread since it uses the MessageDispatcher
+
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ failoverNodeChanged(oldFailoverNodeID, firstNode, true);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to process failover node changed", e);
+ }
+ }
+ }).start();
+ }
+
+ // Send a notification
+
+ ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
+
+ clusterNotifier.sendNotification(notification);
+
+ sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+ }
}
//some node is finally dead and failed over
@@ -2279,7 +2252,7 @@
"UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?");
map.put("LOAD_CLUSTER_STATE",
- "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+ "SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER");
map.put("INSERT_NODE_STATE",
"INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
@@ -2434,6 +2407,8 @@
failoverMap = new ConcurrentHashMap();
leftSet = new ConcurrentHashSet();
+
+ newNodes = new ArrayList<Address>();
}
replyExecutor = executorFactory.getExecutor("jbm-reply-executor");
@@ -2458,6 +2433,8 @@
failoverMap = null;
leftSet = null;
+
+ newNodes = null;
}
replyExecutor.shutdownNow();
@@ -3948,12 +3925,10 @@
//Now clean the data for the failed node
//TODO - does this need to be inside the lock above?
- if (keepOldFailoverModel)
+ cleanDataForNode(failedNodeID);
+
+ if (!keepOldFailoverModel)
{
- cleanDataForNode(failedNodeID);
- }
- else
- {
notification = new ClusterNotification(ClusterNotification.TYPE_NODE_FAILEDOVER, failedNodeID.intValue(), null);
clusterNotifier.sendNotification(notification);
@@ -4208,9 +4183,9 @@
{
}
- public void addNode(int nodeID, long timestamp, int nodeState)
+ public void addNode(int nodeID, long timestamp, int nodeState, long checkPoint)
{
- states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
+ states.put(nodeID, new NodeState(nodeID, timestamp, nodeState, checkPoint));
}
public ClusterState copy()
@@ -4237,9 +4212,19 @@
return nState.isDead();
}
- public int nodeNum()
+ public int liveNodeNum()
{
- return states.size();
+ int liveNum = 0;
+ Iterator<NodeState> iter = states.values().iterator();
+ while (iter.hasNext())
+ {
+ NodeState s = iter.next();
+ if (!s.isDead())
+ {
+ liveNum++;
+ }
+ }
+ return liveNum;
}
}
@@ -4250,12 +4235,15 @@
private long timestamp;
private int state;
+
+ private long checkPoint;
- public NodeState(int nodeID, long timestamp, int state)
+ public NodeState(int nodeID, long timestamp, int state, long checkPoint)
{
this.nodeID = nodeID;
this.timestamp = timestamp;
this.state = state;
+ this.checkPoint = checkPoint;
}
public void setState(int newState)
@@ -4276,8 +4264,7 @@
// don't rely on state, use timestamp
public boolean isDead()
{
- long currentTime = System.currentTimeMillis();
- long stampAge = currentTime - timestamp;
+ long stampAge = checkPoint - timestamp;
if (stampAge > (2 * nodeStateRefreshInterval))
{
log.debug("Timestamp age of " + stampAge + "ms exceeds limit of " + (2 * nodeStateRefreshInterval) + "ms; treating node as dead.");
@@ -4325,28 +4312,24 @@
public void waitForJGroups()
{
+ log.info("Waiting for JGroups...");
synchronized (jgroupsLock)
{
- autoRestarted = true;
//if I am alone but there are still others there
- while (isFirstNode() && (clusterState.nodeNum() > 1))
+ while (isFirstNode())
{
- try
+ int n = clusterState.liveNodeNum();
+ if (n == 1)
{
- View v = groupMember.getCurrentView();
-
- if ((v != null) && v.size() > 1)
- break;
-
- jgroupsLock.wait(5000);
+ //only me in the cluster!
+ break;
}
- catch (InterruptedException e)
+
+ if (newNodes.size() == n)
{
+ //all nodes joined
+ break;
}
- }
- log.info("JGroups starts to work again, waiting for state.");
- while (autoRestarted && (!jgroupsBack))
- {
try
{
jgroupsLock.wait(5000);
@@ -4356,43 +4339,51 @@
}
}
- if (!jgroupsBack)
+ log.info("JGroups starts to work again, initializing state");
+
+ //if we have new nodes, we need to
+ // 1) request states
+ // 2) announce myself
+ // 3) clear newNodes list
+ // then we are ready to go
+ if (isFirstNode() && (newNodes.size() > 1))
{
try
{
PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
groupMember.getDataChannelAddress());
- nodeIDAddressMap.put(new Integer(thisNodeID), info);
-
- String clientVMId = JMSClientVMIdentifier.instance;
-
- log.info("putting replicat");
-
- // add our vm identifier to the replicator
- put(Replicator.JVM_ID_KEY, clientVMId);
-
- log.info("multicast...");
-
- groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
-
- log.info("requesting new state from failover node " + failoverNodeID + " address: " + justJoined);
-
- if (justJoined != null)
+ // requestState
+ if (!groupMember.requestState())
{
- requestState(justJoined);
+ log.info("couldn't get state, we are the first (coordinator).");
+ for (Address addr : newNodes)
+ {
+ if (!addr.equals(info.getControlChannelAddress()))
+ {
+ groupMember.requestState(addr);
+ break;
+ }
+ }
}
+ nodeIDAddressMap.put(new Integer(thisNodeID), info);
- // calculate the failover map
+ // calculate failover map
calculateFailoverMap();
- log.info("new failover map: " + this.dumpFailoverMap(this.failoverMap));
-
- log.info("state request got.");
+ // announce myself again
+ groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+ String clientVMId = JMSClientVMIdentifier.instance;
+
+ // we do it again as the jgroups was bad before
+ put(Replicator.JVM_ID_KEY, clientVMId);
+ // clear new nodes
+ newNodes.clear();
}
catch (Exception e)
{
- log.error("Error rejoining the cluster", e);
+ log.error("Error initializing state", e);
}
}
log.info("Now node is ready for work.");
More information about the jboss-cvs-commits mailing list