[jboss-cvs] JBoss Messaging SVN: r8220 - in branches/port1842: src/main/org/jboss/messaging/core/impl/postoffice and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Feb 14 11:29:07 EST 2011


Author: gaohoward
Date: 2011-02-14 11:29:07 -0500 (Mon, 14 Feb 2011)
New Revision: 8220

Modified:
   branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
fix regression, NPE from past patch, refactor a bit


Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -137,7 +137,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -135,7 +135,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = NDBCLUSTER
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -137,7 +137,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATE, STATE INTEGER, PRIMARY KEY(NODE_ID))
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -133,7 +133,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP TIMESTAMP, STATE INTEGER, PRIMARY KEY(NODE_ID))
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-02-14 16:29:07 UTC (rev 8220)
@@ -138,7 +138,7 @@
 CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
 UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
 UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
-LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
 INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
 DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-02-14 16:29:07 UTC (rev 8220)
@@ -204,7 +204,21 @@
       //Now connect the data channel.
       future.get();
    }
+   
+   public boolean requestState() throws Exception
+   {
+      boolean stateGot = controlChannel.getState(null, stateTimeout);
+      waitForState();
+      return stateGot;
+   }
 
+   public boolean requestState(Address addr) throws Exception
+   {
+      boolean stateGot = controlChannel.getState(addr, stateTimeout);
+      waitForState();
+      return stateGot;
+   }
+
    private Future<String> connectDataChannel()
    {
       Callable<String> dataRunner = new Callable<String> () {

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-10 14:10:59 UTC (rev 8219)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-02-14 16:29:07 UTC (rev 8220)
@@ -262,13 +262,9 @@
    private boolean keepOldFailoverModel = true;
 
    private Object jgroupsLock = new Object();
-   
-   private boolean autoRestarted = false;
-   
-   private boolean jgroupsBack = false;
-   
-   private Address justJoined;
 
+   private List<Address> newNodes;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -418,20 +414,9 @@
 	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
 	      if (knowAboutNodeId(thisNodeID))
 	      {
-            if (this.keepOldFailoverModel)
-            {
-               throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " + 
+            throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " + 
                                                   "cluster with the same node id (" + thisNodeID + "). " +
                                                   "Are you sure you have given each node a unique node id during installation?");
-            }
-            else
-            {
-               log.info("The node id already in the state. This could happen in case of an auto-restart where JGroups is already" + " normal when post office restarts.");
-               synchronized (jgroupsLock)
-               {
-                  jgroupsBack = true;
-               }
-            }
 	      }
 	
 	      PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(), groupMember.getDataChannelAddress());
@@ -746,8 +731,10 @@
                         Timestamp t = res.getTimestamp(2);
                         long timestamp = t.getTime();
                         int nodeState = res.getInt(3);
+                        t = res.getTimestamp(4);
+                        long checkPoint = t.getTime();
    
-                        clusterState.addNode(nodeID, timestamp, nodeState);
+                        clusterState.addNode(nodeID, timestamp, nodeState, checkPoint);
                         log.debug("Added cluster node state: nodeID = " + nodeID + ", timestamp = " + timestamp + ", nodeState = " + nodeState);
                      }
                   }
@@ -765,7 +752,7 @@
          {
             r.executeOnlyOnce();
          }
-         catch (Exception e)
+         catch (Throwable e)
          {
             // ignore;
          }
@@ -1429,73 +1416,51 @@
          return;
       Integer newNode = findNodeIDForAddress(address);
 
-      synchronized (jgroupsLock)
+      if (newNode == null)
       {
-         if (autoRestarted)
+         //node not already joined. This could happen when a node is auto restarted
+         //when the jgroups is not working properly
+         synchronized (jgroupsLock)
          {
-            justJoined = address;
-            autoRestarted = false;
+            newNodes.add(address);
+            jgroupsLock.notify();
          }
-      }
-
-      if (newNode == null)
-      {
-         // newly Joined node not added yet.
          return;
       }
-
-      QuarantinedNode qNode = suspectedNodes.remove(newNode);
-
-      if (qNode == null)
-         return;
-
-      log.debug("A quarantined node " + qNode + " re-joined cluster.");
-
-      // if I am the quarantined node, I do rejoin.
-      if (clusterState.isQuarantined(thisNodeID))
+      else
       {
-         new Thread()
+         QuarantinedNode qNode = suspectedNodes.remove(newNode);
+         
+         if (qNode == null)
+            return;
+         
+         log.debug("A quarantined node " + qNode + " re-joined cluster.");
+         
+         // if I am the quarantined node, I do rejoin.
+         if (clusterState.isQuarantined(thisNodeID))
          {
-            public void run()
+            new Thread()
             {
-               PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
-                                                                      groupMember.getDataChannelAddress());
-               try
+               public void run()
                {
-                  groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
-                  updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+                  PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                         groupMember.getDataChannelAddress());
+                  try
+                  {
+                     groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+                     calculateFailoverMap();
+                     updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("error sending node join request!", e);
+                  }
                }
-               catch (Exception e)
-               {
-                  log.error("error sending node join request!", e);
-               }
-            }
-         }.start();
+            }.start();
+         }
       }
    }
 
-   private void requestState(final Address address) throws Exception
-   {
-      Thread requester = new Thread()
-      {
-         public void run()
-         {
-            StateRequest request = new StateRequest();
-            try
-            {
-               byte[] state = (byte[])groupMember.unicastRequest(request, address);
-               setState(state);
-            }
-            catch (Exception e)
-            {
-               log.error("Error getting state from " + address, e);
-            }
-         }
-      };
-      requester.start();
-      requester.join();
-   }
-
    public void nodesLeft(List addresses) throws Throwable
    {
       if (trace)
@@ -1558,6 +1523,10 @@
             log.debug(this + " the failover node for the crashed node is " + fnodeID);
 
             boolean doneFailover = false;
+            
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+            
+            clusterNotifier.sendNotification(notification);
 
             if (crashed && isSupportsFailover())
             {
@@ -1687,7 +1656,7 @@
 
          suspectedNodes.put(leftNodeID, qNode);
 
-         if (isFirstNode())
+         if (isFirstNode() && (clusterState.liveNodeNum() > 1))
          {
             try
             {
@@ -1843,52 +1812,56 @@
    
    public void handleNodeJoined(int nodeId, PostOfficeAddressInfo info) throws Exception
    {	   	   	   	
-   	nodeIDAddressMap.put(new Integer(nodeId), info);
-   	
-   	log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
-   	   
-   	final int oldFailoverNodeID = this.failoverNodeID;
-   	
-   	boolean wasFirstNode = this.firstNode;
-   	
-   	calculateFailoverMap();
-   	
-   	if (wasFirstNode && useJGroupsWorkaround)
-   	{
-   		//If we were the first node but now another node has joined - we need to re-enable the semaphore
-   		replicateSemaphore.enable();
-   	}
-   	
-   	//Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's queues
-   	//the data is requested by the new node when it deploys its queues      
-   	
-   	if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
-   	{
-   		//Need to execute this on it's own thread since it uses the MessageDispatcher
-   		
-   		new Thread(
-	   		new Runnable() { 
-	   			public void run()
-	   			{
-	   				try
-	   				{
-	   					failoverNodeChanged(oldFailoverNodeID, firstNode, true);
-	   				}
-	   				catch (Exception e)
-	   				{
-	   					log.error("Failed to process failover node changed", e);
-	   				}
-	   			}
-	   		}).start();   		   		
-   	}
-   	
-      // Send a notification
-      
-      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
-      
-      clusterNotifier.sendNotification(notification);
-      
-      sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+      synchronized (jgroupsLock)
+      {
+         nodeIDAddressMap.put(new Integer(nodeId), info);
+
+         log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
+
+         final int oldFailoverNodeID = this.failoverNodeID;
+
+         boolean wasFirstNode = this.firstNode;
+
+         calculateFailoverMap();
+
+         if (wasFirstNode && useJGroupsWorkaround)
+         {
+            // If we were the first node but now another node has joined - we need to re-enable the semaphore
+            replicateSemaphore.enable();
+         }
+
+         // Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's
+         // queues
+         // the data is requested by the new node when it deploys its queues
+
+         if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
+         {
+            // Need to execute this on it's own thread since it uses the MessageDispatcher
+
+            new Thread(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     failoverNodeChanged(oldFailoverNodeID, firstNode, true);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to process failover node changed", e);
+                  }
+               }
+            }).start();
+         }
+
+         // Send a notification
+
+         ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
+
+         clusterNotifier.sendNotification(notification);
+
+         sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+      }
    }
 
    //some node is finally dead and failed over
@@ -2279,7 +2252,7 @@
               "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?");
 
       map.put("LOAD_CLUSTER_STATE",
-              "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+              "SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER");
 
       map.put("INSERT_NODE_STATE",
               "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
@@ -2434,6 +2407,8 @@
          failoverMap = new ConcurrentHashMap();
 
          leftSet = new ConcurrentHashSet();
+         
+         newNodes = new ArrayList<Address>();
       }
       
       replyExecutor = executorFactory.getExecutor("jbm-reply-executor");
@@ -2458,6 +2433,8 @@
          failoverMap = null;
 
          leftSet = null;
+         
+         newNodes = null;
       }
    	
    	replyExecutor.shutdownNow();   
@@ -3948,12 +3925,10 @@
       //Now clean the data for the failed node
       
       //TODO - does this need to be inside the lock above?
-      if (keepOldFailoverModel)
+      cleanDataForNode(failedNodeID);
+      
+      if (!keepOldFailoverModel)
       {
-         cleanDataForNode(failedNodeID);
-      }
-      else
-      {
          notification = new ClusterNotification(ClusterNotification.TYPE_NODE_FAILEDOVER, failedNodeID.intValue(), null);
       
          clusterNotifier.sendNotification(notification);
@@ -4208,9 +4183,9 @@
       {
       }
 
-      public void addNode(int nodeID, long timestamp, int nodeState)
+      public void addNode(int nodeID, long timestamp, int nodeState, long checkPoint)
       {
-         states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
+         states.put(nodeID, new NodeState(nodeID, timestamp, nodeState, checkPoint));
       }
 
       public ClusterState copy()
@@ -4237,9 +4212,19 @@
          return nState.isDead();
       }
 
-      public int nodeNum()
+      public int liveNodeNum()
       {
-         return states.size();
+         int liveNum = 0;
+         Iterator<NodeState> iter = states.values().iterator();
+         while (iter.hasNext())
+         {
+            NodeState s = iter.next();
+            if (!s.isDead())
+            {
+               liveNum++;
+            }
+         }
+         return liveNum;
       }
    }
 
@@ -4250,12 +4235,15 @@
       private long timestamp;
 
       private int state;
+      
+      private long checkPoint;
 
-      public NodeState(int nodeID, long timestamp, int state)
+      public NodeState(int nodeID, long timestamp, int state, long checkPoint)
       {
          this.nodeID = nodeID;
          this.timestamp = timestamp;
          this.state = state;
+         this.checkPoint = checkPoint;
       }
 
       public void setState(int newState)
@@ -4276,8 +4264,7 @@
       // don't rely on state, use timestamp
       public boolean isDead()
       {
-         long currentTime = System.currentTimeMillis();
-         long stampAge = currentTime - timestamp;
+         long stampAge = checkPoint - timestamp;
          if (stampAge > (2 * nodeStateRefreshInterval))
          {
             log.debug("Timestamp age of " + stampAge + "ms exceeds limit of " + (2 * nodeStateRefreshInterval) + "ms; treating node as dead.");
@@ -4325,28 +4312,24 @@
 
    public void waitForJGroups()
    {
+      log.info("Waiting for JGroups...");
       synchronized (jgroupsLock)
       {
-         autoRestarted = true;
          //if I am alone but there are still others there
-         while (isFirstNode() && (clusterState.nodeNum() > 1))
+         while (isFirstNode())
          {
-            try
+            int n = clusterState.liveNodeNum();
+            if (n == 1)
             {
-               View v = groupMember.getCurrentView();
-
-               if ((v != null) && v.size() > 1)
-                  break;
-
-               jgroupsLock.wait(5000);
+               //only me in the cluster!
+               break;
             }
-            catch (InterruptedException e)
+            
+            if (newNodes.size() == n)
             {
+               //all nodes joined
+               break;
             }
-         }
-         log.info("JGroups starts to work again, waiting for state.");
-         while (autoRestarted && (!jgroupsBack))
-         {
             try
             {
                jgroupsLock.wait(5000);
@@ -4356,43 +4339,51 @@
             }
          }
 
-         if (!jgroupsBack)
+         log.info("JGroups starts to work again, initializing state");
+         
+         //if we have new nodes, we need to
+         // 1) request states
+         // 2) announce myself
+         // 3) clear newNodes list
+         // then we are ready to go
+         if (isFirstNode() && (newNodes.size() > 1))
          {
             try
             {
                PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
                                                                       groupMember.getDataChannelAddress());
 
-               nodeIDAddressMap.put(new Integer(thisNodeID), info);
-
-               String clientVMId = JMSClientVMIdentifier.instance;
-
-               log.info("putting replicat");
-
-               // add our vm identifier to the replicator
-               put(Replicator.JVM_ID_KEY, clientVMId);
-
-               log.info("multicast...");
-
-               groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
-
-               log.info("requesting new state from failover node " + failoverNodeID + " address: " + justJoined);
-
-               if (justJoined != null)
+               // requestState
+               if (!groupMember.requestState())
                {
-                  requestState(justJoined);
+                  log.info("couldn't get state, we are the first  (coordinator).");
+                  for (Address addr : newNodes)
+                  {
+                     if (!addr.equals(info.getControlChannelAddress()))
+                     {
+                        groupMember.requestState(addr);
+                        break;
+                     }
+                  }
                }
+               nodeIDAddressMap.put(new Integer(thisNodeID), info);
 
-               // calculate the failover map
+               // calculate failover map
                calculateFailoverMap();
 
-               log.info("new failover map: " + this.dumpFailoverMap(this.failoverMap));
-
-               log.info("state request got.");
+               // announce myself again
+               groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+               
+               String clientVMId = JMSClientVMIdentifier.instance;
+               
+               // we do it again as the jgroups was bad before
+               put(Replicator.JVM_ID_KEY, clientVMId);
+               // clear new nodes
+               newNodes.clear();
             }
             catch (Exception e)
             {
-               log.error("Error rejoining the cluster", e);
+               log.error("Error initializing state", e);
             }
          }
          log.info("Now node is ready for work.");



More information about the jboss-cvs-commits mailing list