[jboss-cvs] JBoss Messaging SVN: r8197 - in branches/port1842: integration/EAP4/etc/xmdesc and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jan 26 02:03:20 EST 2011


Author: gaohoward
Date: 2011-01-26 02:03:19 -0500 (Wed, 26 Jan 2011)
New Revision: 8197

Added:
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
Modified:
   branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
   branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
   branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
Log:
1842


Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -134,6 +134,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP INTEGER, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP INTEGER, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -132,6 +132,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = NDBCLUSTER
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is clustered. If you don't want a clustered post office then set to false -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -134,6 +134,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP INTEGER, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -130,6 +130,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -135,6 +135,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP INTEGER, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=? WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-01-26 07:03:19 UTC (rev 8197)
@@ -155,6 +155,18 @@
       <type>boolean</type>
    </attribute>
 
+   <attribute access="read-write" getMethod="getKeepOldFailoverModel" setMethod="setKeepOldFailoverModel">
+      <description>If JBM cluster failover should be in old style, default true</description>
+      <name>KeepOldFailoverModel</name>
+      <type>boolean</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getNodeStateRefreshInterval" setMethod="setNodeStateRefreshInterval">
+      <description>Time (milliseconds) between two consecutive timestamping, default 30000</description>
+      <name>NodeStateRefreshInterval</name>
+      <type>long</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -27,6 +27,7 @@
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MessageDispatcher;
 import org.jgroups.blocks.RequestHandler;
+import org.jgroups.jmx.JmxConfigurator;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
@@ -37,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -46,6 +48,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 /**
  * 
  * This class handles the interface with JGroups
@@ -58,6 +63,10 @@
  */
 public class GroupMember
 {
+   public static final String JGROUPS_JMX_DOMAIN = "jboss.jgroups";
+   public static final String CHANNEL_JMX_ATTRIBUTES = "type=channel,cluster=";
+   public static final String PROTOCOL_JMX_ATTRIBUTES = "type=protocol,cluster=";
+   
    public static final String DATA_SUFFIX = "-DATA";
 
    public static final String CONTROL_SUFFIX = "-CTRL";
@@ -93,6 +102,10 @@
    private CountDownLatch latch;
    
    private volatile boolean starting;
+
+   //jmx
+   protected boolean channelRegistered;   
+   protected boolean protocolsRegistered;
    
    //We need to process view changes on a different thread, since if we have more than one node running
    //in the same VM then the thread that sends the leave message ends up executing the view change on the other node
@@ -264,6 +277,11 @@
    
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+      
    	if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -296,6 +314,11 @@
    
    public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
    {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+      
    	if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
@@ -327,6 +350,11 @@
    
    public void multicastData(ClusterRequest request) throws Exception
    {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
    	if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -339,7 +367,12 @@
    
    public void unicastData(ClusterRequest request, Address address) throws Exception
    {
-   	if (ready.get())
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
    	{
 	   	if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
 	
@@ -658,4 +691,71 @@
          }
       }
    }
+   
+   public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      try
+      {
+         String protocolPrefix = JGROUPS_JMX_DOMAIN + ":" + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerProtocols(server, (JChannel)controlChannel, protocolPrefix);
+         protocolsRegistered = true;
+
+         log.debug("Registed protocol: " + protocolPrefix);
+
+         String name = JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerChannel((JChannel)controlChannel, server, name);
+         channelRegistered = true;
+
+         log.debug("Registed channel: " + name);
+      }
+      catch (Exception e)
+      {
+         log.error("Caught exception registering channel in JXM", e);
+      }
+   }
+
+   public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      ObjectName on = null;
+      if (channelRegistered)
+      {
+         // Unregister the channel itself
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName);
+            server.unregisterMBean(on);
+         }
+         catch (Exception e)
+         {
+            if (on != null)
+               log.error("Caught exception unregistering channel at " + on, e);
+            else
+               log.error("Caught exception unregistering channel", e);
+         }
+      }
+
+      if (protocolsRegistered)
+      {
+         // Unregister the protocols
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":*," + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName);
+            Set mbeans = server.queryNames(on, null);
+            if (mbeans != null)
+            {
+               for (Iterator it = mbeans.iterator(); it.hasNext();)
+               {
+                  server.unregisterMBean((ObjectName)it.next());
+               }
+            }
+         }
+         catch (Exception e)
+         {
+            if (on != null)
+               log.error("Caught exception unregistering protocols at " + on, e);
+            else
+               log.error("Caught exception unregistering protocols", e);
+         }
+      }
+   }
 }

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -41,6 +41,7 @@
 
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationFilter;
@@ -108,6 +109,14 @@
 
    private static final Logger log = Logger.getLogger(MessagingPostOffice.class);
 
+   public static final int STATE_STANDALONE = 1;
+ 
+   public static final int STATE_CLUSTERED = 2;
+
+   public static final int STATE_QUARANTINED = 3;
+
+   public static final int STATE_DEAD = 4;
+
    //This are only used in testing
    
    public static final String VIEW_CHANGED_NOTIFICATION = "VIEW_CHANGED";
@@ -240,7 +249,17 @@
    private boolean useJGroupsWorkaround;
    
    private boolean failoverOnNodeLeave;
-      
+
+   private StateMonitor stateMonitor = null;
+
+   private Map<Integer, QuarantinedNode> suspectedNodes = new java.util.concurrent.ConcurrentHashMap<Integer, QuarantinedNode>();
+
+   private ClusterState clusterState = new ClusterState();
+
+   private long nodeStateRefreshInterval = 30000;
+
+   private boolean keepOldFailoverModel = true;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -325,7 +344,9 @@
                               boolean failoverOnNodeLeave,
                               int maxRetry,
                               int retryInterval,
-                              boolean retryOnConnectionFailure)
+                              boolean retryOnConnectionFailure,
+                              boolean keepOldFailoverModel,
+                              long nodeStateRefreshInterval)
       throws Exception
    {
    	this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
@@ -339,6 +360,10 @@
 
       this.supportsFailover = supportsFailover;
       
+      this.keepOldFailoverModel = keepOldFailoverModel;
+
+      this.nodeStateRefreshInterval = nodeStateRefreshInterval;
+
       nbSupport = new NotificationBroadcasterSupport();
       
       replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
@@ -402,6 +427,8 @@
 	      put(Replicator.JVM_ID_KEY, clientVMId);
 	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+         initClusterState();
       }
    
       //Now load the bindings for this node
@@ -413,8 +440,260 @@
       log.debug(this + " started");      
    }
 
+   //this method will trigger a dedicated thread to write time stamp
+   private synchronized void initClusterState() throws Exception
+   {
+      if (this.keepOldFailoverModel) return;
+      
+         this.addThisNodeStateInStorage();
+            
+         stateMonitor = new StateMonitor();
+         stateMonitor.start();
+      }
+      
+      private void addThisNodeStateInStorage() throws Exception
+      {
+         if (ds == null)
+         {
+            return;
+         }
+    
+         class AddNodeState extends JDBCTxRunner
+         {
+            public Object doTransaction() throws Exception
+            {
+               PreparedStatement ps  = null;
+               PreparedStatement ps1  = null;
+   
+               try
+               {
+                  ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+   
+                  ps.setInt(1, STATE_CLUSTERED);
+                  ps.setInt(2, thisNodeID);
+   
+                  int row = ps.executeUpdate();
+                  
+                  if (row == 0)
+                  {
+                     ps1 = conn.prepareStatement(getSQLStatement("INSERT_NODE_STATE"));
+                     ps1.setInt(1, thisNodeID);
+                     ps1.setLong(2, System.currentTimeMillis());
+                     ps1.setInt(3, STATE_CLUSTERED);
+   
+                     ps1.executeUpdate();
+                  }
+               }
+               finally
+               {
+                  closeStatement(ps);
+                  closeStatement(ps1);
+               }
+               return null;
+            }
+         }
+         new AddNodeState().executeWithRetry();
+      }
+      
+      private Integer updateStateInStorage(final int nID, final int newState) throws Exception
+      {
+         if (ds == null)
+         {
+            return 0;
+         }
+   
+         class UpdateState extends JDBCTxRunner
+         {
+            public Object doTransaction() throws Exception
+            {
+               PreparedStatement ps  = null;
+               int row = 0;
+               
+               try
+               {
+                  ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+   
+                  ps.setInt(1, newState);
+                  ps.setInt(2, nID);
+   
+                  row = ps.executeUpdate();
+               }
+               finally
+               {
+                  closeStatement(ps);
+               }
+               return new Integer(row);
+            }
+         }
+         return (Integer)new UpdateState().executeWithRetry();
+      }
+      
+      private void cleanUpSuspectedNode(final Integer qNode) throws Exception
+      {
+         QuarantinedNode node = suspectedNodes.remove(qNode);
+         if (node == null)
+         {
+            log.warn("Cannot find the suspected node.");
+         }
+         
+         //clean from DB
+         if (ds != null)
+         {
+            class CleanupDeadNode extends JDBCTxRunner
+            {
+               public Object doTransaction() throws Exception
+               {
+                  PreparedStatement ps = null;
+   
+                  try
+                  {
+                     ps = conn.prepareStatement(getSQLStatement("DELETE_DEAD_NODE"));
+                     ps.setInt(1, qNode);
+   
+                     ps.executeUpdate();
+                  }
+                  finally
+                  {
+                     closeStatement(ps);
+                  }
+                  return null;
+               }
+            }
+            new CleanupDeadNode().executeWithRetry();
+         }
+      }
+      
+      /*
+       * 1. for each dead node, clean up.
+       * 2. for its dead buddy, failover
+       * 3. if itself being quarantined, becomes standalone.
+       */
+      private void processClusterState() throws Exception
+      {
+         Iterator<Integer> iter = suspectedNodes.keySet().iterator();
+         
+         ClusterState clusterStateCopy = clusterState.copy();
+         
+         while (iter.hasNext())
+         {
+            Integer qNodeID = iter.next();
+            
+            Boolean isDead = clusterStateCopy.isNodeDead(qNodeID);
+            
+            if (isDead == null)
+            {
+               //someone else done the job.
+               cleanDataForNode(qNodeID);
+               suspectedNodes.remove(qNodeID);
+            }
+            else if (isDead)
+            {
+   
+               QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+               Integer fNodeID = qNode.getFailover();
+   
+               Integer foNodeID = (Integer)failoverMap.get(fNodeID);
+               if (foNodeID == null)
+               {
+                  throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
+               }
+   
+               //every suspected node must have a one and only failover node.
+               //however, each dead node may have multiple failover nodes that are not in one 'brain'
+               //if a node is quarantined, it suspects all other nodes which could be still in the cluster.
+               //so this left node will be failover node for all of them, at the same time cluster nodes
+               //have a strict failover map. If a node in cluster dies, this left node and it's 'legitimate' failover
+               //node all have chance to do the failover. 
+               if (fNodeID.intValue() == thisNodeID)
+               {
+                  ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, qNodeID.intValue(), null);
+   
+                  clusterNotifier.sendNotification(notification);
+   
+                  // I am the failover node for the dead, perform failover now
+                  if (suspectedNodes.get(qNodeID).shouldFailover() && isSupportsFailover())
+                  {
+                     log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
+   
+                     //update node status. this definitely not me!
+                     this.updateStateInStorage(qNodeID, STATE_DEAD);
+   
+                     performFailover(qNodeID);
+   
+                     // now clean up the quarantined set
+                     cleanUpSuspectedNode(qNodeID);
+                  }
+               }
+               else
+               {
+                  //now clean up the node from this node.
+                  cleanDataForNode(qNodeID);
+                  suspectedNodes.remove(qNodeID);
+               }
+            }
+         }
+      }
+      
+      //timestamp and query the new state from db
+      private void refreshNodeState() throws Exception
+      {
+         if (ds == null)
+         {
+            return;
+         }
+         
+         class RefreshNodeState extends JDBCTxRunner
+         {
+            public Object doTransaction() throws Exception
+            {
+               PreparedStatement ps  = null;
+   
+               try
+               {
+                  //writing timestamp
+                  ps = conn.prepareStatement(getSQLStatement("UPDATE_TIMESTAMP"));
+   
+                  ps.setLong(1, System.currentTimeMillis());
+                  ps.setInt(2, thisNodeID);
+   
+                  ps.executeUpdate();
+                  
+                  synchronized (clusterState)
+                  {
+                     clusterState.clear();
+                     // collect states
+                     ps = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+                     ResultSet result = ps.executeQuery();
+   
+                     while (result.next())
+                     {
+                        int nodeID = result.getInt(1);
+                        long timestamp = result.getLong(2);
+                        int nodeState = result.getInt(3);
+   
+                        clusterState.addNode(nodeID, timestamp, nodeState);
+                     }
+                  }
+               }
+               finally
+               {
+                  closeStatement(ps);
+               }
+               return null;
+            }
+         }
+         new RefreshNodeState().executeWithRetry();
+      }
+   
+
    public synchronized void stop() throws Exception
-   {      
+   {
+      if (this.stateMonitor != null)
+      {
+         stateMonitor.shutdown();
+         stateMonitor = null;
+      }
+
       if (!started)
    	{
    		log.warn(this + " is not started");
@@ -1058,97 +1337,264 @@
     */
    public void nodeJoined(Address address) throws Exception
    {
-      log.debug(this + ": " + address + " joined");      
+      log.debug(this + ": " + address + " joined");
+      if (this.keepOldFailoverModel)
+         return;
+      Integer newNode = findNodeIDForAddress(address);
+
+      if (newNode == null)
+      {
+         // newly Joined node not added yet.
+         return;
+      }
+
+      QuarantinedNode qNode = suspectedNodes.remove(newNode);
+
+      if (qNode == null)
+         return;
+
+      log.debug("A quarantined node " + qNode + " re-joined cluster.");
+
+      // if I am the quarantined node, I do rejoin.
+      if (clusterState.isQuarantined(thisNodeID))
+      {
+         updateStateInStorage(thisNodeID, STATE_CLUSTERED);
+         new Thread()
+         {
+            public void run()
+            {
+               PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                      groupMember.getDataChannelAddress());
+               try
+               {
+                  groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+               }
+               catch (Exception e)
+               {
+                  log.error("error sending node join request!", e);
+               }
+            }
+         }.start();
+      }
    }
    
    public void nodesLeft(List addresses) throws Throwable
    {
-   	if (trace) { log.trace("Nodes left " + addresses.size()); }
-   	  	
-   	Map oldFailoverMap = new HashMap(this.failoverMap);
-   	
-   	int oldFailoverNodeID = failoverNodeID;
-   	
-      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
-   	
-   	calculateFailoverMap();
-   	
-      if (trace) { log.trace("First node is now " + firstNode); }
-      
+      if (trace)
+      {
+         log.trace("Nodes left " + addresses.size());
+      }
+
+      if (!keepOldFailoverModel)
+      {
+         // quarantine the nodes
+         quarantine(addresses);
+      }
+      else
+      {
+         Map oldFailoverMap = new HashMap(this.failoverMap);
+
+         int oldFailoverNodeID = failoverNodeID;
+
+         if (trace)
+         {
+            log.trace("Old failover node id: " + oldFailoverNodeID);
+         }
+
+         calculateFailoverMap();
+
+         if (trace)
+         {
+            log.trace("First node is now " + firstNode);
+         }
+
+         if (firstNode && this.useJGroupsWorkaround)
+         {
+            // If we are now the first node in the cluster then any outstanding replication requests will not get
+            // responses
+            // so we must release these and we have no more need of a semaphore until another node joins
+            replicateSemaphore.disable();
+         }
+
+         Iterator iter = addresses.iterator();
+
+         while (iter.hasNext())
+         {
+            Address address = (Address)iter.next();
+
+            log.debug(this + ": " + address + " left");
+
+            Integer leftNodeID = getNodeIDForSyncAddress(address);
+
+            if (leftNodeID == null)
+            {
+               throw new IllegalStateException(this + " cannot find node ID for address " + address);
+            }
+
+            boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+
+            log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+
+            Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+
+            log.debug(this + " the failover node for the crashed node is " + fnodeID);
+
+            boolean doneFailover = false;
+
+            if (crashed && isSupportsFailover())
+            {
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+
+                  performFailover(leftNodeID);
+
+                  doneFailover = true;
+               }
+            }
+
+            if (!doneFailover)
+            {
+               // Remove any replicant data and non durable bindings for the node - This will notify any listeners which
+               // will
+               // recalculate the connection factory delegates and failover delegates.
+
+               cleanDataForNode(leftNodeID);
+            }
+
+            if (trace)
+            {
+               log.trace("First node: " + firstNode +
+                         " oldFailoverNodeID: " +
+                         oldFailoverNodeID +
+                         " failoverNodeID: " +
+                         failoverNodeID);
+            }
+
+            if (oldFailoverNodeID != failoverNodeID)
+            {
+               // Failover node for this node has changed
+
+               failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+            }
+         }
+      }
+      sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+   }
+
+   /**
+    * For each node, update its state to be STATE_QUARANTINED
+    * then store the information for later failover.
+    * @throws Exception 
+    */
+   private void quarantine(List addresses) throws Exception
+   {
+      Map oldFailoverMap = new HashMap(this.failoverMap);
+
+      int oldFailoverNodeID = failoverNodeID;
+
+      if (trace)
+      {
+         log.trace("Old failover node id: " + oldFailoverNodeID);
+      }
+
+      calculateFailoverMap();
+
+      if (trace)
+      {
+         log.trace("First node is now " + firstNode);
+      }
+
       if (firstNode && this.useJGroupsWorkaround)
       {
-      	//If we are now the first node in the cluster then any outstanding replication requests will not get responses
-      	//so we must release these and we have no more need of a semaphore until another node joins
-      	replicateSemaphore.disable();
+         // If we are now the first node in the cluster then any outstanding replication requests will not get responses
+         // so we must release these and we have no more need of a semaphore until another node joins
+         replicateSemaphore.disable();
       }
-            
-   	Iterator iter = addresses.iterator();
-   	
-   	while (iter.hasNext())
-   	{
-   		Address address = (Address)iter.next();
 
-         log.debug(this + ": " + address + " left");
+      Iterator iter = addresses.iterator();
+      while (iter.hasNext())
+      {
+         Address addr = (Address)iter.next();
 
-	      Integer leftNodeID = getNodeIDForSyncAddress(address);
-	
-	      if (leftNodeID == null)
-	      {
-	         throw new IllegalStateException(this + " cannot find node ID for address " + address);
-	      }
-	      
-	      boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
-	
-	      log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
-      
-	      Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
-      
+         Integer leftNodeID = getNodeIDForSyncAddress(addr);
+
+         if (leftNodeID == null)
+         {
+            throw new IllegalStateException(this + " cannot find node ID for address " + addr);
+         }
+
+         boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+
+         log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+
+         Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+
          log.debug(this + " the failover node for the crashed node is " + fnodeID);
-	         
-	      boolean doneFailover = false;
-	      
-	      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-	      
-	      clusterNotifier.sendNotification(notification);
-      
-	      if (crashed && isSupportsFailover())
-	      {	      
-		      if (fnodeID == null)
-		      {
-		      	throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
-		      }
-		      
-		      if (fnodeID.intValue() == thisNodeID)
-		      {
-		         // The node crashed and we are the failover node so let's perform failover
-		
-		         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-		
-		         performFailover(leftNodeID);
-		         
-		         doneFailover = true;
-		      }
-	      }
-      
-	      if (!doneFailover)
-	      {
-		      // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
-		      // recalculate the connection factory delegates and failover delegates.
-		
-		      cleanDataForNode(leftNodeID);
-	      }
-      
-	      if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-	      
-	      if (oldFailoverNodeID != failoverNodeID)
-	      {
-	      	//Failover node for this node has changed
-	      	
-	      	failoverNodeChanged(oldFailoverNodeID, firstNode, false);      	
-	      }
-   	}
-	      
-	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+
+         // if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+         Integer ffNode = (Integer)failoverMap.get(fnodeID);
+         Integer currentFNode;
+         while (ffNode == null)
+         {
+            // Note this requires the failover map must form a circle! otherwise this may become an infinite loop!
+            currentFNode = fnodeID;
+            // search old map
+            fnodeID = (Integer)oldFailoverMap.get(currentFNode);
+            // then new map
+            ffNode = (Integer)failoverMap.get(fnodeID);
+         }
+
+         // now the fnodeID is an active node
+         QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+
+         // now we need take care of those already quarantined nodes
+         Iterator<Integer> iterQ = suspectedNodes.keySet().iterator();
+         while (iterQ.hasNext())
+         {
+            QuarantinedNode aNode = suspectedNodes.get(iterQ.next());
+            if (aNode.getFailover().equals(leftNodeID))
+            {
+               // We need to transfer the failover responsibility to the new active one
+               aNode.setFailover(fnodeID);
+            }
+         }
+
+         suspectedNodes.put(leftNodeID, qNode);
+
+         if (isFirstNode())
+         {
+            clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+         }
+
+         if (trace)
+         {
+            log.trace("Quarantined node: " + qNode);
+         }
+      }
+
+      if (trace)
+      {
+         log.trace("First node: " + firstNode +
+                   " oldFailoverNodeID: " +
+                   oldFailoverNodeID +
+                   " failoverNodeID: " +
+                   failoverNodeID);
+      }
+
+      if (oldFailoverNodeID != failoverNodeID)
+      {
+         // Failover node for this node has changed
+         failoverNodeChanged(oldFailoverNodeID, firstNode, false);
+      }
+      stateMonitor.newQuarantined();
    }
    
    // RequestTarget implementation ------------------------------------------------------------
@@ -1318,6 +1764,14 @@
       sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
    }
 
+   //some node is finally dead and failed over
+   //so it's safe to remove it now.
+   public void handleNodeDead(int nodeId)
+   {
+      QuarantinedNode node = suspectedNodes.remove(nodeId);
+      log.info("Quarantined node " + node + " is finally dead.");
+   }
+
    /**
     * @param originatorNodeID - the ID of the node that initiated the modification.
     */
@@ -1688,7 +2142,24 @@
                  "CLUSTERED, " +
                  "ALL_NODES " +
                  "FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?");
+      map.put("UPDATE_STATE",
+                    "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
 
+      map.put("UPDATE_STATE",
+              "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+      map.put("UPDATE_TIMESTAMP",
+              "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = ? WHERE NODE_ID = ?");
+
+      map.put("LOAD_CLUSTER_STATE",
+              "SELECT NODE_ID, PING_TIMESTAMP, STATE FROM JBM_CLUSTER");
+
+      map.put("INSERT_NODE_STATE",
+              "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
+      
+      map.put("DELETE_DEAD_NODE",
+              "DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?");
+
       return map;
    }
 
@@ -1700,6 +2171,9 @@
               "QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
               "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
               "CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
+      map.put("CREATE_CLUSTER_STATE_TABLE", "CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, " + 
+              "PING_TIMESTAMP BIGINT, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB");
+
       return map;
    }
 
@@ -3341,7 +3815,10 @@
       //Now clean the data for the failed node
       
       //TODO - does this need to be inside the lock above?
-      cleanDataForNode(failedNodeID);
+      if (keepOldFailoverModel)
+      {
+         cleanDataForNode(failedNodeID);
+      }
       
       log.debug(this + " announcing that failover procedure is complete");
 
@@ -3476,4 +3953,205 @@
    	
    }
 
+   /*
+    * This thread does the following:
+    * 
+    * Periodically update the timestamp of this node
+    * and monitor its status. If it becomes quarantined, it will
+    * make itself to be a standalone
+    * it also monitors its buddy's state, if it is quarantined, see if it will really die.
+    * When it dies, trigger failover then.
+    */
+   private class StateMonitor extends Thread
+   {
+      private boolean working = true;
+
+      public synchronized void run()
+      {
+         do
+         {
+            ClusterState newState;
+            try
+            {
+               refreshNodeState();
+               processClusterState();
+            }
+            catch (Exception e)
+            {
+               log.error("Error refreshing state of node: " + thisNodeID, e);
+            }
+
+            try
+            {
+               wait(nodeStateRefreshInterval);
+            }
+            catch (InterruptedException e)
+            {
+            }
+
+         }
+         while (working);
+         log.debug("Stop monitoring the stats at node " + thisNodeID);
+      }
+
+      public synchronized void shutdown()
+      {
+         working = false;
+         notify();
+      }
+
+      public synchronized void newQuarantined()
+      {
+         notify();
+      }
+   }
+
+   // not for concurrent use!
+   private class ClusterState
+   {
+      Map<Integer, NodeState> states = new java.util.concurrent.ConcurrentHashMap<Integer, NodeState>();
+
+      private ClusterState(Map<Integer, NodeState> copy)
+      {
+         states = new HashMap<Integer, NodeState>(copy);
+      }
+
+      public void updateNodeState(int nodeID, int newState) throws Exception
+      {
+         NodeState node = states.get(nodeID);
+         node.setState(newState);
+         updateStateInStorage(nodeID, newState);
+      }
+
+      public boolean allDeadButMe(int nodeID)
+      {
+         Iterator<NodeState> iter = states.values().iterator();
+         while (iter.hasNext())
+         {
+            NodeState node = iter.next();
+            if (!node.isDead() && (node.getID() != nodeID))
+            {
+               return false;
+            }
+         }
+         return true;
+      }
+
+      public void clear()
+      {
+         states.clear();
+      }
+
+      public ClusterState()
+      {
+      }
+
+      public void addNode(int nodeID, long timestamp, int nodeState)
+      {
+         states.put(nodeID, new NodeState(nodeID, timestamp, nodeState));
+      }
+
+      public ClusterState copy()
+      {
+         return new ClusterState(states);
+      }
+
+      public boolean isQuarantined(int qNodeID)
+      {
+         NodeState nState = states.get(qNodeID);
+         if (nState == null)
+            return false;
+         return nState.isQurarntined();
+      }
+
+      public Boolean isNodeDead(Integer qNodeID)
+      {
+         NodeState nState = states.get(qNodeID);
+
+         // if doesn't exists, it is already dead.
+         if (nState == null)
+            return null;
+
+         return nState.isDead();
+      }
+   }
+
+   private class NodeState
+   {
+      private int nodeID;
+
+      private long timestamp;
+
+      private int state;
+
+      public NodeState(int nodeID, long timestamp, int state)
+      {
+         this.nodeID = nodeID;
+         this.timestamp = timestamp;
+         this.state = state;
+      }
+
+      public void setState(int newState)
+      {
+         state = newState;
+      }
+
+      public int getID()
+      {
+         return nodeID;
+      }
+
+      public boolean isQurarntined()
+      {
+         return state == STATE_QUARANTINED;
+      }
+
+      // don't rely on state, use timestamp
+      public boolean isDead()
+      {
+         long currentTime = System.currentTimeMillis();
+         long stampAge = currentTime - timestamp;
+         if (stampAge > (2 * nodeStateRefreshInterval))
+         {
+            return true;
+         }
+         return false;
+      }
+   }
+
+   public boolean isAvailable()
+   {
+      return !clusterState.isQuarantined(thisNodeID);
+   }
+
+   public boolean isKeepOldFailoverModel()
+   {
+      return keepOldFailoverModel;
+   }
+
+   public void setKeepOldFailoverModel(boolean isKeep)
+   {
+      keepOldFailoverModel = isKeep;
+   }
+
+   public long getNodeStateRefreshInterval()
+   {
+      return nodeStateRefreshInterval;
+   }
+
+   public void setNodeStateRefreshInterval(long newValue)
+   {
+      nodeStateRefreshInterval = newValue;
+   }
+
+   public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      groupMember.registerChannelInJmx(server, channelPartitionName);
+   }
+
+   public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      groupMember.unregisterChannelInJmx(server, channelPartitionName);
+   }
+    
 }

Added: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java	                        (rev 0)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+/**
+ * A QuarantinedNode
+ *
+ * @author howard
+ * 
+ * Created Jan 18, 2011 11:02:55 AM
+ *
+ *
+ */
+public class QuarantinedNode
+{
+   private Integer nodeID;
+   private Integer failoverID; //it must be an active node at the time of being quarantined.
+   private boolean crashed;
+
+   public QuarantinedNode(Integer leftNodeID, Integer failoverID, boolean crashed)
+   {
+      nodeID = leftNodeID;
+      this.failoverID = failoverID;
+      this.crashed = crashed;
+   }
+   
+   public String toString()
+   {
+      return "Quarantined Node[" + nodeID + "], failover[" + failoverID + "], crashed[" + crashed + "]";
+   }
+
+   public Integer getFailover()
+   {
+      return failoverID;
+   }
+
+   public void setFailover(Integer newFailover)
+   {
+      this.failoverID = newFailover;
+   }
+
+   public boolean shouldFailover()
+   {
+      return crashed;
+   }
+
+}

Modified: branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -68,4 +68,8 @@
    void handleAddAllReplicatedDeliveries(int nodeID, Map deliveries) throws Exception;
    
    void handleGetReplicatedDeliveries(String queueName, Address returnAddress) throws Exception;
+
+   void handleNodeDead(int nodeId);
+
+   boolean isAvailable();
 }

Modified: branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -100,6 +100,10 @@
    
    private boolean failoverOnNodeLeave;
 
+   private boolean keepOldFailoverModel = true;
+
+   private long nodeStateRefreshInterval = 30000;
+
    private MessagingPostOffice postOffice;
 
    // Constructors --------------------------------------------------
@@ -345,7 +349,48 @@
 	   }
 	}
 
-   
+   public boolean isKeepOldFailoverModel()
+   {
+      if (started)
+      {
+         return postOffice.isKeepOldFailoverModel();
+      }
+      return keepOldFailoverModel;
+   }
+
+   public void setKeepOldFailoverModel(boolean isKeep)
+   {
+      if (started)
+      {
+         postOffice.setKeepOldFailoverModel(isKeep);
+      }
+      else
+      {
+         keepOldFailoverModel = isKeep;
+      }
+   }
+
+   public long getNodeStateRefreshInterval()
+   {
+      if (started)
+      {
+         return postOffice.getNodeStateRefreshInterval();
+      }
+      return nodeStateRefreshInterval;
+   }
+
+   public void setNodeStateRefreshInterval(long newValue)
+   {
+      if (started)
+      {
+         postOffice.setNodeStateRefreshInterval(newValue);
+      }
+      else
+      {
+         nodeStateRefreshInterval = newValue;
+      }
+   }
+
    public String listBindings()
    {
       return postOffice.printBindingInformation();
@@ -450,7 +495,9 @@
 	                                               failoverOnNodeLeave,
 	                                               maxRetry,
 	                                               retryInterval,
-	                                               retryOnConnectionFailure);
+                                                  retryOnConnectionFailure,
+                                                  keepOldFailoverModel,
+                                                  nodeStateRefreshInterval);
          }
          else
          {
@@ -468,6 +515,18 @@
          postOffice.start();
 
          started = true;
+
+         if (clustered)
+         {
+            try
+            {
+               postOffice.registerChannelInJmx(server, channelPartitionName);
+            }
+            catch (Exception e)
+            {
+               log.error("Caught exception registering channel in JXM", e);
+            }
+         }
       }
       catch (Throwable t)
       {
@@ -482,6 +541,11 @@
          throw new IllegalStateException("Service is not started");
       }
 
+      if (clustered)
+      {
+         postOffice.unregisterChannelInJmx(server, channelPartitionName);
+      }
+
       super.stopService();
 
       try

Modified: branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-01-26 06:39:04 UTC (rev 8196)
+++ branches/port1842/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-01-26 07:03:19 UTC (rev 8197)
@@ -95,7 +95,7 @@
                                  sc.getPostOfficeSQLProperties(), true, nodeID,
                                  "Clustered", ms, pm, tr, ff, cf, idm, cn,
                                  groupName, jChannelFactory,
-                                 stateTimeout, castTimeout, true, 100, false, 25, 1000, false);
+                                 stateTimeout, castTimeout, true, 100, false, 25, 1000, false, true, 30000);
 
       postOffice.start();
 



More information about the jboss-cvs-commits mailing list