[jboss-cvs] JBoss Messaging SVN: r8233 - in branches/Branch_1_4: docs/examples/ordering-group and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 3 00:45:03 EST 2011


Author: gaohoward
Date: 2011-03-03 00:45:01 -0500 (Thu, 03 Mar 2011)
New Revision: 8233

Added:
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-cluster-health-mbean-service.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingClusterHealthMBean-xmbean.xml
   branches/Branch_1_4/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ServerFailoverTest2.java
Modified:
   branches/Branch_1_4/
   branches/Branch_1_4/.classpath
   branches/Branch_1_4/docs/examples/ordering-group/
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/ServiceContainer.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanExtraTest.java
Log:
JBMESSAGING-1842



Property changes on: branches/Branch_1_4
___________________________________________________________________
Added: svn:mergeinfo
   + /branches/JBM1842:8169-8232

Modified: branches/Branch_1_4/.classpath
===================================================================
--- branches/Branch_1_4/.classpath	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/.classpath	2011-03-03 05:45:01 UTC (rev 8233)
@@ -1,6 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
+	<classpathentry kind="src" path="jgroups-src"/>
 	<classpathentry kind="src" path="integration/EAP4/src/main"/>
 	<classpathentry kind="src" path="integration/EAP4/tests-src"/>
 	<classpathentry kind="src" path="docs/examples/bridge/src"/>
@@ -35,6 +36,7 @@
 	<classpathentry kind="lib" path="thirdparty/dom4j/lib/dom4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/javassist/lib/javassist.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jdk14-pluggable-instrumentor.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jrockit-pluggable-instrumentor.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/pluggable-instrumentor.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/jbossxb/lib/jboss-xml-binding.jar"/>
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/backport-util-concurrent.jar"/>
@@ -42,7 +44,7 @@
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-transformer.jar"/>
 	<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar" sourcepath="/home/howard/projects/jboss/ts/a_tmp/jta/classes"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>


Property changes on: branches/Branch_1_4/docs/examples/ordering-group
___________________________________________________________________
Deleted: svn:mergeinfo
   - 

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -134,6 +135,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Copied: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-cluster-health-mbean-service.xml (from rev 8232, branches/JBM1842/integration/EAP4/etc/server/default/deploy/messaging-cluster-health-mbean-service.xml)
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-cluster-health-mbean-service.xml	                        (rev 0)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/messaging-cluster-health-mbean-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+     The JBoss Messaging cluster health MBean deployment descriptor.
+
+     $Id: messaging-service.xml 3881 2008-03-14 16:01:43Z timfox $
+ -->
+
+<server>
+
+   <!-- MessagingClusterHealthMBean MBean configuration
+        ============================== -->
+
+   <mbean code="org.jboss.jms.server.MessagingClusterHealthMBean"
+      name="jboss.messaging:service=MessagingClusterHealthMBean"
+      xmbean-dd="xmdesc/MessagingClusterHealthMBean-xmbean.xml">
+
+      <!-- The service name of Server Peer -->
+
+      <attribute name="ServerPeer">jboss.messaging:service=ServerPeer</attribute>
+      
+      <!-- The service name of Post Office --> 
+      
+      <attribute name="PostOffice">jboss.messaging:service=PostOffice</attribute>
+
+      <!-- The service name of Persistence Manager -->
+
+      <attribute name="PersistenceManager">jboss.messaging:service=PersistenceManager</attribute>
+
+   </mbean>
+
+</server>

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -130,6 +131,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
    ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -130,6 +131,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -132,6 +133,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = NDBCLUSTER
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is clustered. If you don't want a clustered post office then set to false -->

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -96,6 +96,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -134,6 +135,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP DATE, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -92,6 +92,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -130,6 +131,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLUSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, PING_TIMESTAMP TIMESTAMP, STATE INTEGER, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -97,6 +97,7 @@
    UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
    LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
+   LOAD_SINGLE_REFERENCE=SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->
@@ -135,6 +136,12 @@
 INSERT_BINDING=INSERT INTO JBM_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
 LOAD_BINDINGS=SELECT QUEUE_NAME, COND, SELECTOR, CHANNEL_ID, CLSTERED, ALL_NODES FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?
+CREATE_CLUSTER_STATE_TABLE=CREATE TABLE JBM_CLUSTER (NODE_ID SMALLINT, PING_TIMESTAMP DATETIME, STATE SMALLINT, PRIMARY KEY(NODE_ID))
+UPDATE_STATE=UPDATE JBM_CLUSTER SET STATE=? WHERE NODE_ID=?
+UPDATE_TIMESTAMP=UPDATE JBM_CLUSTER SET PING_TIMESTAMP=CURRENT_TIMESTAMP WHERE NODE_ID=?
+LOAD_CLUSTER_STATE=SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER
+INSERT_NODE_STATE=INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, CURRENT_TIMESTAMP, ?)
+DELETE_DEAD_NODE=DELETE FROM JBM_CLUSTER WHERE NODE_ID=?
       ]]></attribute>
 
       <!-- This post office is non clustered. If you want a clustered post office then set to true -->

Copied: branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingClusterHealthMBean-xmbean.xml (from rev 8232, branches/JBM1842/integration/EAP4/etc/xmdesc/MessagingClusterHealthMBean-xmbean.xml)
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingClusterHealthMBean-xmbean.xml	                        (rev 0)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingClusterHealthMBean-xmbean.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+   <!DOCTYPE mbean PUBLIC
+      "-//JBoss//DTD JBOSS XMBEAN 1.2//EN"
+      "http://www.jboss.org/j2ee/dtd/jboss_xmbean_1_2.dtd">
+<mbean>
+   <description>JBoss Messaging Cluster Health MBean</description>
+   <class>org.jboss.jms.server.MessagingClusterHealthMBean</class>
+
+   <!-- Managed constructors -->
+
+   <constructor>
+      <name>MessagingClusterHealthMBean</name>
+   </constructor>
+
+   <!-- Managed attributes -->
+
+   <attribute access="read-only" getMethod="getInstance">
+      <description>The instance to plug into the server peer</description>
+      <name>Instance</name>
+      <type>java.lang.Object</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getServerPeer" setMethod="setServerPeer">
+      <description>The name of the server peer service</description>
+      <name>ServerPeer</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getPostOffice" setMethod="setPostOffice">
+      <description>The name of the post office service</description>
+      <name>PostOffice</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getPersistenceManager" setMethod="setPersistenceManager">
+      <description>The name of the persistence manager service</description>
+      <name>PersistenceManager</name>
+      <type>java.lang.String</type>
+   </attribute>
+
+   <!-- Managed operations -->
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>create</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>start</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>stop</name>
+   </operation>
+
+   <operation>
+      <description>JBoss Service lifecycle operation</description>
+      <name>destroy</name>
+   </operation>
+
+</mbean>

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -155,6 +155,18 @@
       <type>boolean</type>
    </attribute>
 
+   <attribute access="read-write" getMethod="getKeepOldFailoverModel" setMethod="setKeepOldFailoverModel">
+      <description>Whether JBM cluster failover should use the old-style failover model; default true</description>
+      <name>KeepOldFailoverModel</name>
+      <type>boolean</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getNodeStateRefreshInterval" setMethod="setNodeStateRefreshInterval">
+      <description>Interval (in milliseconds) between two consecutive node-state timestamp updates; default 30000</description>
+      <name>NodeStateRefreshInterval</name>
+      <type>long</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/ServerPeer-xmbean.xml	2011-03-03 05:45:01 UTC (rev 8233)
@@ -33,6 +33,12 @@
       <name>JMSUserManager</name>
       <type>javax.management.ObjectName</type>
    </attribute>   
+   
+   <attribute access="read-write" getMethod="getMessagingClusterHealthMBean" setMethod="setMessagingClusterHealthMBean">
+      <description>The ObjectName of the MessagingClusterHealthMBean</description>
+      <name>MessagingClusterHealthMBean</name>
+      <type>javax.management.ObjectName</type>
+   </attribute>
          
    <!-- instance access -->
 

Modified: branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/ServiceContainer.java
===================================================================
--- branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/ServiceContainer.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -134,6 +134,7 @@
    public static ObjectName HTTP_REMOTING_OBJECT_NAME;
    
    public static ObjectName SERVER_PEER_OBJECT_NAME;
+   public static ObjectName POSTOFFICE_OBJECT_NAME;
    
    public static String DATA_SOURCE_JNDI_NAME = "java:/DefaultDS";
    public static String TRANSACTION_MANAGER_JNDI_NAME = "java:/TransactionManager";
@@ -185,6 +186,9 @@
          
          SERVER_PEER_OBJECT_NAME = new ObjectName(
                "jboss.messaging:service=ServerPeer");
+         
+         POSTOFFICE_OBJECT_NAME = new ObjectName(
+               "jboss.messaging:service=PostOffice");
       }
       catch (Exception e)
       {

Copied: branches/Branch_1_4/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java (from rev 8232, branches/JBM1842/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java)
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java	                        (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/MessagingClusterHealthMBean.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,350 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.jms.server;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.impl.postoffice.MessagingPostOffice;
+import org.jboss.messaging.util.JMXAccessor;
+import org.jboss.system.ServiceMBeanSupport;
+
+/**
+ * A MessagingClusterHealthMBean
+ *
+ * Stops a JBM node when its database becomes unavailable and restarts it once the database can be
+ * reached again.
+ *
+ * {@link #stopNodeOnDBFailure()} stops the registered destinations and connection factories, then
+ * the post office, then the server peer. It then starts a {@link NodeRecovery} daemon thread. That
+ * thread polls the persistence manager's checkConnection() and, once it succeeds, starts the same
+ * services again in the opposite order.
+ *
+ * The server peer, post office and persistence manager are identified by ObjectName strings set via
+ * setServerPeer/setPostOffice/setPersistenceManager. Connection factories and destinations register
+ * themselves via {@link #registerFactory(ObjectName)} and {@link #registerDestination(ObjectName)}.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Jan 31, 2011 8:52:02 PM
+ */
+public class MessagingClusterHealthMBean extends ServiceMBeanSupport
+{
+   private static final Logger log = Logger.getLogger(MessagingClusterHealthMBean.class);
+
+   /** Restart delay (ms) used when the post office's NodeStateRefreshInterval cannot be read. */
+   private static final long DEFAULT_SHUTDOWN_DELAY = 30000;
+
+   /** Pause (ms) between two DB connection checks of the recovery thread. */
+   private static final long RECOVERY_CHECK_INTERVAL = 5000;
+
+   // ObjectName strings of the core services this bean stops and restarts.
+   private String serverPeer;
+   private String postOffice;
+   private String persistenceManager;
+
+   // Services to restart after recovery. Each list is guarded by its own monitor, because the
+   // services register themselves from their own start code, on whatever thread starts them.
+   // Initialised here (not in startService) so an early registration cannot hit a null list.
+   private final List<ObjectName> connectionFactories = new ArrayList<ObjectName>();
+   private final List<ObjectName> destinations = new ArrayList<ObjectName>();
+
+   // Guarded by "this".
+   private boolean nodeStopped;
+
+   // Volatile rather than guarded by "this". stopService() must be able to abandon recovery
+   // without waiting for a restart that holds the monitor; a restart can block in makeSureDelay().
+   private volatile NodeRecovery nodeRecovery;
+
+   // Guarded by "this". Milliseconds to wait before (re)starting the post office.
+   private long shutdownDelay;
+
+   public void startService() throws Exception
+   {
+      log.info(this + " started.");
+   }
+
+   public void stopService() throws Exception
+   {
+      // Abandon recovery before nulling the attributes below. This narrows the window in which
+      // the recovery thread could try to build ObjectNames from null strings.
+      NodeRecovery recovery = nodeRecovery;
+      nodeRecovery = null;
+      if (recovery != null)
+      {
+         recovery.abandon();
+      }
+
+      setServerPeer(null);
+      setPostOffice(null);
+      setPersistenceManager(null);
+      synchronized (connectionFactories)
+      {
+         connectionFactories.clear();
+      }
+      synchronized (destinations)
+      {
+         destinations.clear();
+      }
+
+      log.info(this + " stopped.");
+   }
+
+   public Object getInstance()
+   {
+      return this;
+   }
+
+   /**
+    * Stops this node after a DB failure and starts the background recovery thread.
+    *
+    * Destinations and connection factories are stopped first, then the post office, and finally
+    * the server peer. This is the reverse of the order in which restartJBMNode() starts them.
+    * A failure to stop an individual service is logged and otherwise ignored. Does nothing if the
+    * node has already been stopped and not yet restarted.
+    *
+    * @throws Exception if the post office or server peer ObjectName is not set or is malformed
+    */
+   public synchronized void stopNodeOnDBFailure() throws Exception
+   {
+      if (nodeStopped)
+      {
+         return;
+      }
+
+      log.info("Stopping JBM node on DB failure...");
+
+      stopServices(snapshot(destinations));
+      stopServices(snapshot(connectionFactories));
+
+      ObjectName postOfficeServiceName = new ObjectName(postOffice);
+      initShutdownDelay(postOfficeServiceName);
+      stopManagedService(postOfficeServiceName);
+
+      stopManagedService(new ObjectName(serverPeer));
+
+      nodeStopped = true;
+
+      NodeRecovery recovery = new NodeRecovery();
+      nodeRecovery = recovery;
+      recovery.start();
+
+      log.info("JBM node is stopped.");
+   }
+
+   /**
+    * Sets shutdownDelay to twice the post office's NodeStateRefreshInterval attribute. Falls back
+    * to DEFAULT_SHUTDOWN_DELAY when the attribute cannot be read. The delay is presumably meant to
+    * give the other cluster members time to notice this node has gone before it rejoins (TODO
+    * confirm).
+    */
+   private void initShutdownDelay(ObjectName postOfficeServiceName)
+   {
+      try
+      {
+         long nodeRefreshInterval = (Long)server.getAttribute(postOfficeServiceName, "NodeStateRefreshInterval");
+         shutdownDelay = nodeRefreshInterval * 2;
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to get node refresh interval, use default.", e);
+         shutdownDelay = DEFAULT_SHUTDOWN_DELAY;
+      }
+   }
+
+   /**
+    * Invokes "stop" on the given MBean. Failures are logged and not propagated, so one failing
+    * service does not prevent the others from being stopped.
+    */
+   private void stopManagedService(ObjectName service)
+   {
+      try
+      {
+         server.invoke(service, "stop", new Object[0], new String[0]);
+         // Log the service actually stopped, not the inherited serviceName of this bean.
+         log.debug("Service " + service + " stopped.");
+      }
+      catch (Exception e)
+      {
+         log.warn("Error stopping service " + service, e);
+      }
+   }
+
+   /** Stops each service in the list, in list order. */
+   private void stopServices(List<ObjectName> services)
+   {
+      for (ObjectName service : services)
+      {
+         stopManagedService(service);
+      }
+   }
+
+   public void setServerPeer(String serverPeer)
+   {
+      this.serverPeer = serverPeer;
+   }
+
+   public String getServerPeer()
+   {
+      return serverPeer;
+   }
+
+   public void setPostOffice(String postOffice)
+   {
+      this.postOffice = postOffice;
+   }
+
+   public String getPostOffice()
+   {
+      return postOffice;
+   }
+
+   public void setPersistenceManager(String persistenceManager)
+   {
+      this.persistenceManager = persistenceManager;
+   }
+
+   public String getPersistenceManager()
+   {
+      return persistenceManager;
+   }
+
+   /**
+    * Asks the persistence manager whether its store is reachable.
+    *
+    * @return false if checkConnection() says so or throws
+    * @throws Exception if the persistence manager instance cannot be looked up
+    */
+   private boolean checkDBConnection() throws Exception
+   {
+      ObjectName pmName = new ObjectName(persistenceManager);
+
+      PersistenceManager pm = (PersistenceManager)JMXAccessor.getJMXAttributeOverSecurity(server, pmName, "Instance");
+
+      try
+      {
+         return pm.checkConnection();
+      }
+      catch (Exception e)
+      {
+         log.warn("failed to check connection", e);
+         return false;
+      }
+   }
+
+   /**
+    * Restarts the node stopped by stopNodeOnDBFailure(). The order is: server peer, post office
+    * (bounced until JGroups is back), connection factories, destinations.
+    *
+    * The method gives up, leaving nodeStopped set, if the given recovery has been abandoned.
+    * This is checked before starting and after every delay. The factory and destination lists
+    * are drained before their services are started, because starting them makes them register
+    * again.
+    */
+   private synchronized void restartJBMNode(NodeRecovery recovery) throws Exception
+   {
+      if (!nodeStopped || recovery.isAbandoned())
+      {
+         return;
+      }
+
+      ObjectName spName = new ObjectName(serverPeer);
+      ObjectName poName = new ObjectName(postOffice);
+
+      makeSureDelay();
+      if (recovery.isAbandoned())
+      {
+         log.info("Node recovery abandoned, JBM node not restarted.");
+         return;
+      }
+
+      log.info("Restarting JBM node...");
+
+      invokeStart(spName);
+      invokeStart(poName);
+
+      // Wait for JGroups to come back, bouncing the post office until it does.
+      MessagingPostOffice po = (MessagingPostOffice)JMXAccessor.getJMXAttributeOverSecurity(server, poName, "Instance");
+      while (!po.waitForJGroups())
+      {
+         log.info("Failed to start post office due to JGroups failure, retrying...");
+         stopManagedService(poName);
+         makeSureDelay();
+         if (recovery.isAbandoned())
+         {
+            log.info("Node recovery abandoned, JBM node not restarted.");
+            return;
+         }
+         invokeStart(poName);
+         po = (MessagingPostOffice)JMXAccessor.getJMXAttributeOverSecurity(server, poName, "Instance");
+      }
+
+      startServices(drain(connectionFactories));
+      startServices(drain(destinations));
+
+      nodeStopped = false;
+
+      log.info("JBM node restarted.");
+   }
+
+   /**
+    * Sleeps for shutdownDelay milliseconds in total.
+    *
+    * The deadline is fixed up front, so spurious or interrupted wake-ups never stretch the wait
+    * beyond shutdownDelay. An interrupt does not cut the delay short; it is re-asserted on the
+    * thread once the delay has elapsed.
+    */
+   private void makeSureDelay()
+   {
+      long deadline = System.currentTimeMillis() + shutdownDelay;
+      long remaining = shutdownDelay;
+      boolean interrupted = false;
+
+      while (remaining > 0)
+      {
+         try
+         {
+            Thread.sleep(remaining);
+         }
+         catch (InterruptedException e)
+         {
+            interrupted = true;
+         }
+         remaining = deadline - System.currentTimeMillis();
+      }
+
+      if (interrupted)
+      {
+         Thread.currentThread().interrupt();
+      }
+   }
+
+   /** Invokes "start" on the given MBean, propagating any failure. */
+   private void invokeStart(ObjectName service) throws Exception
+   {
+      server.invoke(service, "start", new Object[0], new String[0]);
+      log.debug("Service " + service + " started.");
+   }
+
+   /** Starts each service in the list, in list order; stops at the first failure. */
+   private void startServices(List<ObjectName> services) throws Exception
+   {
+      for (ObjectName service : services)
+      {
+         invokeStart(service);
+      }
+   }
+
+   /** Returns a copy of the list, taken under the list's own monitor. */
+   private static List<ObjectName> snapshot(List<ObjectName> services)
+   {
+      synchronized (services)
+      {
+         return new ArrayList<ObjectName>(services);
+      }
+   }
+
+   /** Returns a copy of the list and clears it, atomically with respect to the list's monitor. */
+   private static List<ObjectName> drain(List<ObjectName> services)
+   {
+      synchronized (services)
+      {
+         List<ObjectName> copy = new ArrayList<ObjectName>(services);
+         services.clear();
+         return copy;
+      }
+   }
+
+   /**
+    * Daemon thread that checks the DB connection every RECOVERY_CHECK_INTERVAL ms. As soon as the
+    * check succeeds, it restarts the node once. abandon() stops it; waiting uses a private lock, so
+    * abandon() never has to wait for a restart in progress.
+    */
+   private class NodeRecovery extends Thread
+   {
+      private final Object lock = new Object();
+
+      private volatile boolean keepTrying = true;
+
+      public NodeRecovery()
+      {
+         this.setDaemon(true);
+      }
+
+      /** Asks the thread to stop and wakes it if it is waiting between checks. */
+      public void abandon()
+      {
+         synchronized (lock)
+         {
+            keepTrying = false;
+            lock.notifyAll();
+         }
+      }
+
+      boolean isAbandoned()
+      {
+         return !keepTrying;
+      }
+
+      public void run()
+      {
+         try
+         {
+            while (keepTrying)
+            {
+               if (checkDBConnection())
+               {
+                  restartJBMNode(this);
+                  break;
+               }
+               synchronized (lock)
+               {
+                  // Re-check under the lock so an abandon() issued since the loop test is not missed.
+                  if (keepTrying)
+                  {
+                     lock.wait(RECOVERY_CHECK_INTERVAL);
+                  }
+               }
+            }
+         }
+         catch (InterruptedException e)
+         {
+            // Nothing in this class interrupts the thread; treat an external interrupt as a stop request.
+            log.warn("Node recovery interrupted, stop recovery.");
+            Thread.currentThread().interrupt();
+         }
+         catch (Exception e)
+         {
+            log.error("Getting exception, stop recovery.", e);
+         }
+      }
+   }
+
+   /** Records a connection factory service to be restarted after recovery. */
+   public void registerFactory(ObjectName serviceName)
+   {
+      synchronized(connectionFactories)
+      {
+         connectionFactories.add(serviceName);
+         log.info("Registered connection factory " + serviceName);
+      }
+   }
+
+   /** Records a destination service to be restarted after recovery. */
+   public void registerDestination(ObjectName serviceName)
+   {
+      synchronized(destinations)
+      {
+         destinations.add(serviceName);
+         log.info("Registered destination " + serviceName);
+      }
+   }
+}

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/ServerPeer.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -215,6 +215,9 @@
 
    protected ObjectName jmsUserManagerObjectName;
    protected JMSUserManager jmsUserManager;
+   
+   protected ObjectName messagingClusterHealthMBeanObjectName;
+   protected MessagingClusterHealthMBean clusterHealthBean;
 
    protected ObjectName defaultDLQObjectName;
    protected Queue defaultDLQ;
@@ -360,8 +363,10 @@
          	startMessageCounters();
          }
 
+         initClusterHealthBean();
+         
          started = true;
-
+         
          log.info("JBoss Messaging " + getVersion().getProviderVersion() + " server [" +
             getServerPeerID()+ "] started");
       }
@@ -480,6 +485,21 @@
       jmsUserManagerObjectName = on;
    }
 
+   /**
+    * Returns the configured ObjectName of the MessagingClusterHealthMBean, or null if none has been set.
+    */
+   public synchronized ObjectName getMessagingClusterHealthMBean()
+   {
+      ObjectName configured = messagingClusterHealthMBeanObjectName;
+      return configured;
+   }
+
+   /**
+    * Sets the ObjectName of the MessagingClusterHealthMBean. While the server peer is started, the
+    * call is ignored with a warning, because the bean instance is resolved when the server peer starts.
+    */
+   public synchronized void setMessagingClusterHealthMBean(ObjectName on)
+   {
+      if (!started)
+      {
+         messagingClusterHealthMBeanObjectName = on;
+         return;
+      }
+      log.warn("Cannot set messaging cluster health mbean on server peer when server peer is started");
+   }
+
    public synchronized ObjectName getDefaultDLQ()
    {
       return defaultDLQObjectName;
@@ -2047,6 +2067,75 @@
       }
    }
 
+   /**
+    * Resolves the MessagingClusterHealthMBean named by messagingClusterHealthMBeanObjectName through
+    * its "Instance" attribute.
+    *
+    * Leaves clusterHealthBean null when no name is configured or the lookup fails. In that case,
+    * stopJBMNodeForRecovery() falls back to stopping only this server peer.
+    */
+   private void initClusterHealthBean()
+   {
+      // Reset first, so a bean resolved during an earlier start is not reused after the
+      // ObjectName has been cleared while the server peer was stopped.
+      clusterHealthBean = null;
+
+      if (messagingClusterHealthMBeanObjectName == null)
+      {
+         return;
+      }
+
+      try
+      {
+         clusterHealthBean = (MessagingClusterHealthMBean)JMXAccessor.getJMXAttributeOverSecurity(getServer(),
+                                                                                                  messagingClusterHealthMBeanObjectName,
+                                                                                                  "Instance");
+      }
+      catch (Exception e)
+      {
+         log.warn("Failed to get MessagingClusterHealthMBean instance.", e);
+         clusterHealthBean = null;
+      }
+   }
+   
+   /**
+    * Asks the MessagingClusterHealthMBean resolved at startup to stop this node after a DB failure.
+    * That bean then schedules the automatic restart.
+    *
+    * If no health bean was resolved, or stopping through it fails, only this server peer is stopped
+    * and the node has to be restarted manually. No exception escapes this method.
+    */
+   public void stopJBMNodeForRecovery()
+   {
+      if (clusterHealthBean == null)
+      {
+         log.warn(" mbean " + messagingClusterHealthMBeanObjectName + " not deployed! You need manually restart the node.");
+         stopServerPeerWithoutRecovery();
+         return;
+      }
+
+      try
+      {
+         clusterHealthBean.stopNodeOnDBFailure();
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to stop node for DB failure, need to manually restart node.", e);
+         stopServerPeerWithoutRecovery();
+      }
+   }
+
+   /** Fallback for stopJBMNodeForRecovery(): stops this server peer, logging any failure. */
+   private void stopServerPeerWithoutRecovery()
+   {
+      try
+      {
+         this.stopService();
+      }
+      catch (Exception e1)
+      {
+         log.error("Failed to stop server peer.", e1);
+      }
+   }
+
+   /**
+    * Registers a connection factory service with the cluster health bean, so that the service is
+    * started again when the node is restarted after a DB failure. Does nothing without a health bean.
+    */
+   public void registerCFForRecovery(ObjectName serviceName)
+   {
+      if (clusterHealthBean == null)
+      {
+         return;
+      }
+      clusterHealthBean.registerFactory(serviceName);
+   }
+
+   /**
+    * Registers a destination service with the cluster health bean, so that the service is started
+    * again when the node is restarted after a DB failure. Does nothing without a health bean.
+    */
+   public void registerDestForRecovery(ObjectName serviceName)
+   {
+      if (clusterHealthBean == null)
+      {
+         return;
+      }
+      clusterHealthBean.registerDestination(serviceName);
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -223,6 +223,8 @@
          }
       
          log.info(info);
+         
+         serverPeer.registerCFForRecovery(serviceName);
          log.info(this + " started");
       }
       catch (Throwable t)

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -139,6 +139,8 @@
          {
             setExpiryQueue(expiryQueueObjectName);
          }
+         
+         serverPeer.registerDestForRecovery(serviceName);
       }
       catch (Throwable t)
       {

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/ClusterNotification.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/ClusterNotification.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/ClusterNotification.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -47,6 +47,8 @@
 	public static final int TYPE_REPLICATOR_PUT = 6;
 	
 	public static final int TYPE_REPLICATOR_REMOVE = 7;
+	
+	public static final int TYPE_NODE_FAILEDOVER = 8;
 		
 	public int type;
 	

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -104,6 +104,9 @@
 
    //update messages state to 'C' of the channel whose state is 'S', and return their messages ids
    List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception;
+   
+   /**
+    * Checks whether the persistence store can currently be reached. MessagingClusterHealthMBean
+    * uses this to decide when a node stopped on DB failure can be restarted.
+    *
+    * @return true if the store is reachable, false otherwise
+    */
+   boolean checkConnection();
 
    // Interface value classes ----------------------------------------------------------------------
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -751,6 +751,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psInsertReference);
@@ -856,8 +872,13 @@
                }
 
                // Sanity check
-               if (rows != num) { throw new IllegalStateException(
-                     "Did not update correct number of rows"); }
+               if (rows != num)
+               {
+                  if (!transactionDone)
+                  {
+                     throw new IllegalStateException("Did not update correct number of rows");
+                  }
+               }
 
                return null;
             }
@@ -1558,7 +1579,7 @@
       if (trace) { log.trace("Adding reference " + ref + " in channel " + channelID + " tx " + tx); }
 
       class AddReferenceRunner extends JDBCTxRunner2
-      {
+      {         
          private Message message;
          private boolean messagePersisted = false;
          public Object doTransaction() throws Exception
@@ -1614,6 +1635,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the message " + message + " has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -1651,10 +1688,11 @@
       if (trace) { log.trace("Moving reference " + ref + " from " + sourceChannelID + " to " + destChannelID); }
 
       class MoveReferenceRunner extends JDBCTxRunner2
-      {
+      {         
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -1673,8 +1711,32 @@
                
                if (rows == 0)
                {
-                  //no message updated, should be canceled back already
-                  throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                  //check if it's already sucked before a possible failure
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     long channelID = rs.getLong(2);
+                     
+                     if ((!"C".equals(state)) || channelID != destChannelID)
+                     {
+                        //the message didn't get moved!
+                        throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+                     }
+                     else
+                     {
+                        log.info("Already moved message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     // no message updated, should be canceled back already
+                     throw new JMSException("Failed to move message " + ref.getMessage().getMessageID() + " probably canceled back to source channel.");
+                  }
                }
 
                return null;
@@ -1682,6 +1744,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }
@@ -2080,6 +2143,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -2280,6 +2359,22 @@
 
                return null;
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a after-commit problem however messages have already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(psReference);
@@ -2991,6 +3086,7 @@
       map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
       map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'");
       map.put("LOAD_REFS_IN_SUCK", "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
+      map.put("LOAD_SINGLE_REFERENCE", "SELECT STATE, CHANNEL_ID FROM JBM_MSG_REF WHERE MESSAGE_ID=?");      
 
       return map;
    }
@@ -3360,6 +3456,7 @@
          public Object doTransaction() throws Exception
          {
             PreparedStatement psReference = null;
+            PreparedStatement ps = null;
 
             try
             {
@@ -3381,7 +3478,28 @@
                
                if (rows != 1)
                {
-                  throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_SINGLE_REFERENCE"));
+                  ps.setLong(1, ref.getMessage().getMessageID());
+                  
+                  ResultSet rs = ps.executeQuery();
+                  
+                  if (rs.next())
+                  {
+                     String state = rs.getString(1);
+                     
+                     if (!c.equals(state))
+                     {
+                        throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                     }
+                     else
+                     {
+                        log.info("Already updated message " + ref.getMessage().getMessageID());
+                     }
+                  }
+                  else
+                  {
+                     throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+                  }
                }
 
                return null;
@@ -3389,6 +3507,7 @@
             finally
             {
                closeStatement(psReference);
+               closeStatement(ps);
             }
          }
       }
@@ -3446,4 +3565,33 @@
       return msgIDs;
    }
 
+   /**
+    * Probes the data source by borrowing a connection and returning it straight away.
+    *
+    * Any failure, including Errors, is logged and reported as false.
+    * NOTE(review): with a pooling DataSource, this may hand back an idle pooled connection without
+    * contacting the database, unless the pool validates connections on checkout. Confirm the
+    * datasource configuration when relying on this as a DB health check.
+    *
+    * @return true if a non-null connection was obtained, false otherwise
+    */
+   public boolean checkConnection()
+   {
+      Connection probe = null;
+      boolean reachable;
+      try
+      {
+         probe = ds.getConnection();
+         reachable = (probe != null);
+      }
+      catch (Throwable t)
+      {
+         log.warn("Check connection failed ", t);
+         reachable = false;
+      }
+      finally
+      {
+         if (probe != null)
+         {
+            try
+            {
+               probe.close();
+            }
+            catch (SQLException closeFailure)
+            {
+               log.warn("Error closing connection", closeFailure);
+            }
+         }
+      }
+      return reachable;
+   }
+
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -28,6 +28,7 @@
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
@@ -335,7 +336,6 @@
                   if (log.isTraceEnabled()) { log.trace("Executing: " + statement); }
                       
                   st = conn.createStatement();
-                  
                   st.executeUpdate(statement);
                }
                catch (Exception e) 
@@ -523,7 +523,31 @@
    	protected Connection conn;
 
       private TransactionWrapper wrap;
+      
+      protected boolean shouldFailCommit = false;
+      
+      /**
+       * Runs doTransaction() a single time on a connection freshly obtained from ds. The name
+       * suggests there is no retry, unlike execute() (TODO confirm against execute()).
+       *
+       * An exception from obtaining the connection or from doTransaction() is reported to the
+       * TransactionWrapper via exceptionOccurred() and rethrown. The wrapper is always ended and
+       * conn is always passed to closeConnection().
+       *
+       * @return the value returned by doTransaction()
+       */
+      public T executeOnlyOnce() throws Exception
+      {
+         wrap = new TransactionWrapper();
 
+         try
+         {
+            // NOTE(review): conn is a field and is not reset here. If getConnection() throws,
+            // the finally block passes the previous execution's connection to closeConnection().
+            conn = ds.getConnection();
+
+            return doTransaction();
+         }
+         catch (Exception e)
+         {
+            wrap.exceptionOccurred();
+            throw e;
+         }
+         finally
+         {
+            wrap.end();
+            closeConnection(conn);           
+         }         
+      }
+
       public T execute() throws Exception
 		{
 	      wrap = new TransactionWrapper();
@@ -590,6 +614,8 @@
       private boolean getConnectionFailed;
 
       private boolean getCommitFailed;
+      
+      protected boolean transactionDone = false;
 
       public T execute() throws Exception
       {                    
@@ -610,6 +636,8 @@
             
             T res = doTransaction();
             
+            transactionDone = true;
+
             conn.commit();
             
             return res;

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -237,6 +237,11 @@
       return Collections.emptyList();
    }
 
+   /**
+    * This manager has no backing store, so there is no connection that could be lost.
+    *
+    * @return always true
+    */
+   public boolean checkConnection()
+   {
+      final boolean alwaysReachable = true;
+      return alwaysReachable;
+   }
+
 }
 
 class IDCounter

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -376,6 +376,15 @@
 					removeSucker(queueName, notification.nodeID);					
 				}
 			}
+			else if (notification.type == ClusterNotification.TYPE_NODE_FAILEDOVER)
+			{
+			   //clean up connections
+			   ConnectionInfo conn = (ConnectionInfo)connections.remove(notification.nodeID);
+			   if (conn != null)
+			   {
+			      conn.close();
+			   }
+			}
 		}
 		catch (Exception e)
 		{

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/ClusterRequest.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -64,9 +64,10 @@
 	
 	public static final int ADD_ALL_REPLICATED_DELIVERIES_REQUEST = 12;
 	
-	public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;
-		
+	public static final int GET_REPLICATED_DELIVERIES_REQUEST = 13;		
 	
+	public static final int STATE_REQUEST = 14;
+	
 	protected static final int NULL = 0;
 	
 	protected static final int NOT_NULL = 1;
@@ -148,6 +149,11 @@
          	request = new GetReplicatedDeliveriesRequest();
          	break;
          }
+         case STATE_REQUEST:
+         {
+            request = new StateRequest();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + type);

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -27,6 +27,7 @@
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.MessageDispatcher;
 import org.jgroups.blocks.RequestHandler;
+import org.jgroups.jmx.JmxConfigurator;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
@@ -37,6 +38,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -46,6 +48,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 /**
  * 
  * This class handles the interface with JGroups
@@ -58,6 +63,10 @@
  */
 public class GroupMember
 {
+   public static final String JGROUPS_JMX_DOMAIN = "jboss.jgroups";
+   public static final String CHANNEL_JMX_ATTRIBUTES = "type=channel,cluster=";
+   public static final String PROTOCOL_JMX_ATTRIBUTES = "type=protocol,cluster=";
+
    public static final String DATA_SUFFIX = "-DATA";
 
    public static final String CONTROL_SUFFIX = "-CTRL";
@@ -94,6 +103,11 @@
    
    private volatile boolean starting;
    
+   // JMX bookkeeping: set by registerChannelInJmx(), checked by unregisterChannelInJmx()
+   protected boolean channelRegistered;   
+   protected boolean protocolsRegistered;
+
+   
    //We need to process view changes on a different thread, since if we have more than one node running
    //in the same VM then the thread that sends the leave message ends up executing the view change on the other node
    //We probably don't need this if all nodes are in different VMs
@@ -186,6 +200,20 @@
       future.get();
    }
 
+   // Same as requestState(null): the state provider is left to JGroups (null target passed to getState).
+   public boolean requestState() throws Exception
+   {
+      return requestState(null);
+   }
+
+   // Requests the group state from addr over the control channel, then blocks in waitForState(); returns getState's result.
+   public boolean requestState(Address addr) throws Exception
+   {
+      boolean stateGot = controlChannel.getState(addr, stateTimeout);
+      waitForState();
+      return stateGot;
+   }
+
    private Future<String> connectDataChannel()
    {
       Callable<String> dataRunner = new Callable<String> () {
@@ -258,8 +286,13 @@
    
    public void multicastControl(ClusterRequest request, boolean sync) throws Exception
    {
-   	if (ready.get())
-   	{   		
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
+   	{
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
 	
 	   	Message message = new Message(null, null, writeRequest(request));
@@ -290,7 +323,12 @@
    
    public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
    {
-   	if (ready.get())
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
 	
@@ -318,9 +356,40 @@
 	   	}
    	}
    }
-   
+   // Sends request to address via the dispatcher and returns the first response (GET_FIRST, castTimeout); null when not ready.
+   public Object unicastRequest(ClusterRequest request, Address address) throws Exception
+   {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace)
+         {
+            log.trace(this + " the request target is not available");
+         }
+      }
+      // the availability check above only traces; it does not prevent the send below
+      Object response = null;
+
+      if (ready.get())
+      {
+         if (trace)
+         {
+            log.trace(this + " sending " + request + " to control channel");
+         }
+
+         Message message = new Message(address, null, writeRequest(request));
+         // NOTE(review): presumably blocks up to castTimeout and throws on timeout/suspected target - confirm with JGroups docs
+         response = dispatcher.sendMessage(message, GroupRequest.GET_FIRST, castTimeout);
+      }
+      return response;
+   }
+
    public void multicastData(ClusterRequest request) throws Exception
    {
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
    	if (ready.get())
    	{   		
 	   	if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
@@ -333,7 +402,12 @@
    
    public void unicastData(ClusterRequest request, Address address) throws Exception
    {
-   	if (ready.get())
+      if (!requestTarget.isAvailable())
+      {
+         if (trace) { log.trace(this + " the request target is not available"); }
+      }
+
+      if (ready.get())
    	{
 	   	if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
 	
@@ -659,5 +733,70 @@
          }
       }
    }
+   // Registers the control channel's protocols, then the channel itself, as JGroups MBeans in server; errors are only logged.
+   public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      try
+      {
+         String protocolPrefix = JGROUPS_JMX_DOMAIN + ":" + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerProtocols(server, (JChannel)controlChannel, protocolPrefix);
+         protocolsRegistered = true;
+         // the flags tell unregisterChannelInJmx() what needs to be removed
+         log.debug("Registered protocols: " + protocolPrefix);
+         
+         String name = JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName;
+         JmxConfigurator.registerChannel((JChannel)controlChannel, server, name);
+         channelRegistered = true;
+         
+         log.debug("Registered channel: " + name);
+      }
+      catch (Exception e)
+      {
+         log.error("Caught exception registering channel in JMX", e);
+      }
+   }
+   // Unregisters whatever registerChannelInJmx() registered and clears the matching flag; failures are only logged.
+   public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      ObjectName on = null;
+
+      if (channelRegistered)
+      {          
+         // Unregister the channel itself
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":" + CHANNEL_JMX_ATTRIBUTES + channelPartitionName);
+            server.unregisterMBean(on);
+            channelRegistered = false;
+         }
+         catch (Exception e)
+         {
+            log.error("Caught exception unregistering channel" + (on != null ? " at " + on : ""), e);
+         }
+      }
+      
+      if (protocolsRegistered)
+      {
+         // Unregister the protocols; reset 'on' so an error here never reports the channel's name
+         on = null;
+         try
+         {
+            on = new ObjectName(JGROUPS_JMX_DOMAIN + ":*," + PROTOCOL_JMX_ATTRIBUTES + channelPartitionName);
+            Set<ObjectName> mbeans = server.queryNames(on, null);
+            if (mbeans != null)
+            {
+               for (ObjectName mbean : mbeans)
+               {
+                  server.unregisterMBean(mbean);
+               }
+            }
+            protocolsRegistered = false;
+         }
+         catch (Exception e)
+         {
+            log.error("Caught exception unregistering protocols" + (on != null ? " at " + on : ""), e);
+         }
+      }
+   }
    
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -24,6 +24,8 @@
 import java.io.Serializable;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -41,6 +43,7 @@
 
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanServer;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 import javax.management.NotificationFilter;
@@ -107,6 +110,14 @@
    // Constants ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(MessagingPostOffice.class);
+   // Node states stored in the DB (UPDATE_STATE / INSERT_NODE_STATE) and mirrored in clusterState.
+   public static final int STATE_STANDALONE = 1; // NOTE(review): presumably a node running outside any cluster - confirm
+   // Written for this node when it joins or re-joins the cluster.
+   public static final int STATE_CLUSTERED = 2;
+   // Applied to this node in quarantine() when isFirstNode() and the stored state still lists other live nodes.
+   public static final int STATE_QUARANTINED = 3;
+   // Written for a crashed node by its failover node just before performFailover().
+   public static final int STATE_DEAD = 4;
 
    //This are only used in testing
    
@@ -118,6 +129,7 @@
    
    private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
            Executors.newCachedThreadPool(new JBMThreadFactory("msg-post-office")));
+
    //End only used in testing
 
    // Static ---------------------------------------------------------------------------------------
@@ -245,7 +257,21 @@
    private Object viewUpdateLock;
    private boolean stopUpdate = false;
    private boolean updateInProcess = false;
-      
+   // Started by initClusterState() unless keepOldFailoverModel is set; shut down and cleared in stop().
+   private StateMonitor stateMonitor = null;
+   // Nodes that left the view without a leave message, keyed by node id, until they re-join or processClusterState() resolves them.
+   private Map<Integer, QuarantinedNode> suspectedNodes = new java.util.concurrent.ConcurrentHashMap<Integer, QuarantinedNode>();
+   // Per-node rows reloaded from the DB by refreshNodeState().
+   private ClusterState clusterState = new ClusterState();
+   // NOTE(review): presumably the StateMonitor refresh period in milliseconds - confirm.
+   private long nodeStateRefreshInterval = 30000;
+   // true: failover is decided directly from view changes (old model); false: quarantine + DB cluster state.
+   private boolean keepOldFailoverModel = true;
+   // Guards nodeIDAddressMap updates in handleNodeJoined() and the newNodes list.
+   private Object jgroupsLock = new Object();
+   // Addresses that joined the view without a known node id (see nodeJoined()); guarded by jgroupsLock.
+   private List<Address> newNodes = new ArrayList<Address>();
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public boolean isFailoverOnNodeLeave()
@@ -330,7 +356,9 @@
                               boolean failoverOnNodeLeave,
                               int maxRetry,
                               int retryInterval,
-                              boolean retryOnConnectionFailure)
+                              boolean retryOnConnectionFailure,
+                              boolean keepOldFailoverModel,
+                              long nodeStateRefreshInterval)
       throws Exception
    {
    	this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
@@ -346,6 +374,10 @@
 
       this.supportsFailover = supportsFailover;
       
+      this.keepOldFailoverModel = keepOldFailoverModel;
+      
+      this.nodeStateRefreshInterval = nodeStateRefreshInterval;
+      
       nbSupport = new NotificationBroadcasterSupport();
       
       replicateSemaphore = new ClearableSemaphore(maxConcurrentReplications);
@@ -393,7 +425,7 @@
 	      //Sanity check - we check there aren't any other nodes already in the cluster with the same node id
 	      if (knowAboutNodeId(thisNodeID))
 	      {
-	      	throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
+            throw new IllegalArgumentException("Cannot start post office since there is already a post office in the " +
 	      			"cluster with the same node id (" + thisNodeID + "). " +
 	      			"Are you sure you have given each node a unique node id during installation?");
 	      }
@@ -411,6 +443,8 @@
 	      put(Replicator.JVM_ID_KEY, clientVMId);
 	      
 	      groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+	      
+	      initClusterState();
       }
    
       //Now load the bindings for this node
@@ -419,12 +453,343 @@
       
       started = true;
 
-      log.debug(this + " started");      
+      log.debug(this + " started");
    }
+   // Enables the DB-backed cluster-state model (no-op when keepOldFailoverModel is set): stores this node as
+   // STATE_CLUSTERED, then starts the StateMonitor (per the original note, a dedicated timestamp-writing thread).
+   private synchronized void initClusterState() throws Exception
+   {
+      if (this.keepOldFailoverModel) return;
+      
+      this.addThisNodeStateInStorage();
+      // stateMonitor is shut down and cleared again in stop()
+      stateMonitor = new StateMonitor();
+      stateMonitor.start();
+   }
+   // Upserts this node's stored state as STATE_CLUSTERED: UPDATE_STATE first, INSERT_NODE_STATE if no row matched.
+   private void addThisNodeStateInStorage() throws Exception
+   {
+      if (ds == null) // no DataSource configured: nothing to store
+      {
+         return;
+      }
 
+      class AddNodeState extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
+            PreparedStatement ps1  = null;
+            // ps: UPDATE_STATE(state, nodeId); ps1: INSERT_NODE_STATE(nodeId, state), prepared only when needed
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+               ps.setInt(1, STATE_CLUSTERED);
+               ps.setInt(2, thisNodeID);
+
+               int row = ps.executeUpdate();
+               // no existing row for this node: insert one
+               if (row == 0)
+               {
+                  ps1 = conn.prepareStatement(getSQLStatement("INSERT_NODE_STATE"));
+                  ps1.setInt(1, thisNodeID);
+                  ps1.setInt(2, STATE_CLUSTERED);
+
+                  ps1.executeUpdate();
+               }
+            }
+            finally
+            {
+               closeStatement(ps);
+               closeStatement(ps1);
+            }
+            return null;
+         }
+      }
+      new AddNodeState().executeWithRetry();
+   }
+   // Sets node nID's stored state to newState (UPDATE_STATE); returns the updated row count, or 0 without a DataSource.
+   private Integer updateStateInStorage(final int nID, final int newState, boolean ifRetry) throws Exception
+   {
+      if (ds == null)
+      {
+         return 0;
+      }
+      // ifRetry selects executeWithRetry() instead of executeOnlyOnce()
+      class UpdateState extends JDBCTxRunner
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps  = null;
+            int row = 0;
+            
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_STATE"));
+
+               ps.setInt(1, newState);
+               ps.setInt(2, nID);
+
+               row = ps.executeUpdate();
+            }
+            finally
+            {
+               closeStatement(ps);
+            }
+            return Integer.valueOf(row);
+         }
+      }
+      if (ifRetry)
+      {
+         return (Integer)new UpdateState().executeWithRetry();
+      }
+      return (Integer)new UpdateState().executeOnlyOnce();
+   }
+   // Drops the node from suspectedNodes and deletes its stored state (DELETE_DEAD_NODE); a DB failure is only logged.
+   private void cleanUpSuspectedNode(final Integer qNode) throws Exception
+   {
+      QuarantinedNode node = suspectedNodes.remove(qNode);
+      if (node == null)
+      {
+         log.warn("Cannot find the suspected node " + qNode + ".");
+      }
+      
+      // remove the node's stored state (only when a DataSource is configured)
+      if (ds != null)
+      {
+         class CleanupDeadNode extends JDBCTxRunner
+         {
+            public Object doTransaction() throws Exception
+            {
+               PreparedStatement ps = null;
+
+               try
+               {
+                  ps = conn.prepareStatement(getSQLStatement("DELETE_DEAD_NODE"));
+                  ps.setInt(1, qNode);
+
+                  ps.executeUpdate();
+               }
+               finally
+               {
+                  closeStatement(ps);
+               }
+               return null;
+            }
+         }
+         try
+         {
+            new CleanupDeadNode().executeOnlyOnce();
+         }
+         catch (Exception e)
+         {
+            log.warn("Failed to clean up dead node " + qNode + " from DB", e);
+         }
+      }
+   }
+   
+   /*
+    * Acts on the suspected nodes using a snapshot of the DB cluster state: cleans up nodes that are gone or are
+    * handled by another failover node, fails over dead nodes whose failover node is this node, and - if this
+    * node is itself quarantined while the view has other members again - re-joins the cluster.
+    */
+   private void processClusterState() throws Exception
+   {
+      Iterator<Integer> iter = suspectedNodes.keySet().iterator();
+      
+      ClusterState clusterStateCopy = clusterState.copy();
+      
+      while (iter.hasNext())
+      {
+         Integer qNodeID = iter.next();
+         log.debug("Processing cluster state for node " + qNodeID);
+         
+         Boolean isDead = clusterStateCopy.isNodeDead(qNodeID);
+         
+         if (isDead == null)
+         {
+            // the node's state has been removed from the DB: someone else has already handled it
+            log.debug("Cluster state for node " + qNodeID + " was removed from the database; removing local information now.");
+            cleanDataForNode(qNodeID);
+            suspectedNodes.remove(qNodeID);
+         }
+         else if (isDead)
+         {
+            QuarantinedNode qNode = suspectedNodes.get(qNodeID);
+
+            // not (or no longer) suspected here - per the log below, probably a DB problem; skip it
+            if (qNode == null)
+            {
+               log.debug("node " + qNodeID +
+                         " seems dead but it hasn't been suspected yet. Probably having DB problem.");
+               continue;
+            }
+            Integer fNodeID = qNode.getFailover();
+
+            Integer foNodeID = (Integer)failoverMap.get(fNodeID);
+            if (foNodeID == null)
+            {
+               throw new IllegalStateException("Failover node " + fNodeID + " for node " + qNode + " is not alive!");
+            }
+
+            // Every suspected node has exactly one failover node, but a dead node may get several failover nodes
+            // that are not in the same 'brain': a quarantined node suspects all other nodes and so becomes the
+            // failover node for all of them, while the nodes still in the cluster keep their strict failover map.
+            // If a clustered node dies, both the quarantined node and the 'legitimate' failover node may attempt
+            // the failover; it must only happen once.
+            if (fNodeID.intValue() == thisNodeID)
+            {
+               clusterNotifier.sendNotification(new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, qNodeID.intValue(), null));
+
+               // I am the failover node for the dead node: perform failover now
+               if (qNode.shouldFailover() && isSupportsFailover())
+               {
+                  log.debug(this + ": I am the failover node for node " + qNodeID + " that crashed");
+                  // mark the dead node's stored state (qNodeID is never this node)
+                  this.updateStateInStorage(qNodeID, STATE_DEAD, true);
+                  performFailover(qNodeID);
+                  // now clean up the quarantined set
+                  cleanUpSuspectedNode(qNodeID);
+               }
+               else
+               {
+                  // no failover here: clean up like the branch below so NODE_LEAVE is not re-sent on every pass
+                  cleanDataForNode(qNodeID);
+                  suspectedNodes.remove(qNodeID);
+               }
+            }
+            else
+            {
+               // another node is responsible for the failover: just drop this node's local data for it
+               cleanDataForNode(qNodeID);
+               suspectedNodes.remove(qNodeID);
+            }
+         }
+      }
+      // If this node is quarantined but the current view has more than one member, it missed the start-up
+      // join (JGroups was still broken then) and JGroups has since recovered, so this is the chance to
+      // re-join the cluster.
+      if (clusterState.isQuarantined(thisNodeID))
+      {
+         View currView = groupMember.getCurrentView();
+         if ((currView != null) && currView.getMembers().size() > 1)
+         {
+            // sleep for 5 seconds to let the JGroups flush finish
+            try
+            {
+               Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+               // NOTE(review): interrupt status is discarded; the re-join below still runs
+            }
+            PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                   groupMember.getDataChannelAddress());
+            try
+            {
+               String clientVMId = JMSClientVMIdentifier.instance;
+               // add our VM identifier to the replicator
+               put(Replicator.JVM_ID_KEY, clientVMId);
+
+               groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+               updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+            }
+            catch (Exception e)
+            {
+               log.error("error sending node join request!", e);
+            }
+         }
+      }
+   }
+   // Writes this node's health timestamp (UPDATE_TIMESTAMP) and reloads every node's row (LOAD_CLUSTER_STATE) into
+   // clusterState. Returns true once the timestamp UPDATE has run (even if it matched no row), or if there is no DataSource.
+   private boolean refreshNodeState() throws Exception
+   {
+      if (ds == null)
+      {
+         return true;
+      }
+      // DB failures are logged at debug level and reported only through the return value, never thrown
+      class RefreshNodeState extends JDBCTxRunner
+      {
+         boolean timestampDone = false;
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps1  = null;
+            PreparedStatement ps2  = null;
+
+            try
+            {
+               //writing timestamp
+               ps1 = conn.prepareStatement(getSQLStatement("UPDATE_TIMESTAMP"));
+
+               ps1.setInt(1, thisNodeID);
+
+               int result = ps1.executeUpdate();
+
+               if (result > 0)
+               {
+                  log.debug("Successfully updated cluster health timestamp for node " + thisNodeID);
+               }
+               else if (result == 0)
+               {
+                  log.debug("Cluster health timestamp update for node " + thisNodeID + " failed!");
+               }
+               // NOTE(review): set before the reload below, so true even if that reload later fails
+               timestampDone = true;
+               // NOTE(review): an exception after clear() leaves clusterState only partially reloaded
+               synchronized (clusterState)
+               {
+                  clusterState.clear();
+                  // collect states
+                  ps2 = conn.prepareStatement(getSQLStatement("LOAD_CLUSTER_STATE"));
+                  // NOTE(review): getTimestamp() returns null for SQL NULL, which would abort the reload with an NPE
+                  ResultSet res = ps2.executeQuery();
+
+                  while (res.next())
+                  {
+                     int nodeID = res.getInt(1);
+                     Timestamp t = res.getTimestamp(2);
+                     long timestamp = t.getTime();
+                     int nodeState = res.getInt(3);
+                     t = res.getTimestamp(4);
+                     long checkPoint = t.getTime();
+
+                     clusterState.addNode(nodeID, timestamp, nodeState, checkPoint);
+                     log.debug("Added cluster node state: nodeID = " + nodeID + ", timestamp = " + timestamp + ", nodeState = " + nodeState);
+                  }
+               }
+            }
+            finally
+            {
+               closeStatement(ps1);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+      RefreshNodeState r = new RefreshNodeState();
+      try
+      {
+         r.executeOnlyOnce();
+      }
+      catch (Throwable e)
+      {
+         log.debug("Failed to refresh cluster node state for node " + thisNodeID, e);
+      }
+      return r.timestampDone;
+   }
+
    public void stop() throws Exception
    {
       stopViewUpdate();
+      
+      if (this.stateMonitor != null)
+      {
+         stateMonitor.shutdown();
+         stateMonitor = null;
+      }
 
       synchronized (this)
       {
@@ -1120,99 +1485,302 @@
     */
    public void nodeJoined(Address address) throws Exception
    {
-      log.debug(this + ": " + address + " joined");      
+      log.debug(this + ": " + address + " joined");
+      // the old failover model needs no further work on a join
+      if (this.keepOldFailoverModel) return;
+      // look up the node id for this address (null if none is known yet - see the branch below)
+      Integer newNode = findNodeIDForAddress(address);
+
+      if (newNode == null)
+      {
+         // No node id is known for this address yet; this can happen when a node is auto-restarted while JGroups
+         // is not working properly. Queue the address and wake one thread waiting on jgroupsLock.
+         synchronized (jgroupsLock)
+         {
+            newNodes.add(address);
+            jgroupsLock.notify();
+         }
+         return;
+      }
+      else
+      {
+         QuarantinedNode qNode = suspectedNodes.remove(newNode);
+         // nothing more to do unless this node had suspected the joining node
+         if (qNode == null)
+            return;
+         
+         log.debug("A quarantined node " + qNode + " re-joined cluster.");
+         // If this node is itself quarantined, re-join on a separate thread: multicast the join request,
+         // recompute the failover map and store STATE_CLUSTERED.
+         if (clusterState.isQuarantined(thisNodeID))
+         {
+            new Thread()
+            {
+               public void run()
+               {
+                  PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                         groupMember.getDataChannelAddress());
+                  try
+                  {
+                     groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+                     calculateFailoverMap();
+                     updateStateInStorage(thisNodeID, STATE_CLUSTERED, true);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("error sending node join request!", e);
+                  }
+               }
+            }.start();
+         }
+      }
    }
    
    public void nodesLeft(List addresses) throws Throwable
    {
    	if (trace) { log.trace("Nodes left " + addresses.size()); }
-   	  	
-   	Map oldFailoverMap = new HashMap(this.failoverMap);
-   	
-   	int oldFailoverNodeID = failoverNodeID;
-   	
-      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
-   	
-   	calculateFailoverMap();
-   	
+      // new model: defer the decision via quarantine(); old model (keepOldFailoverModel): fail over right away
+   	if (!keepOldFailoverModel)
+   	{
+         // suspect/quarantine the departed nodes; failover is decided later from the DB cluster state
+         quarantine(addresses);   	   
+   	}
+   	else
+   	{
+         Map oldFailoverMap = new HashMap(this.failoverMap);
+         // the old map is kept to find each departed node's failover node after the recalculation below
+         int oldFailoverNodeID = failoverNodeID;
+         
+         if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }      
+         
+         calculateFailoverMap();
+         
+         if (trace) { log.trace("First node is now " + firstNode); }
+         
+         if (firstNode && this.useJGroupsWorkaround)
+         {
+            //If we are now the first node in the cluster then any outstanding replication requests will not get responses
+            //so we must release these and we have no more need of a semaphore until another node joins
+            replicateSemaphore.disable();
+         }
+         // NOTE(review): quarantine() repeats the per-node handling below for nodes that left cleanly
+         Iterator iter = addresses.iterator();
+         
+         while (iter.hasNext())
+         {
+            Address address = (Address)iter.next();
+
+            log.debug(this + ": " + address + " left");
+
+            Integer leftNodeID = getNodeIDForSyncAddress(address);
+      
+            if (leftNodeID == null)
+            {
+               throw new IllegalStateException(this + " cannot find node ID for address " + address);
+            }
+            
+            boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
+      
+            log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+         
+            Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+         
+            log.debug(this + " the failover node for the crashed node is " + fnodeID);
+               
+            boolean doneFailover = false;
+            
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+            
+            clusterNotifier.sendNotification(notification);
+            
+            if (crashed && isSupportsFailover())
+            {        
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+               
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+         
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+         
+                  performFailover(leftNodeID);
+                  
+                  doneFailover = true;
+               }
+            }
+         
+            if (!doneFailover)
+            {
+               // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
+               // recalculate the connection factory delegates and failover delegates.
+         
+               cleanDataForNode(leftNodeID);
+            }
+         
+            if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+            
+            if (oldFailoverNodeID != failoverNodeID)
+            {
+               //Failover node for this node has changed
+               
+               failoverNodeChanged(oldFailoverNodeID, firstNode, false);         
+            }
+         }
+   	}
+	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+   }
+   
+   /**
+    * Nodes that left without a leave message are recorded in suspectedNodes together with a live failover
+    * node, to be failed over later from the DB cluster state; nodes that left cleanly are handled at once.
+    * @throws Exception if failover, clean-up, a failover-node change fails, or no live failover node exists
+    */
+   private void quarantine(List addresses) throws Exception
+   {
+      Map oldFailoverMap = new HashMap(this.failoverMap);
+      // the old map is needed below to find each departed node's failover node
+      int oldFailoverNodeID = failoverNodeID;
+      
+      if (trace) { log.trace("Old failover node id: " + oldFailoverNodeID); }
+      
+      calculateFailoverMap();
+      
       if (trace) { log.trace("First node is now " + firstNode); }
       
       if (firstNode && this.useJGroupsWorkaround)
       {
-      	//If we are now the first node in the cluster then any outstanding replication requests will not get responses
-      	//so we must release these and we have no more need of a semaphore until another node joins
-      	replicateSemaphore.disable();
+         //If we are now the first node in the cluster then any outstanding replication requests will not get responses
+         //so we must release these and we have no more need of a semaphore until another node joins
+         replicateSemaphore.disable();
       }
-            
-   	Iterator iter = addresses.iterator();
-   	
-   	while (iter.hasNext())
-   	{
-   		Address address = (Address)iter.next();
 
-         log.debug(this + ": " + address + " left");
-
-	      Integer leftNodeID = getNodeIDForSyncAddress(address);
-	
-	      if (leftNodeID == null)
-	      {
-	         throw new IllegalStateException(this + " cannot find node ID for address " + address);
-	      }
-	      
-	      boolean crashed = failoverOnNodeLeave || !leaveMessageReceived(leftNodeID);
-	
-	      log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
+      Iterator iter = addresses.iterator();
       
-	      Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+      while (iter.hasNext())
+      {
+         Address addr = (Address)iter.next();
+         
+         Integer leftNodeID = getNodeIDForSyncAddress(addr);
+         
+         if (leftNodeID == null)
+         {
+            throw new IllegalStateException(this + " cannot find node ID for address " + addr);
+         }
+         
+         boolean leaveReceived = leaveMessageReceived(leftNodeID);
+         
+         boolean crashed = failoverOnNodeLeave || !leaveReceived;
+         
+         log.debug(this + ": node " + leftNodeID + " has " + (crashed ? "crashed" : "cleanly left the group"));
       
+         Integer fnodeID = (Integer)oldFailoverMap.get(leftNodeID);
+      
          log.debug(this + " the failover node for the crashed node is " + fnodeID);
-	         
-	      boolean doneFailover = false;
-	      
-	      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
-	      
-	      clusterNotifier.sendNotification(notification);
-      
-	      if (crashed && isSupportsFailover())
-	      {	      
-		      if (fnodeID == null)
-		      {
-		      	throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
-		      }
-		      
-		      if (fnodeID.intValue() == thisNodeID)
-		      {
-		         // The node crashed and we are the failover node so let's perform failover
-		
-		         log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
-		
-		         performFailover(leftNodeID);
-		         
-		         doneFailover = true;
-		      }
-	      }
-      
-	      if (!doneFailover)
-	      {
-		      // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
-		      // recalculate the connection factory delegates and failover delegates.
-		
-		      cleanDataForNode(leftNodeID);
-	      }
-      
-	      if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
-	      
-	      if (oldFailoverNodeID != failoverNodeID)
-	      {
-	      	//Failover node for this node has changed
-	      	
-	      	failoverNodeChanged(oldFailoverNodeID, firstNode, false);      	
-	      }
-   	}
-	      
-	   sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+         
+         if (!leaveReceived)
+         {
+            log.info("Node " + leftNodeID + " didn't left normally, put it to suspected list.");
+            if (fnodeID == null) { throw new IllegalStateException("Cannot find failover node for node " + leftNodeID); }
+            //if its failover node is not in the new failoverMap, that means its failover node is quarantined with it
+            Integer ffNode = (Integer)failoverMap.get(fnodeID);
+            int hops = 0;
+            while (ffNode == null)
+            {
+               // Walk the old failover ring until reaching a node that is still in the new map. The ring has
+               // oldFailoverMap.size() entries, so needing more hops than that means the map is not a ring.
+               if (++hops > oldFailoverMap.size()) { throw new IllegalStateException("Cannot find a live failover node for node " + leftNodeID); }
+               fnodeID = (Integer)oldFailoverMap.get(fnodeID);
+               if (fnodeID == null) { throw new IllegalStateException("Cannot find a live failover node for node " + leftNodeID); }
+               ffNode = (Integer)failoverMap.get(fnodeID);
+            }
+         
+            // fnodeID is now a live node
+            QuarantinedNode qNode = new QuarantinedNode(leftNodeID, fnodeID, crashed);
+         
+            // nodes already suspected whose failover node was leftNodeID are handed over to fnodeID
+            Iterator<QuarantinedNode> iterQ = suspectedNodes.values().iterator();
+            while (iterQ.hasNext())
+            {
+               QuarantinedNode aNode = iterQ.next();
+               if (leftNodeID.equals(aNode.getFailover()))
+               {
+                  // transfer the failover responsibility to the new live node
+                  aNode.setFailover(fnodeID);
+               }
+            }
+         
+            suspectedNodes.put(leftNodeID, qNode);
+            // this node is first node while the stored state still lists other live nodes: mark it quarantined
+            if (isFirstNode() && (clusterState.liveNodeNum() > 1))
+            {
+               try
+               {
+                  clusterState.updateNodeState(thisNodeID, STATE_QUARANTINED);
+               }
+               catch (Exception e)
+               {
+                  log.error("Having problem update myself. Shutting down to avoid message duplicated delivery.", e);
+                  this.serverPeer.stopJBMNodeForRecovery();
+                  return;
+               }
+            }
+
+            if (trace) { log.trace("Quarantined node: " + qNode); }
+         }
+         else
+         {
+            log.info("Node " + leftNodeID + " left normally, clean up now.");
+            
+            boolean doneFailover = false;
+            
+            ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_LEAVE, leftNodeID.intValue(), null);
+            
+            clusterNotifier.sendNotification(notification);
+            
+            if (crashed && isSupportsFailover())
+            {        
+               if (fnodeID == null)
+               {
+                  throw new IllegalStateException("Cannot find failover node for node " + leftNodeID);
+               }
+               
+               if (fnodeID.intValue() == thisNodeID)
+               {
+                  // The node crashed and we are the failover node so let's perform failover
+         
+                  log.debug(this + ": I am the failover node for node " + leftNodeID + " that crashed");
+         
+                  performFailover(leftNodeID);
+                  
+                  doneFailover = true;
+               }
+            }
+         
+            if (!doneFailover)
+            {
+               // Remove any replicant data and non durable bindings for the node -  This will notify any listeners which will
+               // recalculate the connection factory delegates and failover delegates.
+         
+               cleanDataForNode(leftNodeID);
+            }
+         }
+         
+         if (trace) {log.trace("First node: " + firstNode + " oldFailoverNodeID: " + oldFailoverNodeID + " failoverNodeID: " + failoverNodeID); }
+         
+         if (oldFailoverNodeID != failoverNodeID)
+         {
+            //Failover node for this node has changed
+            
+            failoverNodeChanged(oldFailoverNodeID, firstNode, false);         
+         }
+      }
+      // stateMonitor is null before initClusterState() has run and after stop()
+      if (stateMonitor != null) { stateMonitor.newQuarantined(); }
    }
-   
+
    // RequestTarget implementation ------------------------------------------------------------
    
    /*
@@ -1332,53 +1900,65 @@
    
    public void handleNodeJoined(int nodeId, PostOfficeAddressInfo info) throws Exception
    {	   	   	   	
-   	nodeIDAddressMap.put(new Integer(nodeId), info);
-   	
-   	log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
-   	   
-   	final int oldFailoverNodeID = this.failoverNodeID;
-   	
-   	boolean wasFirstNode = this.firstNode;
-   	
-   	calculateFailoverMap();
-   	
-   	if (wasFirstNode && useJGroupsWorkaround)
-   	{
-   		//If we were the first node but now another node has joined - we need to re-enable the semaphore
-   		replicateSemaphore.enable();
-   	}
-   	
-   	//Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed it's queues
-   	//the data is requested by the new node when it deploys its queues      
-   	
-   	if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
-   	{
-   		//Need to execute this on it's own thread since it uses the MessageDispatcher
-   		
-   		new Thread(
-	   		new Runnable() { 
-	   			public void run()
-	   			{
-	   				try
-	   				{
-	   					failoverNodeChanged(oldFailoverNodeID, firstNode, true);
-	   				}
-	   				catch (Exception e)
-	   				{
-	   					log.error("Failed to process failover node changed", e);
-	   				}
-	   			}
-	   		}).start();   		   		
-   	}
-   	
-      // Send a notification
-      
-      ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
-      
-      clusterNotifier.sendNotification(notification);
-      
-      sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+      // Record the joining node and recalculate the failover map while holding jgroupsLock.
+      // waitForJGroups() synchronizes and waits on the same lock.
+      // NOTE(review): the cluster and JMX notifications below are sent with jgroupsLock held, and
+      // the failover-node-changed thread reads firstNode without it. Confirm that both are safe.
+      synchronized(jgroupsLock)
+      {
+         nodeIDAddressMap.put(new Integer(nodeId), info);
+
+         log.debug(this + " handleNodeJoined: " + nodeId + " size: " + nodeIDAddressMap.size());
+
+         final int oldFailoverNodeID = this.failoverNodeID;
+
+         boolean wasFirstNode = this.firstNode;
+
+         calculateFailoverMap();
+
+         if (wasFirstNode && useJGroupsWorkaround)
+         {
+            // If we were the first node but now another node has joined - we need to re-enable the semaphore
+            replicateSemaphore.enable();
+         }
+
+         // Note - when a node joins, we DO NOT send it replicated data - this is because it won't have deployed its
+         // queues yet;
+         // the data is requested by the new node when it deploys its queues
+
+         if (!wasFirstNode && oldFailoverNodeID != this.failoverNodeID)
+         {
+            // Need to execute this on its own thread since it uses the MessageDispatcher
+
+            new Thread(new Runnable()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     failoverNodeChanged(oldFailoverNodeID, firstNode, true);
+                  }
+                  catch (Exception e)
+                  {
+                     log.error("Failed to process failover node changed", e);
+                  }
+               }
+            }).start();
+         }
+
+         // Send a notification
+
+         ClusterNotification notification = new ClusterNotification(ClusterNotification.TYPE_NODE_JOIN, nodeId, null);
+
+         clusterNotifier.sendNotification(notification);
+
+         sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+      }
    }
+   
+   /**
+    * Called once a quarantined node is finally dead and has been failed over, at which point it is
+    * safe to drop it from the set of suspected nodes.
+    *
+    * @param nodeId the ID of the node that is now considered dead
+    */
+   public void handleNodeDead(int nodeId)
+   {
+      QuarantinedNode node = suspectedNodes.remove(nodeId);
+
+      if (node == null)
+      {
+         // Nothing was quarantined under this ID. Don't log a misleading "Quarantined node null" message.
+         log.debug("Node " + nodeId + " was not in the quarantined list; nothing to remove.");
+         return;
+      }
+
+      log.info("Quarantined node " + node + " is finally dead.");
+   }
 
    /**
     * @param originatorNodeID - the ID of the node that initiated the modification.
@@ -1751,6 +2331,23 @@
                  "ALL_NODES " +
                  "FROM JBM_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=?");
 
+      map.put("UPDATE_STATE",
+              "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+      map.put("UPDATE_STATE",
+              "UPDATE JBM_CLUSTER SET STATE = ? WHERE NODE_ID = ?");
+
+      map.put("UPDATE_TIMESTAMP",
+              "UPDATE JBM_CLUSTER SET PING_TIMESTAMP = CURRENT_TIMESTAMP WHERE NODE_ID = ?");
+
+      map.put("LOAD_CLUSTER_STATE",
+              "SELECT NODE_ID, PING_TIMESTAMP, STATE, CURRENT_TIMESTAMP FROM JBM_CLUSTER");
+
+      map.put("INSERT_NODE_STATE",
+              "INSERT INTO JBM_CLUSTER (NODE_ID, PING_TIMESTAMP, STATE) VALUES (?, ?, ?)");
+      
+      map.put("DELETE_DEAD_NODE",
+              "DELETE FROM JBM_CLUSTER WHERE NODE_ID = ?");
       return map;
    }
 
@@ -1762,6 +2359,8 @@
               "QUEUE_NAME VARCHAR(255), CONDITION VARCHAR(1023), " +
               "SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT, " +
               "CLUSTERED CHAR(1), ALL_NODES CHAR(1), PRIMARY KEY(POSTOFFICE_NAME, NODE_ID, QUEUE_NAME))");
+      map.put("CREATE_CLUSTER_STATE_TABLE", "CREATE TABLE JBM_CLUSTER (NODE_ID INTEGER, " + 
+              "PING_TIMESTAMP DATETIME, STATE INTEGER, PRIMARY KEY(NODE_ID)) ENGINE = INNODB");
       return map;
    }
 
@@ -1895,6 +2494,8 @@
          failoverMap = new ConcurrentHashMap();
 
          leftSet = new ConcurrentHashSet();
+         
+         newNodes = new ArrayList<Address>();
       }
       
       replyExecutor = executorFactory.getExecutor("jbm-reply-executor");
@@ -1919,6 +2520,8 @@
          failoverMap = null;
 
          leftSet = null;
+         
+         newNodes = null;
       }
    	
    	replyExecutor.shutdownNow();   
@@ -2167,7 +2770,7 @@
 
             groupMember.multicastControl(request, sync);
          }      	
-      }      
+      }
       
       return added;
    }   
@@ -2272,7 +2875,12 @@
       	{
       		failoverNodeID = fid;
       		firstNode = false;	   		
-      	}	  
+      	}
+      	
+      	synchronized (jgroupsLock)
+      	{
+      	   jgroupsLock.notify();
+      	}
    	}
    	else
    	{
@@ -2905,8 +3513,8 @@
    	{
    		return;
    	}
-      class InsertBindings extends JDBCTxRunner
-      {
+      class InsertBindings extends JDBCTxRunner2
+      {         
          public Object doTransaction() throws Exception
          {
             PreparedStatement ps  = null;
@@ -2948,6 +3556,22 @@
 
                ps.executeUpdate();
             }
+            catch (SQLException e)
+            {
+               if (transactionDone)
+               {
+                  String sqlState = e.getSQLState();
+                  //23000(XA) or 23xxx (SQL:2003)
+                  if (sqlState.startsWith("23"))
+                  {
+                     //this is a constraint violation. i.e. the record identified by the same key already exists
+                     //This is fine, log a warning and go ahead.
+                     log.warn("We encountered a problem but the binding has already been inserted.");
+                     return null;
+                  }
+               }
+               throw e;
+            }
             finally
             {
                closeStatement(ps);
@@ -3405,6 +4029,13 @@
       //TODO - does this need to be inside the lock above?
       cleanDataForNode(failedNodeID);
       
+      if (!keepOldFailoverModel)
+      {
+         notification = new ClusterNotification(ClusterNotification.TYPE_NODE_FAILEDOVER, failedNodeID.intValue(), null);
+      
+         clusterNotifier.sendNotification(notification);
+      }
+
       log.debug(this + " announcing that failover procedure is complete");
 
       notification = new ClusterNotification(ClusterNotification.TYPE_FAILOVER_END, failedNodeID.intValue(), null);
@@ -3537,5 +4168,334 @@
 		}
    	
    }
+   
+   /*
+    * Background thread that, every nodeStateRefreshInterval ms:
+    *
+    * - refreshes this node's state in storage and, if that succeeds, processes the cluster
+    *   state. By design this is where the buddy node is monitored: if it is quarantined we check
+    *   whether it has really died and, once it has, trigger failover;
+    * - if the refresh fails while this node is itself quarantined, the node can no longer tell
+    *   the others that it is alive, so it stops itself for recovery and the monitor exits.
+    *
+    * shutdown() stops the monitor; newQuarantined() wakes it up early.
+    */
+   private class StateMonitor extends Thread
+   {
+      // Cleared by shutdown() or by the orphaned-node path. Only accessed from synchronized methods.
+      private boolean working = true;
+
+      public StateMonitor()
+      {
+         // Force one refresh when the monitor is created, before the thread runs.
+         try
+         {
+            refreshNodeState();
+         }
+         catch (Exception e)
+         {
+            log.warn("first time refreshing node state error", e);
+         }
+      }
+
+      public synchronized void run()
+      {
+         do
+         {
+            try
+            {
+               boolean timeStampDone = refreshNodeState();
+
+               if (timeStampDone)
+               {
+                  processClusterState();
+               }
+               else if (clusterState.isQuarantined(thisNodeID))
+               {
+                  log.error("I'm orphaned and now I can't tell others that I'm alive. Shutdown node: " + thisNodeID);
+                  serverPeer.stopJBMNodeForRecovery();
+                  working = false;
+               }
+            }
+            catch (Exception e)
+            {
+               log.error("Error refreshing state of node: " + thisNodeID, e);
+            }
+
+            // Sleep outside the try block so that a failing refresh cannot turn this loop into a
+            // busy spin against the database and the log. When we are about to exit, skip the
+            // sleep instead of overwriting the shared nodeStateRefreshInterval setting, which is
+            // also the liveness threshold used by NodeState.isDead().
+            if (working)
+            {
+               try
+               {
+                  wait(nodeStateRefreshInterval);
+               }
+               catch (InterruptedException e)
+               {
+                  // Ignored: only shutdown() (via 'working') decides whether the monitor stops.
+               }
+            }
+         } while (working);
+         log.debug("Stop monitoring the stats at node " + thisNodeID);
+      }
+
+      /** Stops the monitor loop and wakes it if it is sleeping. */
+      public synchronized void shutdown()
+      {
+         working = false;
+         notify();
+      }
+
+      /** Wakes the monitor early so that it processes a newly quarantined node promptly. */
+      public synchronized void newQuarantined()
+      {
+         notify();
+      }
+   }
+   
+   // Per-node cluster state keyed by node ID, presumably loaded from the JBM_CLUSTER table
+   // (see LOAD_CLUSTER_STATE) -- TODO confirm. It is not designed for concurrent mutation, but both
+   // constructors back it with a ConcurrentHashMap, so readers on other threads (isAvailable() is
+   // public) never observe a corrupted map.
+   private class ClusterState
+   {
+      Map<Integer, NodeState> states;
+
+      public ClusterState()
+      {
+         states = new java.util.concurrent.ConcurrentHashMap<Integer, NodeState>();
+      }
+
+      private ClusterState(Map<Integer, NodeState> copy)
+      {
+         // Use the same map type as the default constructor.
+         states = new java.util.concurrent.ConcurrentHashMap<Integer, NodeState>(copy);
+      }
+
+      /**
+       * Updates the in-memory state of a node and writes it to storage.
+       *
+       * @throws IllegalStateException if no state is known for nodeID
+       */
+      public void updateNodeState(int nodeID, int newState) throws Exception
+      {
+         NodeState node = states.get(nodeID);
+         if (node == null)
+         {
+            throw new IllegalStateException("No cluster state known for node " + nodeID);
+         }
+         node.setState(newState);
+         updateStateInStorage(nodeID, newState, false);
+      }
+
+      /** @return true if every node other than nodeID is dead (by timestamp age) */
+      public boolean allDeadButMe(int nodeID)
+      {
+         for (NodeState node : states.values())
+         {
+            if (!node.isDead() && (node.getID() != nodeID))
+            {
+               return false;
+            }
+         }
+         return true;
+      }
+
+      public void clear()
+      {
+         states.clear();
+      }
+
+      public void addNode(int nodeID, long timestamp, int nodeState, long checkPoint)
+      {
+         states.put(nodeID, new NodeState(nodeID, timestamp, nodeState, checkPoint));
+      }
+
+      /** @return a new ClusterState holding the same NodeState instances */
+      public ClusterState copy()
+      {
+         return new ClusterState(states);
+      }
+
+      /** @return true if qNodeID is known and marked quarantined; false if unknown */
+      public boolean isQuarantined(int qNodeID)
+      {
+         NodeState nState = states.get(qNodeID);
+         if (nState == null) return false;
+         return nState.isQurarntined();
+      }
+
+      /**
+       * @return whether the node is dead by timestamp age, or null if the node is not in this
+       *         state (callers treat a missing node as already gone)
+       */
+      public Boolean isNodeDead(Integer qNodeID)
+      {
+         // ConcurrentHashMap rejects null keys; an absent ID means the node is unknown.
+         if (qNodeID == null) return null;
+
+         NodeState nState = states.get(qNodeID);
+         if (nState == null) return null;
+
+         return nState.isDead();
+      }
+
+      /** @return the number of nodes whose timestamp is still fresh */
+      public int liveNodeNum()
+      {
+         int liveNum = 0;
+         for (NodeState s : states.values())
+         {
+            if (!s.isDead())
+            {
+               liveNum++;
+            }
+         }
+         return liveNum;
+      }
+   }
+   
+   /*
+    * Last known state of a single cluster node: its ID, its last ping timestamp, its state code,
+    * and a checkpoint time against which the timestamp's age is measured (presumably the
+    * database's CURRENT_TIMESTAMP when the row was loaded -- TODO confirm).
+    */
+   private class NodeState
+   {
+      private int nodeID;
+      private long timestamp;
+      private int state;
+      private long checkPoint;
+
+      public NodeState(int nodeID, long timestamp, int state, long checkPoint)
+      {
+         this.checkPoint = checkPoint;
+         this.state = state;
+         this.timestamp = timestamp;
+         this.nodeID = nodeID;
+      }
+
+      public int getID()
+      {
+         return nodeID;
+      }
+
+      public void setState(int newState)
+      {
+         this.state = newState;
+      }
+
+      public boolean isQurarntined()
+      {
+         return STATE_QUARANTINED == state;
+      }
+
+      /**
+       * Liveness is judged purely by how stale the ping timestamp is, never by the stored state.
+       * A node is dead once its timestamp is more than two refresh intervals older than the
+       * checkpoint.
+       */
+      public boolean isDead()
+      {
+         long limit = 2 * nodeStateRefreshInterval;
+         long stampAge = checkPoint - timestamp;
+         boolean dead = stampAge > limit;
+         log.debug(dead
+                   ? "Timestamp age of " + stampAge + "ms exceeds limit of " + limit + "ms; treating node as dead."
+                   : "Timestamp age of " + stampAge + "ms is under limit of " + limit + "ms; treating node as healthy.");
+         return dead;
+      }
+   }
+
+   /** @return true unless this node is marked as quarantined in the current cluster state */
+   public boolean isAvailable()
+   {
+      return !clusterState.isQuarantined(thisNodeID);
+   }
+
+   /** @return whether the old (pre-JBMESSAGING-1842) failover model is in use */
+   public boolean isKeepOldFailoverModel()
+   {
+      return keepOldFailoverModel;
+   }
+
+   public void setKeepOldFailoverModel(boolean isKeep)
+   {
+      keepOldFailoverModel = isKeep;
+   }
+
+   /**
+    * @return the interval, in ms, between node state refreshes. Twice this value is the timestamp
+    *         age after which a node is treated as dead.
+    */
+   public long getNodeStateRefreshInterval()
+   {
+      return nodeStateRefreshInterval;
+   }
+
+   /**
+    * Sets the node state refresh interval in ms.
+    *
+    * @throws IllegalArgumentException if newValue is not positive. A zero interval would make the
+    *         state monitor's wait() block forever, and a negative one makes wait() throw.
+    */
+   public void setNodeStateRefreshInterval(long newValue)
+   {
+      if (newValue <= 0)
+      {
+         throw new IllegalArgumentException("NodeStateRefreshInterval must be positive, was: " + newValue);
+      }
+      nodeStateRefreshInterval = newValue;
+   }
+
+   /** Registers the group member's JGroups channel(s) in JMX under channelPartitionName. */
+   public void registerChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      groupMember.registerChannelInJmx(server, channelPartitionName);
+   }
+
+   /** Unregisters the group member's JGroups channel(s) from JMX. */
+   public void unregisterChannelInJmx(MBeanServer server, String channelPartitionName)
+   {
+      groupMember.unregisterChannelInJmx(server, channelPartitionName);
+   }
+
+   /**
+    * Blocks while this node is the first (only) node in its JGroups view even though the shared
+    * cluster state reports other live nodes. It returns when this is the only live node, when all
+    * live nodes have joined (newNodes), or when this node stops being the first node. If other
+    * nodes joined while we were alone, it re-requests state, re-announces this node and
+    * republishes the client VM id.
+    *
+    * @return true if the node is ready for work; false if state could not be initialised or the
+    *         wait was interrupted (the interrupt flag is then restored)
+    */
+   public boolean waitForJGroups()
+   {
+      log.info("Waiting for JGroups...");
+      // NOTE(review): jgroupsLock is held for the whole method, including the synchronous
+      // requestState()/multicastControl() calls below, while handleNodeJoined() also synchronizes
+      // on jgroupsLock. Confirm those calls never need this node's handleNodeJoined() to complete.
+      synchronized (jgroupsLock)
+      {
+         //if I am alone but there are still others there
+         while (isFirstNode())
+         {
+            int n = clusterState.liveNodeNum();
+
+            if (n == 1)
+            {
+               //only me in the cluster!
+               break;
+            }
+            if (newNodes.size() == n)
+            {
+               //all nodes joined
+               break;
+            }
+            try
+            {
+               jgroupsLock.wait(5000);
+            }
+            catch (InterruptedException e)
+            {
+               // Don't swallow the interrupt: restore it for the caller and report not-ready.
+               Thread.currentThread().interrupt();
+               log.warn("Interrupted while waiting for JGroups; node is not ready.");
+               return false;
+            }
+         }
+
+         log.info("JGroups starts to work again, initializing state");
+         
+         //if we have new nodes, we need to
+         // 1) request states
+         // 2) announce myself
+         // 3) clear newNodes list
+         // then we are ready to go
+         if (isFirstNode() && (newNodes.size() > 1))
+         {
+            try
+            {
+               PostOfficeAddressInfo info = new PostOfficeAddressInfo(groupMember.getControlChannelAddress(),
+                                                                      groupMember.getDataChannelAddress());
+               boolean stateGot = groupMember.requestState();
+               
+               // requestState
+               if (!stateGot)
+               {
+                  log.info("couldn't get state, we are the first  (coordinator).");
+                  // Fall back to asking each of the other new nodes directly.
+                  for (Address addr : newNodes)
+                  {
+                     if (!addr.equals(info.getControlChannelAddress()))
+                     {
+                        stateGot = groupMember.requestState(addr);
+                        if (stateGot) break;
+                     }
+                  }
+               }
+               
+               if (!stateGot)
+               {
+                  log.info("We cannot get state from JGroups.");
+                  return false;
+               }
+
+               nodeIDAddressMap.put(new Integer(thisNodeID), info);
+
+               // calculate failover map
+               calculateFailoverMap();
+
+               // announce myself again
+               groupMember.multicastControl(new JoinClusterRequest(thisNodeID, info), true);
+
+               String clientVMId = JMSClientVMIdentifier.instance;
+               
+               // we do it again as the jgroups was bad before
+               put(Replicator.JVM_ID_KEY, clientVMId);
+               // clear new nodes
+               newNodes.clear();
+            }
+            catch (Exception e)
+            {
+               log.error("Error initializing state", e);
+               return false;
+            }
+         }
+         log.info("Now node is ready for work.");
+         return true;
+      }
+   }
+   
 }

Copied: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java (from rev 8232, branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java)
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java	                        (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/QuarantinedNode.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+/**
+ * Bookkeeping for a node that has left the cluster view and is held in quarantine until it is
+ * confirmed dead. It records the node's ID, its failover node, and whether it crashed, i.e.
+ * whether failover should be performed.
+ *
+ * @author howard
+ * 
+ * Created Jan 18, 2011 11:02:55 AM
+ */
+public class QuarantinedNode
+{
+   // ID of the node that left.
+   private final Integer nodeID;
+
+   // Failover node for nodeID; it must be an active node at the time of being quarantined.
+   private Integer failoverID;
+
+   // True if the node crashed rather than leaving cleanly.
+   private final boolean crashed;
+
+   public QuarantinedNode(Integer leftNodeID, Integer failoverID, boolean crashed)
+   {
+      this.crashed = crashed;
+      this.failoverID = failoverID;
+      this.nodeID = leftNodeID;
+   }
+
+   /** @return the current failover node ID (may be null) */
+   public Integer getFailover()
+   {
+      return this.failoverID;
+   }
+
+   /** Replaces the failover node, e.g. when the original failover node itself goes away. */
+   public void setFailover(Integer newFailover)
+   {
+      failoverID = newFailover;
+   }
+
+   /** @return true if the node crashed and therefore needs to be failed over */
+   public boolean shouldFailover()
+   {
+      return this.crashed;
+   }
+
+   @Override
+   public String toString()
+   {
+      StringBuilder sb = new StringBuilder("Quarantined Node[");
+      sb.append(nodeID).append("], failover[").append(failoverID).append("], crashed[").append(crashed).append(']');
+      return sb.toString();
+   }
+
+}

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/RequestTarget.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -68,4 +68,10 @@
    void handleAddAllReplicatedDeliveries(int nodeID, Map deliveries) throws Exception;
    
    void handleGetReplicatedDeliveries(String queueName, Address returnAddress) throws Exception;
+   
+   void handleNodeDead(int nodeId);
+
+   boolean isAvailable();
+   
+   Object getState() throws Exception;
 }

Copied: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java (from rev 8232, branches/JBM1842/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java)
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java	                        (rev 0)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/StateRequest.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2011, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.impl.postoffice;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+/**
+ * A cluster request that asks the receiving post office for its state. Executing it returns
+ * whatever RequestTarget.getState() returns. The request carries no payload, so reading and
+ * writing it are no-ops.
+ *
+ * @author howard
+ * 
+ * Created Feb 8, 2011 12:00:55 AM
+ */
+public class StateRequest extends ClusterRequest
+{
+   byte getType()
+   {
+      return ClusterRequest.STATE_REQUEST;
+   }
+
+   Object execute(RequestTarget office) throws Throwable
+   {
+      Object state = office.getState();
+      return state;
+   }
+
+   public void write(DataOutputStream out) throws Exception
+   {
+      // No fields to write.
+   }
+
+   public void read(DataInputStream in) throws Exception
+   {
+      // No fields to read.
+   }
+}

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -47,6 +47,7 @@
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.messaging.util.JMXAccessor;
+import org.jgroups.jmx.JmxConfigurator;
 import org.w3c.dom.Element;
 
 /**
@@ -99,7 +100,11 @@
    private int maxConcurrentReplications = 25;
    
    private boolean failoverOnNodeLeave;
+   
+   private boolean keepOldFailoverModel = true;
 
+   private long nodeStateRefreshInterval = 30000;
+
    private MessagingPostOffice postOffice;
 
    // Constructors --------------------------------------------------
@@ -344,8 +349,49 @@
 	      failoverOnNodeLeave = fover;
 	   }
 	}
+	
+	/**
+	 * @return whether the old (pre-JBMESSAGING-1842) failover model is kept. Once the service is
+	 *         started, the value is read from the running post office.
+	 */
+	public boolean isKeepOldFailoverModel()
+	{
+	   return started ? postOffice.isKeepOldFailoverModel() : keepOldFailoverModel;
+	}
+	
+	/**
+	 * Sets whether to keep the old failover model. Before start, the value is stored and passed to
+	 * the post office when it is created. After start, it is applied to the running post office.
+	 */
+	public void setKeepOldFailoverModel(boolean isKeep)
+	{
+	   if (started)
+	   {
+	      postOffice.setKeepOldFailoverModel(isKeep);
+	   }
+	   else
+	   {
+	      keepOldFailoverModel = isKeep;
+	   }
+	}
+	
+	/**
+	 * @return the node state refresh interval in ms. Once the service is started, the value is read
+	 *         from the running post office.
+	 */
+	public long getNodeStateRefreshInterval()
+	{
+	   return started ? postOffice.getNodeStateRefreshInterval() : nodeStateRefreshInterval;
+	}
+	
+	/**
+	 * Sets the node state refresh interval in ms. Before start, the value is stored and passed to the
+	 * post office when it is created. After start, it is applied to the running post office.
+	 *
+	 * @throws IllegalArgumentException if newValue is not positive. The post office waits for this
+	 *         many ms between refreshes, and a non-positive value would stall or break that loop.
+	 */
+	public void setNodeStateRefreshInterval(long newValue)
+	{
+	   if (newValue <= 0)
+	   {
+	      throw new IllegalArgumentException("NodeStateRefreshInterval must be positive, was: " + newValue);
+	   }
+	   if (started)
+	   {
+	      postOffice.setNodeStateRefreshInterval(newValue);
+	   }
+	   else
+	   {
+	      nodeStateRefreshInterval = newValue;
+	   }
+	}
 
-   
    public String listBindings()
    {
       return postOffice.printBindingInformation();
@@ -394,7 +440,7 @@
          FilterFactory ff = new SelectorFactory();
          
          if (clustered)
-         {        
+         {
             ChannelFactory jChannelFactory = null;
 
          	if (channelFactoryName != null)
@@ -450,7 +496,9 @@
                                                   failoverOnNodeLeave,
                                                   maxRetry,
                                                   retryInterval,
-                                                  retryOnConnectionFailure);
+                                                  retryOnConnectionFailure,
+                                                  keepOldFailoverModel,
+                                                  nodeStateRefreshInterval);
          }
          else
          {
@@ -468,6 +516,18 @@
          postOffice.start();
 
          started = true;
+         
+         if (clustered)
+         {
+            try
+            {
+               postOffice.registerChannelInJmx(server, channelPartitionName);
+            }
+            catch (Exception e)
+            {
+               log.error("Caught exception registering channel in JXM", e);
+            }
+         }
       }
       catch (Throwable t)
       {
@@ -481,6 +541,11 @@
       {
          throw new IllegalStateException("Service is not started");
       }
+      
+      if (clustered)
+      {
+         postOffice.unregisterChannelInJmx(server, channelPartitionName);
+      }
 
       super.stopService();
 

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-03-02 12:10:49 UTC (rev 8232)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -93,7 +93,7 @@
                                  sc.getPostOfficeSQLProperties(), true, nodeID,
                                  "Clustered", ms, pm, tr, ff, cf, idm, cn,
                                  groupName, jChannelFactory,
-                                 stateTimeout, castTimeout, true, 100, false, 25, 1000, false);
+                                 stateTimeout, castTimeout, true, 100, false, 25, 1000, false, true, 30000);
 
       postOffice.start();
 


Property changes on: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/bridge/BridgeMBeanExtraTest.java
___________________________________________________________________
Deleted: svn:mergeinfo
   - 

Copied: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ServerFailoverTest2.java (from rev 8232, branches/JBM1842/tests/src/org/jboss/test/messaging/jms/clustering/ServerFailoverTest2.java)
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ServerFailoverTest2.java	                        (rev 0)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ServerFailoverTest2.java	2011-03-03 05:45:01 UTC (rev 8233)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
+
+/**
+ * A ServerFailoverTest
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ * 
+ * Created Jun 19, 2009 2:02:15 PM
+ *
+ *
+ */
+public class ServerFailoverTest2 extends ClusteringTestBase
+{
+   
+   private static final long REFRESH_INTERVAL = 10000;
+   
+   public ServerFailoverTest2(String name)
+   {
+      super(name);
+   }
+
+
+   public void setUp() throws Exception
+   {
+      nodeCount = 2;
+      
+      //make the failover follow new behavior
+      //https://issues.jboss.org/browse/JBMESSAGING-1842
+      overrides = new ServiceAttributeOverrides();
+      overrides.put(ServiceContainer.POSTOFFICE_OBJECT_NAME, "KeepOldFailoverModel", Boolean.FALSE);
+      overrides.put(ServiceContainer.POSTOFFICE_OBJECT_NAME, "NodeStateRefreshInterval", new Long(REFRESH_INTERVAL));
+      
+      super.setUp();
+   }
+
+   //start two nodes, send a message to node1, kill node1,
+   //receive from node0, the receive time should be greater than NodeStateRefreshInterval
+   public void testNewFailover() throws Exception
+   {
+      Connection conn = createConnectionOnServer(cf, 1);
+      
+      Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      MessageProducer prod = session.createProducer(queue[1]);
+      TextMessage message = session.createTextMessage("Message_testNewFailover");
+
+      prod.send(message);
+
+      conn.close();
+      
+      long start = System.currentTimeMillis();
+      
+      ServerManagement.kill(1);
+
+      conn = createConnectionOnServer(cf, 0);
+      
+      session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      
+      MessageConsumer consumer = session.createConsumer(queue[0]);
+      
+      conn.start();
+      
+      message = (TextMessage)consumer.receive(30000);
+      
+      assertNotNull(message);
+      
+      long stop = System.currentTimeMillis();
+      
+      assertEquals("Message_testNewFailover", message.getText());
+      
+      long duration = stop - start;
+      
+      assertTrue(duration > REFRESH_INTERVAL);
+      
+      conn.close();
+   }
+}
+
+
+
+
+



More information about the jboss-cvs-commits mailing list