[jboss-cvs] JBoss Messaging SVN: r8166 - in branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838: integration/EAP4/etc/server/default/deploy and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jan 6 13:03:02 EST 2011


Author: jbertram at redhat.com
Date: 2011-01-06 13:03:00 -0500 (Thu, 06 Jan 2011)
New Revision: 8166

Added:
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java
Modified:
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/.classpath
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JMSUserManager-xmbean.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/client/remoting/ClientSocketWrapper.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManagerService.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/Delivery.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
   branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java
Log:
JBPAPP-5738

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/.classpath
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/.classpath	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/.classpath	2011-01-06 18:03:00 UTC (rev 8166)
@@ -1,7 +1,8 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
 	<classpathentry kind="src" path="docs/examples/queue-failover/src"/>
-	<classpathentry kind="src" path="integration/EAP4/src"/>
+	<classpathentry kind="src" path="jgroups-src"/>
+	<classpathentry kind="src" path="integration/EAP4/src/main"/>
 	<classpathentry kind="src" path="integration/EAP4/tests-src"/>
 	<classpathentry kind="src" path="docs/examples/bridge/src"/>
 	<classpathentry kind="src" path="docs/examples/stateless-clustered/src"/>

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -66,7 +66,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT (CAST(? AS DECIMAL(19,0))), (CAST(? AS CHAR)), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -62,7 +62,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -62,7 +62,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
    ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -62,7 +62,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -66,7 +66,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -62,7 +62,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -67,7 +67,7 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
-   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -94,6 +94,9 @@
    SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
    INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
    DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+   UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+   CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? AND STATE='S'
+   LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JDBCPersistenceManager-xmbean.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JDBCPersistenceManager-xmbean.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -94,7 +94,25 @@
       <name>IDCacheSize</name>
       <type>int</type>
    </attribute>
+ 
+   <attribute access="read-write" getMethod="getMaxRetry" setMethod="setMaxRetry">
+      <description>Maximum number of retries on DataSource failures, default 25</description>
+      <name>MaxRetry</name>
+      <type>int</type>
+   </attribute>
 
+   <attribute access="read-write" getMethod="getRetryInterval" setMethod="setRetryInterval">
+      <description>Retry interval (in milliseconds), default 1000 (1 sec)</description>
+      <name>RetryInterval</name>
+      <type>int</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getRetryOnConnectionFailure" setMethod="setRetryOnConnectionFailure">
+      <description>Whether to retry on connection failure, default false</description>
+      <name>RetryOnConnectionFailure</name>
+      <type>boolean</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JMSUserManager-xmbean.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JMSUserManager-xmbean.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/JMSUserManager-xmbean.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -40,7 +40,25 @@
       <name>CreateTablesOnStartup</name>
       <type>boolean</type>
    </attribute>
+ 
+   <attribute access="read-write" getMethod="getMaxRetry" setMethod="setMaxRetry">
+      <description>Maximal retry times on DataSource failures, default 25</description>
+      <name>MaxRetry</name>
+      <type>int</type>
+   </attribute>
 
+   <attribute access="read-write" getMethod="getRetryInterval" setMethod="setRetryInterval">
+      <description>Retry interval (in milliseconds), default 1000 (1 sec)</description>
+      <name>RetryInterval</name>
+      <type>int</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getRetryOnConnectionFailure" setMethod="setRetryOnConnectionFailure">
+      <description>If retry on connection failure, default false</description>
+      <name>RetryOnConnectionFailure</name>
+      <type>boolean</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>
@@ -63,4 +81,5 @@
       <name>destroy</name>
    </operation>   
 
-</mbean>
\ No newline at end of file
+</mbean>
+

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/integration/EAP4/etc/xmdesc/MessagingPostOffice-xmbean.xml	2011-01-06 18:03:00 UTC (rev 8166)
@@ -136,7 +136,25 @@
       <name>NodeIDView</name>
       <type>java.util.Set</type>
    </attribute>
+ 
+   <attribute access="read-write" getMethod="getMaxRetry" setMethod="setMaxRetry">
+      <description>Maximal retry times on DataSource failures, default 25</description>
+      <name>MaxRetry</name>
+      <type>int</type>
+   </attribute>
 
+   <attribute access="read-write" getMethod="getRetryInterval" setMethod="setRetryInterval">
+      <description>Retry interval (in milliseconds), default 1000 (1 sec)</description>
+      <name>RetryInterval</name>
+      <type>int</type>
+   </attribute>
+
+   <attribute access="read-write" getMethod="getRetryOnConnectionFailure" setMethod="setRetryOnConnectionFailure">
+      <description>If retry on connection failure, default false</description>
+      <name>RetryOnConnectionFailure</name>
+      <type>boolean</type>
+   </attribute>
+
    <!-- Managed operations -->
 
    <operation>
@@ -159,4 +177,5 @@
       <name>destroy</name>
    </operation>
      
-</mbean>
\ No newline at end of file
+</mbean>
+

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/client/remoting/ClientSocketWrapper.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/client/remoting/ClientSocketWrapper.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/client/remoting/ClientSocketWrapper.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -48,7 +48,7 @@
 {
    // Constants ------------------------------------------------------------------------------------
    final static private Logger log = Logger.getLogger(ClientSocketWrapper.class);
-   final static protected int CLOSING = 1;
+   final static protected int CLOSING = 254;
    
    // Static ---------------------------------------------------------------------------------------
 
@@ -90,6 +90,10 @@
       {
          int b = ((DataInputStream)getInputStream()).readByte();
          log.debug(this + ".checkConnection read " + b);
+         if (b != ACK)
+         {
+            throw new IOException("got " + b + " instead of " + ACK);
+         }
       }
       catch (IOException e)
       {
@@ -104,7 +108,7 @@
    {
       if (log.isTraceEnabled()) log.trace("checking open connection");
 
-      if (((DataInputStream)getInputStream()).available() > 1)
+      if (((DataInputStream)getInputStream()).available() > 0)
       {
          log.trace("remote endpoint has closed");
          throw new IOException("remote endpoint has closed");

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -485,7 +485,14 @@
                }
                try
                {
-                  conn.close();
+                  if (conn instanceof ServerConnectionEndpoint)
+                  {
+                     ((ServerConnectionEndpoint)conn).close(true);
+                  }
+                  else
+                  {
+                     conn.close();
+                  }
                }
                catch (Throwable ignore)
                {              

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -364,6 +364,11 @@
 
    public void close() throws JMSException
    {
+      close(false);
+   }
+
+   public void close(boolean isFromFailure) throws JMSException
+   {
       try
       {
          //reason for synchronization
@@ -392,7 +397,7 @@
          {
             ServerSessionEndpoint sess = (ServerSessionEndpoint)i.next();
 
-            sess.localClose();
+            sess.localClose(isFromFailure);
          }
 
          sessions.clear();

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -35,10 +35,12 @@
 import org.jboss.jms.server.selector.Selector;
 import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Channel;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.contract.Receiver;
@@ -320,8 +322,28 @@
             clientAccepting = false;
             
             firstTime = false;
-         }         
-         
+         }
+
+         // now for a remote sucker, we need to update the message's status
+         if (remote)
+         {
+            PersistenceManager pm = sessionEndpoint.getPersistenceManager();
+            if (ref.getMessage().isReliable() && messageQueue.isRecoverable())
+            {
+               try
+               {
+                  pm.updateMessageState(messageQueue.getChannelID(), ref, "S");
+               }
+               catch (Exception e)
+               {
+                  // we need to stop the sucking process. the message should be re-delivered.
+                  log.error("Failed to update state for message: " + ref, e);
+                  return null;
+               }
+            }
+            delivery.setSucked(true);
+         }
+
          try
          {
          	sessionEndpoint.handleDelivery(delivery, this);
@@ -647,6 +669,16 @@
       sessionEndpoint.promptDelivery(messageQueue);
    }
 
+   public long getChannelID()
+   {
+      return messageQueue.getChannelID();
+   }
+
+   public Channel getChannel()
+   {
+      return messageQueue;
+   }   
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
@@ -74,11 +75,13 @@
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PersistenceManager.ReferenceInfo;
 import org.jboss.messaging.core.contract.PostOffice;
 import org.jboss.messaging.core.contract.Queue;
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.core.impl.IDManager;
 import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.SimpleDelivery;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionException;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -186,8 +189,11 @@
    private Object waitLock = new Object();
 
    private long lastSequence = -1;
+ 
+   private Map<Long, Long> failureCanceledDels;
+   
+   private AtomicBoolean isSuckerSession = new AtomicBoolean(false);
 
-
    // Constructors ---------------------------------------------------------------------------------
 
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -236,6 +242,8 @@
       defaultRedeliveryDelay = sp.getDefaultRedeliveryDelay();
 
       deliveries = new ConcurrentHashMap();
+
+      failureCanceledDels = new HashMap<Long, Long>();
    }
 
    // SessionDelegate implementation ---------------------------------------------------------------
@@ -1136,7 +1144,12 @@
 
    void localClose() throws Throwable
    {
-           
+      localClose(false);
+   }
+
+   void localClose(boolean isFromFailure) throws Throwable
+   {
+
       if (closed)
       {
          //don't throw the exception as it maybe called twice
@@ -1153,10 +1166,14 @@
       {
          consumersClone = new HashMap(consumers);
       }
+      
+      List<Channel> curChannels = new ArrayList<Channel>();
 
       for( Iterator i = consumersClone.values().iterator(); i.hasNext(); )
       {
-         ((ServerConsumerEndpoint)i.next()).localClose();
+         ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
+         curChannels.add(consumer.getChannel());
+         consumer.localClose();
       }
 
       consumers.clear();
@@ -1213,6 +1230,27 @@
 
             DeliveryRecord rec = (DeliveryRecord)entry.getValue();
 
+            // for a suck delivery, we need to update the state back to 'C'
+            if (rec.del.isSucked())
+            {
+               // we need to revert the message state (if still there). If the revert fails, we
+               // don't cancel.
+               if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+               {
+                  try
+                  {
+                     // now ask pm to do it.
+                     pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+                  }
+                  catch (Exception e)
+                  {
+                     // if update failed, it must be a DB failure, we log the error and let others be canceled
+                     log.error("Failed to update message " + rec.del.getReference() + " to state C", e);
+                     continue;
+                  }
+               }
+            }
+
             /*
              * https://jira.jboss.org/jira/browse/JBMESSAGING-1440
              */
@@ -1222,8 +1260,55 @@
             }
 
             channels.add(rec.del.getObserver());
+
+            if (isFromFailure)
+            {
+               failureCanceledDels.put(rec.deliveryID, rec.deliveryID);
+            }
          }
 
+         if (isSuckerSession.get())
+         {
+            // Here we handle the rare case where a sucker acked a message but then crashed,
+            // so the message was never moved to the target channel and the session has already
+            // forgotten it. We take this chance to claim those messages, load them back into
+            // their channels and redeliver them.
+            for (Channel ch : curChannels)
+            {
+               List<ReferenceInfo> refs = pm.claimMessagesInSuck(ch.getChannelID());
+
+               if (refs.size() > 0)
+               {
+                  List<Long> mids = new ArrayList<Long>();
+                  Map<Long, ReferenceInfo> refInfoMap = new HashMap<Long, ReferenceInfo>();
+
+                  for (ReferenceInfo refInfo : refs)
+                  {
+                     mids.add(refInfo.getMessageId());
+                     refInfoMap.put(refInfo.getMessageId(), refInfo);
+                  }
+
+                  List messages = pm.getMessages(mids);
+
+                  iter = messages.iterator();
+
+                  while (iter.hasNext())
+                  {
+                     Message m = (Message)iter.next();
+                     MessageReference mref = ms.reference(m);
+
+                     ReferenceInfo mInfo = refInfoMap.get(m.getMessageID());
+                     mref.setDeliveryCount(mInfo.getDeliveryCount());
+                     mref.setScheduledDeliveryTime(mInfo.getScheduledDelivery());
+                     
+                     Delivery del = new SimpleDelivery(ch, mref, true, true);
+                     del.cancel();
+                  }
+                  channels.add(ch);
+               }
+            }
+         }
+
          promptDelivery(channels);
 
          //Close down the executor
@@ -1600,19 +1685,40 @@
       synchronized (deliveries)
       {
          rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+         if (rec == null)
+         {
+            // The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
+            // and has failed over since recoverDeliveries won't have been called
+            if (trace)
+            {
+               log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+            }
+            return null;
+         }
+         // now we check for suckers
+         if (rec.del.isSucked())
+         {
+            // we need to revert the message state (if still there). If the revert fails, we
+            // don't cancel.
+            if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+            {
+               try
+               {
+                  // now ask pm to do it.
+                  pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+               }
+               catch (Exception e)
+               {
+                  if (trace)
+                  {
+                     log.trace("Failed to update message " + rec.del.getReference() + " to state C");
+                  }
+                  return null;
+               }
+            }
+         }
       }
 
-      if (rec == null)
-      {
-         //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
-      	//and has failed over since recoverDeliveries won't have been called
-      	if (trace)
-      	{
-      		log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
-      	}
-      	return null;
-      }
-
       //Note we check the flag *and* evaluate again, this is because the server and client clocks may
       //be out of synch and don't want to send back to the client a message it thought it has sent to
       //the expiry queue
@@ -1762,15 +1868,31 @@
       synchronized (deliveries)
       {
          rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+         if (rec == null)
+         {
+            // This can happen in one of the two cases:
+            //
+            // 1. If an ack comes in after failover, or
+            // 2. The session is closed due to server side connection failure notification processing.
+            // When a connection failure is detected at the server end, it will close all related server side
+            // sessions. As part of closing, any un-acked message will be canceled.
+            // if a normal client side ack comes in just after the session is thus being closed, and this ack
+            // has just been canceled, the client side ack will end up here.
+            //
+            // We treat the cases differently. For case 1, we can safely ignore it.
+            // For case 2, we must throw an exception to indicate that the ack failed and the message will be
+            // re-delivered.
+            if (failureCanceledDels.remove(ack.getDeliveryID()) != null)
+            {
+               // ack should fail
+               throw new JMSException("Message already canceled before this ack " + ack +
+                                      " and the message will be redelivered.");
+            }
+            log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+            return false;
+         }
       }
 
-      if (rec == null)
-      {
-      	//This can happen if an ack comes in after failover
-         log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
-         return false;
-      }
-
       ServerConsumerEndpoint consumer = rec.getConsumer();
 
       if (consumer != null && consumer.isRemote())
@@ -1857,6 +1979,8 @@
 
       log.trace(this + " created and registered " + ep);
 
+      isSuckerSession.set(true);
+
       return stub;
    }
 
@@ -2397,9 +2521,40 @@
 
       public void afterRollback(boolean onePhase) throws TransactionException
       {
-         //One phase rollbacks never hit the server - they are dealt with locally only
-         //so this would only ever be executed for a two phase rollback.
+         if (log.isTraceEnabled())
+         {
+            log.trace(this + " afterRollback, onePhase: " + onePhase);
+         }
+         // One phase rollbacks usually don't hit the server - they are dealt with locally only
+         // but if one-phase commit fails, we need to rollback the delivery
+         if (onePhase)
+         {
+            // Remove the deliveries from the delivery map.
+            Iterator iter = delList.iterator();
+            while (iter.hasNext())
+            {
+               Long deliveryId = (Long)iter.next();
 
+               DeliveryRecord del = (DeliveryRecord)deliveries.remove(deliveryId);
+
+               if (del != null && del.replicating)
+               {
+                  // TODO - we could batch this in one message
+                  try
+                  {
+                     postOffice.sendReplicateAckMessage(del.queueName, del.del.getReference()
+                                                                              .getMessage()
+                                                                              .getMessageID());
+                  }
+                  catch (Exception e)
+                  {
+                     throw new TransactionException("Failed to handle send ack", e);
+                  }
+               }
+            }
+         }
+
+         // for a two phase rollback.
          //We don't do anything since cancellation is driven from the client.
       }
 
@@ -2409,4 +2564,9 @@
       }
    }
 
+   public PersistenceManager getPersistenceManager()
+   {
+      return pm;
+   }
+
 }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -64,6 +64,17 @@
       super(ds, tm, sqlProperties, createTablesOnStartup);
    }
       
+   public JDBCJMSUserManager(DataSource ds,
+                             TransactionManager tm,
+                             Properties sqlProperties,
+                             boolean createTablesOnStartup,
+                             int maxRetry,
+                             int retryInterval,
+                             boolean retryOnConnectionFailure)
+   {
+      super(ds, tm, sqlProperties, createTablesOnStartup, maxRetry, retryInterval, retryOnConnectionFailure);
+   }
+
    // JDBCSupport overrides ----------------------------
    
    protected Map getDefaultDMLStatements()
@@ -123,7 +134,7 @@
       
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          ps = conn.prepareStatement(getSQLStatement("SELECT_PRECONF_CLIENTID"));
          
@@ -194,7 +205,7 @@
                
                tx = new TransactionWrapper();
                
-               conn = ds.getConnection();
+               conn = getConnectionWithRetry();
                
                st = conn.createStatement();
                

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManagerService.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManagerService.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/server/plugin/JDBCJMSUserManagerService.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -73,7 +73,8 @@
       {  
          TransactionManager tm = getTransactionManagerReference();
          
-         userManager = new JDBCJMSUserManager(ds, tm, sqlProperties, createTablesOnStartup);
+         userManager = new JDBCJMSUserManager(ds, tm, sqlProperties, createTablesOnStartup, maxRetry,
+                                              retryInterval, retryOnConnectionFailure);
          
          userManager.start();
          

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/tx/ResourceManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/jms/tx/ResourceManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -699,6 +699,22 @@
                throw new MessagingXAException(XAException.XA_RBCOMMFAIL, "A Throwable was caught in sending the transaction", t);
             }            
          }
+         else if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST)
+         {
+            // for one-phase commit, we may have a rollback exception
+            if (t instanceof XAException)
+            {
+               throw new MessagingXAException(XAException.XA_RBOTHER,
+                                              "A Throwable was caught in sending one phase commit",
+                                              t);
+            }
+            else
+            {
+               throw new MessagingXAException(XAException.XA_RETRY,
+                                              "A Throwable was caught in sending one phase commit",
+                                              t);
+            }
+         }
          else
          {
             throw new MessagingXAException(XAException.XA_RETRY, "A Throwable was caught in sending the transaction", t);            

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/Delivery.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/Delivery.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -53,4 +53,8 @@
     * Mark if this delivery is with a prepared XA transaction.
     */
    boolean isXAPrepared();
+
+   boolean isSucked();
+
+   void setSucked(boolean isSucked);
 }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -99,6 +99,12 @@
    //if supports transaction creation time
    boolean supportsTxAge();
 
+   // update the status of the message
+   void updateMessageState(long channelID, MessageReference ref, String state) throws Exception;
+
+   // for the given channel, update messages whose state is 'S' to state 'C', and return their reference infos
+   List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception;
+
    // Interface value classes ----------------------------------------------------------------------
 
    class MessageChannelPair

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -598,7 +598,7 @@
    protected void cancelInternal(MessageReference ref) throws Exception
    {
       if (trace) { log.trace(this + " cancelling " + ref + " in memory"); }
-
+      
       synchronized (lock)
       {
          monitor.unmarkSending(ref);

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/ClusterRoundRobinDistributor.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -73,7 +73,6 @@
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
    {             
       //First try the local distributor
-   	
    	if (trace) { log.trace(this + " first trying with local distributor"); }
    	
    	Delivery del = localDistributor.handle(observer, ref, tx);
@@ -85,7 +84,7 @@
    		//If no local distributor takes the ref then we try the remote distributor
    		
    		if (trace) { log.trace(this + " trying with remote distributor"); }
-   		   		
+   		
    		del = remoteDistributor.handle(observer, ref, tx);
    		 		   	
    		if (trace) { log.trace(this + " remote distributor returned " + del); }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -36,6 +36,7 @@
 import org.jboss.messaging.util.StreamUtils;
 import org.jboss.messaging.util.Util;
 
+import javax.jms.JMSException;
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
 import javax.transaction.xa.Xid;
@@ -104,6 +105,12 @@
    
    private boolean supportsTxAge;
 
+   private int maxRetry = 25;
+
+   private int retryInterval = 1000;
+
+   private boolean retryOnConnectionFailure = false;
+
    // Constructors --------------------------------------------------
 
    public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -142,6 +149,50 @@
             maxParams, supportsBlobSelect, supportsSetNullOnBlobs, false, false, 0);
    }
 
+   /**
+    * Creates a persistence manager with explicit JDBC retry settings.
+    *
+    * @param ds                       passed to the superclass constructor
+    * @param tm                       passed to the superclass constructor
+    * @param sqlProperties            passed to the superclass constructor
+    * @param createTablesOnStartup    passed to the superclass constructor
+    * @param usingBatchUpdates        accepted but not used by this constructor
+    * @param usingBinaryStream        stored in the field of the same name
+    * @param usingTrailingByte        stored in the field of the same name
+    * @param maxParams                stored in the field of the same name
+    * @param supportsBlobSelect       stored in the field of the same name
+    * @param supportsSetNullOnBlobs   stored in the field of the same name
+    * @param detectDuplicates         stored in the field of the same name
+    * @param useNDBFailoverStrategy   passed to the superclass constructor
+    * @param idCacheSize              stored in the field of the same name
+    * @param maxRetry                 passed to the superclass constructor
+    * @param retryInterval            passed to the superclass constructor
+    * @param retryOnConnectionFailure passed to the superclass constructor
+    */
+   public JDBCPersistenceManager(DataSource ds, TransactionManager tm, Properties sqlProperties,
+                                 boolean createTablesOnStartup, boolean usingBatchUpdates,
+                                 boolean usingBinaryStream, boolean usingTrailingByte, int maxParams,
+                                 boolean supportsBlobSelect, boolean supportsSetNullOnBlobs,
+                                 boolean detectDuplicates, boolean useNDBFailoverStrategy, int idCacheSize,
+                                 int maxRetry, int retryInterval, boolean retryOnConnectionFailure)
+   {
+      super(ds, tm, sqlProperties, createTablesOnStartup, useNDBFailoverStrategy,
+            maxRetry, retryInterval, retryOnConnectionFailure);
+
+      // usingBatchUpdates is deliberately not stored: batch updates are ignored
+      // for now because database support for them is sketchy.
+
+      this.idCacheSize = idCacheSize;
+      this.detectDuplicates = detectDuplicates;
+      this.maxParams = maxParams;
+
+      this.supportsSetNullOnBlobs = supportsSetNullOnBlobs;
+      this.supportsBlobSelect = supportsBlobSelect;
+
+      this.usingTrailingByte = usingTrailingByte;
+      this.usingBinaryStream = usingBinaryStream;
+   }
+
    // MessagingComponent overrides ---------------------------------
 
    public void start() throws Exception
@@ -156,7 +207,7 @@
 
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
          // JBossMessaging requires transaction isolation of READ_COMMITTED
          // Any looser isolation level and we cannot maintain consistency for
          // paging (HSQL)
@@ -331,7 +382,7 @@
       {
          List<PreparedTxInfo> transactions = new ArrayList<PreparedTxInfo>();
 
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          if (supportsTxAge)
          {
@@ -905,7 +956,7 @@
 
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          ps = conn.prepareStatement(getSQLStatement("LOAD_PAGED_REFS"));
 
@@ -975,7 +1026,7 @@
 
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          // First we get the values for min() and max() page order
          ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
@@ -1637,6 +1688,12 @@
                   log.trace("Updated " + rows + " rows");
                }
 
+               if (rows == 0)
+               {
+                  // no message updated, should be canceled back already
+                  throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+               }
+
                return null;
             }
             finally
@@ -1862,7 +1919,7 @@
 
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          st = conn.prepareStatement(getSQLStatement("LOAD_ID_CACHE"));
 
@@ -2947,6 +3004,12 @@
       map.put("INSERT_TRANSACTION_EXTRA", "INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)");
       map.put("DELETE_TRANSACTION_EXTRA", "DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?");
 
+      // sucker use
+      map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
+      map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'");
+      map.put("LOAD_REFS_IN_SUCK",
+              "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
+
       return map;
    }
 
@@ -3064,7 +3127,7 @@
 
       try
       {
-         conn = ds.getConnection();
+         conn = getConnectionWithRetry();
 
          ps = conn.prepareStatement(sqlQuery);
 
@@ -3308,4 +3371,97 @@
       return supportsTxAge;
    }
 
+   /**
+    * Updates the state of a single message reference with the UPDATE_MESSAGE_STATE statement,
+    * run in a JDBCTxRunner2 via executeWithRetry().
+    *
+    * @param channelID value bound to the third statement parameter
+    * @param ref       reference whose message id is bound to the second statement parameter
+    * @param c         state value bound to the first statement parameter
+    * @throws Exception doTransaction() raises a JMSException unless exactly one row is updated;
+    *                   how it reaches the caller depends on executeWithRetry()
+    */
+   public void updateMessageState(final long channelID, final MessageReference ref, final String c) throws Exception
+   {
+      new JDBCTxRunner2()
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement update = null;
+            try
+            {
+               update = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_STATE"));
+               update.setString(1, c);
+               final long messageID = ref.getMessage().getMessageID();
+               update.setLong(2, messageID);
+               update.setLong(3, channelID);
+               final int updated = update.executeUpdate();
+               if (trace)
+               {
+                  log.trace("Updated " + updated + " rows");
+               }
+               // Exactly one row must have been touched.
+               if (updated == 1)
+               {
+                  return null;
+               }
+               throw new JMSException("Failed to update message " + messageID + " to state " + c);
+            }
+            finally
+            {
+               closeStatement(update);
+            }
+         }
+      }.executeWithRetry();
+   }
+
+   /**
+    * Loads the references of a channel selected by LOAD_REFS_IN_SUCK and executes
+    * CLAIM_MESSAGE_IN_SUCK for each of them, all within one JDBCTxRunner2 run.
+    * @param channelID id of the channel whose references are claimed
+    * @return one ReferenceInfo (message id, delivery count, scheduled delivery) per loaded row
+    */
+   public List<ReferenceInfo> claimMessagesInSuck(final long channelID) throws Exception
+   {
+      final List<ReferenceInfo> msgIDs = new ArrayList<ReferenceInfo>();
+      class MessageClaimRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+            msgIDs.clear(); // a retried attempt must not keep rows collected by a failed one
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS_IN_SUCK"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+               while (rs.next())
+               {
+                  long msgId = rs.getLong(1);
+                  int deliveryCount = rs.getInt(2);
+                  long sched = rs.getLong(3);
+                  // Prepared once and reused, so the single closeStatement(ps2) below releases it.
+                  if (ps2 == null) { ps2 = conn.prepareStatement(getSQLStatement("CLAIM_MESSAGE_IN_SUCK")); }
+                  // NOTE(review): two parameters are bound here - confirm the statement in use has two '?'.
+                  ps2.setLong(1, channelID);
+                  ps2.setLong(2, msgId);
+                  int rows = ps2.executeUpdate();
+                  if (trace) { log.trace("Message in suck claimed " + rows + " rows for message " + msgId); }
+                  msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return msgIDs;
+         }
+      }
+      new MessageClaimRunner().executeWithRetry();
+      return msgIDs;
+   }
 }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/JDBCSupport.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -63,6 +63,12 @@
        
    protected boolean createTablesOnStartup = true;
       
+   private int maxRetry = 25;
+
+   private int retryInterval = 1000;
+
+   private boolean retryOnConnectionFailure = false;
+
    public JDBCSupport()
    {
       defaultDMLStatements = new LinkedHashMap();
@@ -89,13 +95,54 @@
       this.createTablesOnStartup = createTablesOnStartup;
    }
 
-    public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+   public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
                       boolean createTablesOnStartup, boolean useNDBFailoverStrategy)
    {
       this(ds, tm, sqlProperties, createTablesOnStartup);
       this.useNDBFailoverStrategy = useNDBFailoverStrategy;
    }
 
+   /**
+    * Delegates to the four-argument constructor, then records the NDB failover flag and the
+    * retry settings (maxRetry, retryInterval, retryOnConnectionFailure).
+    */
+   public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+                      boolean createTablesOnStartup, boolean useNDBFailoverStrategy,
+                      int maxRetry, int retryInterval, boolean retryOnConnectionFailure)
+   {
+      this(ds, tm, sqlProperties, createTablesOnStartup);
+
+      this.retryOnConnectionFailure = retryOnConnectionFailure;
+      this.retryInterval = retryInterval;
+      this.maxRetry = maxRetry;
+      this.useNDBFailoverStrategy = useNDBFailoverStrategy;
+   }
+
+   /**
+    * Runs the no-argument constructor, then stores the data source, the transaction
+    * manager, the table-creation flag and the retry settings.
+    *
+    * @param sqlProperties ignored when null, leaving the sqlProperties field as it was
+    */
+   public JDBCSupport(DataSource ds, TransactionManager tm, Properties sqlProperties,
+                      boolean createTablesOnStartup,
+                      int maxRetry, int retryInterval, boolean retryOnConnectionFailure)
+   {
+      this();
+
+      this.tm = tm;
+      this.ds = ds;
+      this.createTablesOnStartup = createTablesOnStartup;
+
+      // Retry configuration.
+      this.retryOnConnectionFailure = retryOnConnectionFailure;
+      this.retryInterval = retryInterval;
+      this.maxRetry = maxRetry;
+
+      // Keep the existing properties when none are supplied.
+      if (sqlProperties != null) { this.sqlProperties = sqlProperties; }
+   }
+
    // MessagingComponent overrides ---------------------------------
    
    public void start() throws Exception
@@ -284,8 +331,8 @@
                   
          try
          {                        
-            conn = ds.getConnection();
-                        
+            conn = getConnectionWithRetry();
+
             String statementName = (String)i.next();
              
             String statement = getSQLStatement(statementName);
@@ -325,6 +372,111 @@
    
    // Innner classes ---------------------------------------------------------
    
+   /**
+    * Sets the retry limit: the retry loops give up when their attempt counter equals it (-1 = no limit).
+    */
+   public void setMaxRetry(int maxRetry) { this.maxRetry = maxRetry; }
+
+   /**
+    * Returns the retry limit that the retry loops compare their attempt counter against (-1 = no limit).
+    */
+   public int getMaxRetry() { return maxRetry; }
+
+   /**
+    * Sets the pause, in milliseconds, that the retry loops sleep between two attempts.
+    */
+   public void setRetryInterval(int retryInterval) { this.retryInterval = retryInterval; }
+
+   /**
+    * Returns the pause, in milliseconds, that the retry loops sleep between two attempts.
+    */
+   public int getRetryInterval() { return retryInterval; }
+
+   /**
+    * Sets whether getConnectionWithRetry retries a failed connection attempt instead of rethrowing it.
+    */
+   public void setRetryOnConnectionFailure(boolean retryOnConnectionFailure) { this.retryOnConnectionFailure = retryOnConnectionFailure; }
+
+   /**
+    * Returns whether getConnectionWithRetry retries a failed connection attempt instead of rethrowing it.
+    */
+   public boolean getRetryOnConnectionFailure() { return retryOnConnectionFailure; }
+
+   /**
+    * Obtains a connection from the data source and applies the requested auto-commit mode.
+    * When retryOnConnectionFailure is set, a failure is retried after retryInterval ms until
+    * maxRetry retries have been made (no limit when maxRetry is -1).
+    *
+    * @throws Exception the failure itself when retrying is off, or an IllegalStateException (cause attached) at the limit
+    */
+   public Connection getConnectionWithRetry(boolean autoCommit) throws Exception
+   {
+      Connection connection = null;
+      int retries = 0;
+      while (connection == null)
+      {
+         try
+         {
+            connection = ds.getConnection();
+            connection.setAutoCommit(autoCommit);
+         }
+         catch (Exception e)
+         {
+            if (trace) { log.trace("Failure in getting connection", e); }
+            // setAutoCommit() can fail on an open connection: release it so it is neither leaked nor returned.
+            try { if (connection != null) { connection.close(); } } catch (Exception ignored) { /* best effort */ }
+            connection = null;
+            if (!retryOnConnectionFailure) { throw e; }
+            if ((maxRetry != -1) && (retries == maxRetry))
+            {
+               log.error("Retried " + retries + " times reconnection, now giving up", e);
+               throw new IllegalStateException("Failed to get connection", e);
+            }
+            retries++;
+            log.warn("Trying reconnection again after a pause of " + retryInterval + " ms.");
+            Thread.sleep(retryInterval);
+         }
+      }
+      return connection;
+   }
+
+   /**
+    * Obtains a connection from the data source without changing its auto-commit mode.
+    * When retryOnConnectionFailure is set, a failure is retried after retryInterval ms until
+    * maxRetry retries have been made (no limit when maxRetry is -1).
+    *
+    * @throws Exception the failure itself when retrying is off, or an IllegalStateException
+    *                   carrying the last failure as its cause once the limit is reached
+    */
+   public Connection getConnectionWithRetry() throws Exception
+   {
+      Connection connection = null;
+      int retries = 0;
+      while (connection == null)
+      {
+         try
+         {
+            connection = ds.getConnection();
+         }
+         catch (Exception e)
+         {
+            if (trace) { log.trace("Failure in getting connection", e); }
+            if (!retryOnConnectionFailure) { throw e; }
+            if ((maxRetry != -1) && (retries == maxRetry))
+            {
+               log.error("Retried " + retries + " times reconnection, now giving up", e);
+               // Chain the last failure so the caller can see why no connection was obtained.
+               throw new IllegalStateException("Failed to get connection", e);
+            }
+
+            retries++;
+            log.warn("Trying reconnection again after a pause of " + retryInterval + " ms.");
+            Thread.sleep(retryInterval);
+         }
+      }
+      return connection;
+   }
+
    protected class TransactionWrapper
    {
       private javax.transaction.Transaction oldTx;
@@ -377,8 +529,6 @@
 
    protected abstract class JDBCTxRunner<T>
    {
-   	private static final int MAX_TRIES = 25;
-
    	protected Connection conn;
 
       private TransactionWrapper wrap;
@@ -389,7 +539,7 @@
 
 	      try
 	      {
-	         conn = ds.getConnection();
+	         conn = getConnectionWithRetry();
 
 	         return doTransaction();
 	      }
@@ -425,15 +575,15 @@
 	         {
   	            log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
 
-	            tries++;
-	            if (tries == MAX_TRIES)
+  	            if ((maxRetry != -1) && (tries == maxRetry))
 	            {
 	               log.error("Retried " + tries + " times, now giving up");
 	               throw new IllegalStateException("Failed to excecute transaction");
 	            }
-	            log.warn("Trying again after a pause");
-	            //Now we wait for a random amount of time to minimise risk of deadlock
-	            Thread.sleep((long)(Math.random() * 500));
+	            tries++;
+	            log.warn("Trying again after a pause of " + retryInterval + " ms.");
+	            
+	            Thread.sleep(retryInterval);
 	         }
 	      }
 		}
@@ -444,8 +594,6 @@
    
    protected abstract class JDBCTxRunner2<T>
    {
-      private static final int MAX_TRIES = 25;
-
       protected Connection conn;
       
       private boolean getConnectionFailed;
@@ -460,9 +608,7 @@
          {
             try
             {
-               conn = ds.getConnection();
-            
-               conn.setAutoCommit(false);
+               conn = getConnectionWithRetry(false);
             }
             catch (Exception e)
             {
@@ -537,15 +683,15 @@
                }
                log.warn("SQLException caught, SQLState " + e.getSQLState() + " code:" + e.getErrorCode() + "- assuming deadlock detected, try:" + (tries + 1), e);
 
-               tries++;
-               if (tries == MAX_TRIES)
+               if ((maxRetry != -1) && (tries == maxRetry))
                {
                   log.error("Retried " + tries + " times, now giving up");
                   throw new IllegalStateException("Failed to execute transaction");
                }
-               log.warn("Trying again after a pause");
-               //Now we wait for a random amount of time to minimise risk of deadlock
-               Thread.sleep((long)(Math.random() * 500));
+               tries++;
+               log.warn("Trying again after a pause of " + retryInterval + " ms.");
+
+               Thread.sleep((long)(retryInterval));
             }
          }
       }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -221,6 +221,8 @@
       synchronized (lock)
       {
          flushDownCache();
+
+         pm.claimMessagesInSuck(theChannelID);
                   
          PersistenceManager.InitialLoadInfo ili =
             pm.mergeAndLoad(theChannelID, this.channelID, fullSize - messageRefs.size(),
@@ -491,7 +493,12 @@
    {
    	return recoverDeliveriesTimeout;
    }
-   
+
+   /**
+    * Calls the superclass load(boolean), passing the value of clustered.
+    */
+   public void load() throws Exception { super.load(clustered); }
+
    //testing only
    
    public Map getRecoveryArea()

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -228,6 +228,15 @@
    {
    }
 
+   /** Intentionally empty: none of the three arguments is used. */
+   public void updateMessageState(long channelID, MessageReference ref, String state) throws Exception
+   { }
+
+   /**
+    * Claims nothing: always returns the immutable empty list; channelID is not used.
+    */
+   public List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception { return Collections.emptyList(); }
+
 }
 
 class IDCounter

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -194,7 +194,7 @@
       }
    }
    
-   public void load() throws Exception
+   public void load(boolean isClustered) throws Exception
    {            
       synchronized (lock)
       {
@@ -207,6 +207,12 @@
          
          unload();
          
+         if (isClustered)
+         {
+            // claim possible sucked messages
+            pm.claimMessagesInSuck(channelID);
+         }
+
          //Load the unpaged references
          InitialLoadInfo ili = pm.loadFromStart(channelID, fullSize);
          

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -51,7 +51,7 @@
    private MessageReference reference;   
    private boolean recovered;
    private Transaction tx;
-
+   private boolean isSucked;
    private boolean trace = log.isTraceEnabled();
 
    // Constructors ---------------------------------------------------------------------------------
@@ -136,6 +136,16 @@
       return false;
    }
 
+   /**
+    * Stores the given value in the isSucked flag of this delivery.
+    */
+   public void setSucked(boolean sucked) { isSucked = sucked; }
+
+   /**
+    * Returns the current value of the isSucked flag of this delivery.
+    */
+   public boolean isSucked() { return isSucked; }
+
    // Package protected ----------------------------------------------------------------------------
    
    // Protected ------------------------------------------------------------------------------------

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -27,7 +27,6 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
 import org.jboss.jms.client.JBossSession;
@@ -212,28 +211,13 @@
       
       suspended = true;
 
-      try
-      {
-         consumer.closing(-1);
-      }
-      catch (Throwable t)
-      {
-         // Ignore
-      }
-      try
-      {
-         consumer.close();
-      }
-      catch (Throwable t)
-      {
-         // Ignore
-      }
-
       sourceSession = null;
 
       consumer = null;
 
       clientConsumer = null;
+
+      if (trace) { log.trace(this + " suspended"); }
    }
    
    
@@ -327,8 +311,6 @@
 		
 	public void onMessage(Message msg)
 	{
-		Transaction tx = null;
-				
 		try
 		{
 	      if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
@@ -375,15 +357,6 @@
 		catch (Exception e)
 		{
 			log.error("Failed to forward message", e);
-			
-			try
-			{
-				if (tx != null) tm.rollback();
-			}
-			catch (Throwable t)
-			{
-				if (trace) { log.trace("Failed to rollback tx", t); }
-			}
 		}
 	}
 }

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -127,6 +127,8 @@
       
       // We don't want to receive local messages on any of the channels
       controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+      controlChannel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
       
       MessageListener messageListener = new ControlMessageListener();
       
@@ -197,6 +199,7 @@
          {
             dataChannel = jChannelFactory.createDataChannel();
             dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+            dataChannel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
             Receiver dataReceiver = new DataReceiver();
             dataChannel.setReceiver(dataReceiver);
             dataChannel.connect(groupName + DATA_SUFFIX);

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -268,10 +268,13 @@
                               FilterFactory filterFactory,
                               ConditionFactory conditionFactory,
                               IDManager channelIDManager,
-                              ClusterNotifier clusterNotifier)
+                              ClusterNotifier clusterNotifier,
+                              int maxRetry,
+                              int retryInterval,
+                              boolean retryOnConnectionFailure)
       throws Exception
    {
-   	super (ds, tm, sqlProperties, createTablesOnStartup);
+   	super (ds, tm, sqlProperties, createTablesOnStartup, maxRetry, retryInterval, retryOnConnectionFailure);
 
       this.thisNodeID = nodeId;
       
@@ -319,11 +322,14 @@
                               long stateTimeout, long castTimeout,
                               boolean supportsFailover,
                               int maxConcurrentReplications,
-                              boolean failoverOnNodeLeave)
+                              boolean failoverOnNodeLeave,
+                              int maxRetry,
+                              int retryInterval,
+                              boolean retryOnConnectionFailure)
       throws Exception
    {
    	this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
-   		  filterFactory, conditionFactory, channelIDManager, clusterNotifier);
+   	     filterFactory, conditionFactory, channelIDManager, clusterNotifier, maxRetry, retryInterval, retryOnConnectionFailure);
      
       this.clustered = true;
       

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/tx/Transaction.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/impl/tx/Transaction.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 
+import javax.transaction.xa.XAException;
 import javax.transaction.xa.Xid;
 
 import org.jboss.logging.Logger;
@@ -202,25 +203,52 @@
       if (trace) { log.trace(this + " executing before commit hooks"); }
        
       boolean onePhase = state != STATE_PREPARED;
-      
-      if (firstCallback != null)
+
+      Iterator iter = null;
+
+      try
       {
-         firstCallback.beforeCommit(onePhase);
+
+         if (firstCallback != null)
+         {
+            firstCallback.beforeCommit(onePhase);
+         }
+
+         iter = callbacks.iterator();
+
+         while (iter.hasNext())
+         {
+            TxCallback callback = (TxCallback)iter.next();
+
+            callback.beforeCommit(onePhase);
+         }
+
+         state = STATE_COMMITTED;
+
+         if (trace)
+         {
+            log.trace(this + " committed");
+         }
+
       }
-      
-      Iterator iter = callbacks.iterator();
-      
-      while (iter.hasNext())
+      catch (Exception e)
       {
-         TxCallback callback = (TxCallback)iter.next();
-         
-         callback.beforeCommit(onePhase);
+         // for one-phase commit, we need to rollback the message.
+         if (onePhase)
+         {
+            if (trace)
+            {
+               log.trace(this + " one-phase commit results in rollback.");
+            }
+
+            rollback();
+
+            throw new XAException(XAException.XA_RBOTHER);
+         }
+         // for 2-pc commit, we throw the exception
+         throw e;
       }
       
-      state = STATE_COMMITTED;
-      
-      if (trace) { log.trace(this + " committed"); }
-      
       iter = callbacks.iterator();
       
       if (trace) { log.trace(this + " executing after commit hooks"); }
@@ -353,7 +381,7 @@
          
          if (callback instanceof TxCallbackEx)
          {
-            ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered);            
+            ((TxCallbackEx)callback).afterRollbackEx(onePhase, recovered || onePhase);
          }
          else
          {

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -95,7 +95,8 @@
             new JDBCPersistenceManager(ds, tm, sqlProperties,
                                        createTablesOnStartup, usingBatchUpdates,
                                        usingBinaryStream, usingTrailingByte, maxParams,
-                                       supportsBlobOnSelect, supportsSetNullOnBlobs, detectDuplicates, useNDBFailoverStrategy, idCacheSize);
+                                       supportsBlobOnSelect, supportsSetNullOnBlobs, detectDuplicates, useNDBFailoverStrategy, idCacheSize,
+                                       maxRetry, retryInterval, retryOnConnectionFailure);
 
          persistenceManager.start();
 

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/JDBCServiceSupport.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -61,6 +61,10 @@
    private ObjectName tmObjectName;
    private TransactionManager tm;
 
+   protected int maxRetry = 25;
+   protected int retryInterval = 1000;
+   protected boolean retryOnConnectionFailure = false;
+
    // Constructors ---------------------------------------------------------------------------------
 
    // ServiceMBeanSupport overrides ----------------------------------------------------------------
@@ -161,6 +165,36 @@
       createTablesOnStartup = b;
    }
 
+   public void setMaxRetry(int maxRetry)
+   {
+      this.maxRetry = maxRetry;
+   }
+
+   public int getMaxRetry()
+   {
+      return maxRetry;
+   }
+
+   public void setRetryInterval(int retryInterval)
+   {
+      this.retryInterval = retryInterval;
+   }
+
+   public int getRetryInterval()
+   {
+      return retryInterval;
+   }
+
+   public void setRetryOnConnectionFailure(boolean retryOnConnectionFailure)
+   {
+      this.retryOnConnectionFailure = retryOnConnectionFailure;
+   }
+
+   public boolean getRetryOnConnectionFailure()
+   {
+      return retryOnConnectionFailure;
+   }
+
    // Public ---------------------------------------------------------------------------------------
 
    // Package protected ----------------------------------------------------------------------------

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/src/main/org/jboss/messaging/core/jmx/MessagingPostOfficeService.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -447,7 +447,10 @@
 	                                               stateTimeout, castTimeout,
                                                   serverPeer.isSupportsFailover(),
                                                   maxConcurrentReplications,
-                                                  failoverOnNodeLeave);
+	                                               failoverOnNodeLeave,
+	                                               maxRetry,
+	                                               retryInterval,
+	                                               retryOnConnectionFailure);
          }
          else
          {
@@ -456,7 +459,10 @@
 											                 nodeId, officeName, ms,
 											                 pm,
 											                 tr, ff, cf, idManager,
-											                 clusterNotifier);
+         	                                      clusterNotifier,
+         	                                      maxRetry,
+         	                                      retryInterval,
+         	                                      retryOnConnectionFailure);
          }
 
          postOffice.start();

Copied: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java (from rev 8163, branches/RIM_patch/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java)
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java	                        (rev 0)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/FakeJDBCPersistenceManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2010, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.test.messaging.core;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.tx.Transaction;
+
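+/**
+ * A test-only JDBCPersistenceManager whose handleBeforeCommit1PC can be
+ * "poisoned" to throw, simulating a failure during one-phase commit.
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ * Created Dec 24, 2010 12:09:22 PM
+ *
+ */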
+public class FakeJDBCPersistenceManager extends JDBCPersistenceManager
+{
+
+   public FakeJDBCPersistenceManager(DataSource ds,
+                                     TransactionManager tm,
+                                     Properties sqlProperties,
+                                     boolean createTablesOnStartup,
+                                     boolean usingBatchUpdates,
+                                     boolean usingBinaryStream,
+                                     boolean usingTrailingByte,
+                                     int maxParams,
+                                     boolean supportsBlobSelect,
+                                     boolean supportsSetNullOnBlobs)
+   {
+      super(ds,
+            tm,
+            sqlProperties,
+            createTablesOnStartup,
+            usingBatchUpdates,
+            usingBinaryStream,
+            usingTrailingByte,
+            maxParams,
+            supportsBlobSelect,
+            supportsSetNullOnBlobs);
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   boolean poisoned1pc;
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   public void handleBeforeCommit1PC(final List refsToAdd, final List refsToRemove, final Transaction tx) throws Exception
+   {
+      if (poisoned1pc) throw new Exception("Fake exception");
+      super.handleBeforeCommit1PC(refsToAdd, refsToRemove, tx);
+   }
+
+   public void poisonHandleCommit1PC()
+   {
+      poisoned1pc = true;
+   }
+   
+   public void restoreHandleCommit1PC()
+   {
+      poisoned1pc = false;
+   }
+
+}

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/JDBCPersistenceManagerTest.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -31,12 +31,14 @@
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.contract.Channel;
+import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.impl.IDManager;
 import org.jboss.messaging.core.impl.JDBCPersistenceManager;
+import org.jboss.messaging.core.impl.MessagingQueue;
 import org.jboss.messaging.core.impl.message.SimpleMessageStore;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -106,6 +108,33 @@
       return p;
    }
 
+   protected void doSetup2(boolean batch, boolean useBinaryStream, boolean trailingByte, int maxParams) throws Throwable
+   {
+      pm = createPM2(batch, useBinaryStream, trailingByte, maxParams);
+      ms = new SimpleMessageStore();
+   }
+
+   protected JDBCPersistenceManager createPM2(boolean batch,
+                                              boolean useBinaryStream,
+                                              boolean trailingByte,
+                                              int maxParams) throws Throwable
+   {
+      JDBCPersistenceManager p = new FakeJDBCPersistenceManager(sc.getDataSource(),
+                                                                sc.getTransactionManager(),
+                                                                sc.getPersistenceManagerSQLProperties(),
+                                                                true,
+                                                                batch,
+                                                                useBinaryStream,
+                                                                trailingByte,
+                                                                maxParams,
+                                                                !sc.getDatabaseName().equals("oracle") && !sc.getDatabaseName()
+                                                                                                             .equals("db2"),
+                                                                !sc.getDatabaseName().equals("db2"));
+      ((JDBCPersistenceManager)p).injectNodeID(1);
+      p.start();
+      return p;
+   }
+
    public void tearDown() throws Exception
    {
       sc.stop();
@@ -936,7 +965,71 @@
       assertTrue(containsMessage(ms, ref1.getMessage().getMessageID()));
    }
 
+   // https://issues.jboss.org/browse/JBMESSAGING-1837
+   public void testCommitOnePhaseFailure() throws Throwable
+   {
+      doSetup2(true, true, true, 100);
 
+      TransactionRepository txRep = new TransactionRepository(pm, ms, 0);
+      txRep.start();
+
+      MessagingQueue queue = new MessagingQueue(1, "queue1", 1, ms, pm, true, -1, null, false);
+      queue.activate();
+
+      SimpleReceiver r = new SimpleReceiver("AckingReceiver", SimpleReceiver.ACKING);
+      assertTrue(queue.getLocalDistributor().add(r));
+
+      SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
+
+      log.debug("sending a message");
+
+      Message[] messages = createMessages(1);
+
+      Message m1 = messages[0];
+
+      Transaction tx = txRep.createTransaction(new MockXid());
+
+      MessageReference ref1 = ms.reference(m1);
+
+      Delivery delivery = queue.handle(observer, ref1, null);
+
+      assertTrue(r.getMessages().size() == 1);
+
+      // simulating processing transaction
+      delivery.acknowledge(tx);
+
+      // poison pm
+      ((FakeJDBCPersistenceManager)pm).poisonHandleCommit1PC();
+
+      // commit one phase
+      try
+      {
+         tx.commit();
+      }
+      catch (Exception e)
+      {
+         // expected: the poisoned persistence manager makes the one-phase commit fail.
+      }
+
+      // message received again.
+      assertEquals(2, r.getMessages().size());
+      // delivering count 1
+      assertTrue(queue.getDeliveringCount() == 1);
+
+      // restore
+      ((FakeJDBCPersistenceManager)pm).restoreHandleCommit1PC();
+
+      // another tx
+      tx = txRep.createTransaction(new MockXid());
+      delivery.acknowledge(tx);
+      tx.commit();
+
+      // still received twice.
+      assertTrue(r.getMessages().size() == 2);
+      // delivering count 0
+      assertTrue(queue.getDeliveringCount() == 0);
+   }   
+
    protected Message createMessage(byte i, boolean reliable) throws Throwable
    {
       Map headers = generateFilledMap(true);

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/core/PostOfficeTestBase.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -95,7 +95,7 @@
                                  sc.getPostOfficeSQLProperties(), true, nodeID,
                                  "Clustered", ms, pm, tr, ff, cf, idm, cn,
                                  groupName, jChannelFactory,
-                                 stateTimeout, castTimeout, true, 100, false);
+                                 stateTimeout, castTimeout, true, 100, false, 25, 1000, false);
 
       postOffice.start();
 
@@ -114,7 +114,7 @@
       MessagingPostOffice postOffice =
          new MessagingPostOffice(sc.getDataSource(), sc.getTransactionManager(),
                                  sc.getPostOfficeSQLProperties(),
-                                 true, 1, "NonClustered", ms, pm, tr, ff, cf, idm, cn);
+                                 true, 1, "NonClustered", ms, pm, tr, ff, cf, idm, cn, 25, 1000, false);
 
       postOffice.start();
 

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -35,7 +35,6 @@
 
 import org.jboss.jms.client.JBossConnectionFactory;
 import org.jboss.jms.destination.JBossQueue;
-import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Delivery;
 import org.jboss.messaging.core.contract.DeliveryObserver;
 import org.jboss.messaging.core.contract.Distributor;
@@ -154,6 +153,12 @@
       return info.checkMessageSucked(messages);
    }
 
+   public void cleanDirtyMessages(int node) throws JMSException
+   {
+      FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
+      info.cleanMessages();
+   }   
+
    public String checkMessageNotSucked(int node) throws JMSException
    {
       FakeConnectionInfo info = (FakeConnectionInfo)connections.get(new Integer(node));
@@ -521,6 +526,11 @@
          return result;
       }
 
+      public void cleanMessages()
+      {
+         suckBuffer.clear();
+      }
+
       public String checkMessageNotSucked() throws JMSException
       {
          String result = null;

Modified: branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java
===================================================================
--- branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java	2011-01-06 18:01:12 UTC (rev 8165)
+++ branches/JBossMessaging_1_4_0_SP3_CP10_JBMESSAGING-1805_JBMESSAGING-1809_JBMESSAGING-1819_JBMESSAGING-1822_JBMESSAGING-1835_JBMESSAGING-1837_JBMESSAGING-1838/tests/src/org/jboss/test/messaging/jms/clustering/MessageSuckerTest.java	2011-01-06 18:03:00 UTC (rev 8166)
@@ -106,6 +106,8 @@
 
          // Now kill Node 0
          ServerManagement.stop(0);
+         
+         clusterConnMgr.cleanDirtyMessages(0);
 
          // Sucker connection should receive notification
          clusterConnMgr.checkConnectionFailureDetected(0);
@@ -232,6 +234,8 @@
 
          // Now kill Node 0
          ServerManagement.stop(0);
+         
+         clusterConnMgr.cleanDirtyMessages(0);
 
          // Sucker connection should receive notification
          clusterConnMgr.checkConnectionFailureDetected(0);



More information about the jboss-cvs-commits mailing list