[jboss-cvs] JBoss Messaging SVN: r8114 - in branches/Branch_1_4: integration/EAP4/etc/server/default/deploy and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 1 08:42:23 EDT 2010
Author: gaohoward
Date: 2010-11-01 08:42:21 -0400 (Mon, 01 Nov 2010)
New Revision: 8114
Modified:
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/db2-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/ndb-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_1_4/integration/AS5/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
branches/Branch_1_4/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Log:
JBMESSAGING-1822
fix and test
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT (CAST(? AS DECIMAL(19,0))), (CAST(? AS CHAR)), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID DECIMAL(19, 0) NOT NULL, START_TIME DECIMAL(19, 0), PRIMARY KEY (TRANSACTION_ID))
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -85,6 +85,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID))
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -85,6 +85,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID)) ENGINE = INNODB
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -85,6 +85,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID)) ENGINE = NDBCLUSTER
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID INTEGER, START_TIME INTEGER, PRIMARY KEY (TRANSACTION_ID))
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -85,6 +85,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID BIGINT, START_TIME BIGINT, PRIMARY KEY (TRANSACTION_ID))
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/AS5/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/AS5/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -67,7 +67,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -90,6 +90,13 @@
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
+ CREATE_TRANSACTION_EXTRA=CREATE TABLE JBM_TX_EX (TRANSACTION_ID DECIMAL(19, 0) NOT NULL, START_TIME DECIMAL(19, 0), PRIMARY KEY (TRANSACTION_ID))
+ SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
+ INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
+ DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT (CAST(? AS DECIMAL(19,0))), (CAST(? AS CHAR)), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -67,7 +67,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -94,6 +94,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT (CAST(? AS DECIMAL(19,0))), (CAST(? AS CHAR)), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))), (CAST(? AS DECIMAL(19,0))) FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -66,7 +66,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -93,6 +93,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -62,7 +62,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -89,6 +89,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml 2010-11-01 12:42:21 UTC (rev 8114)
@@ -67,7 +67,7 @@
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
- MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ?, STATE='C' WHERE MESSAGE_ID = ? AND CHANNEL_ID = ? AND STATE='S'
LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)
@@ -94,6 +94,9 @@
SELECT_TRANSACTION_START_TIME_EXTRA=SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX
INSERT_TRANSACTION_EXTRA=INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?, ?)
DELETE_TRANSACTION_EXTRA=DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?
+ UPDATE_MESSAGE_STATE=UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?
+ CLAIM_MESSAGE_IN_SUCK=UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'
+ LOAD_REFS_IN_SUCK=SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -485,7 +485,14 @@
}
try
{
- conn.close();
+ if (conn instanceof ServerConnectionEndpoint)
+ {
+ ((ServerConnectionEndpoint)conn).close(true);
+ }
+ else
+ {
+ conn.close();
+ }
}
catch (Throwable ignore)
{
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -361,9 +361,14 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " stop");
}
}
-
+
public void close() throws JMSException
{
+ close(false);
+ }
+
+ public void close(boolean isFromFailure) throws JMSException
+ {
try
{
//reason for synchronization
@@ -392,7 +397,7 @@
{
ServerSessionEndpoint sess = (ServerSessionEndpoint)i.next();
- sess.localClose();
+ sess.localClose(isFromFailure);
}
sessions.clear();
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -21,6 +21,8 @@
*/
package org.jboss.jms.server.endpoint;
+import java.util.List;
+
import javax.jms.IllegalStateException;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
@@ -36,10 +38,12 @@
import org.jboss.jms.wireformat.Dispatcher;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
+import org.jboss.messaging.core.contract.Channel;
import org.jboss.messaging.core.contract.Delivery;
import org.jboss.messaging.core.contract.DeliveryObserver;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
+import org.jboss.messaging.core.contract.PersistenceManager;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.contract.Receiver;
@@ -54,6 +58,7 @@
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision$</tt> $Id$
*/
public class ServerConsumerEndpoint implements Receiver, ConsumerEndpoint
@@ -323,6 +328,26 @@
firstTime = false;
}
+ //now for a remote sucker, we need to update the messages status
+ if (remote)
+ {
+ PersistenceManager pm = sessionEndpoint.getPersistenceManager();
+ if (ref.getMessage().isReliable() && messageQueue.isRecoverable())
+ {
+ try
+ {
+ pm.updateMessageState(messageQueue.getChannelID(), ref, "S");
+ }
+ catch (Exception e)
+ {
+ //we need to stop the sucking process. the message should be re-delivered.
+ log.error("Failed to update state for message: " + ref, e);
+ return null;
+ }
+ }
+ delivery.setSucked(true);
+ }
+
try
{
sessionEndpoint.handleDelivery(delivery, this);
@@ -653,6 +678,16 @@
sessionEndpoint.promptDelivery(messageQueue);
}
+ public long getChannelID()
+ {
+ return messageQueue.getChannelID();
+ }
+
+ public Channel getChannel()
+ {
+ return messageQueue;
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
@@ -74,11 +75,13 @@
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.MessageStore;
import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PersistenceManager.ReferenceInfo;
import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.contract.Queue;
import org.jboss.messaging.core.contract.Replicator;
import org.jboss.messaging.core.impl.IDManager;
import org.jboss.messaging.core.impl.MessagingQueue;
+import org.jboss.messaging.core.impl.SimpleDelivery;
import org.jboss.messaging.core.impl.tx.Transaction;
import org.jboss.messaging.core.impl.tx.TransactionException;
import org.jboss.messaging.core.impl.tx.TransactionRepository;
@@ -187,7 +190,11 @@
private long lastSequence = -1;
+ private Map<Long, Long> failureCanceledDels;
+ private AtomicBoolean isSuckerSession = new AtomicBoolean(false);
+
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -236,6 +243,8 @@
defaultRedeliveryDelay = sp.getDefaultRedeliveryDelay();
deliveries = new ConcurrentHashMap();
+
+ failureCanceledDels = new HashMap<Long, Long>();
}
// SessionDelegate implementation ---------------------------------------------------------------
@@ -1133,9 +1142,14 @@
}
}
}
-
+
void localClose() throws Throwable
{
+ localClose(false);
+ }
+
+ void localClose(boolean isFromFailure) throws Throwable
+ {
if (closed)
{
@@ -1154,9 +1168,13 @@
consumersClone = new HashMap(consumers);
}
+ List<Channel> curChannels = new ArrayList<Channel>();
+
for( Iterator i = consumersClone.values().iterator(); i.hasNext(); )
{
- ((ServerConsumerEndpoint)i.next()).localClose();
+ ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
+ curChannels.add(consumer.getChannel());
+ consumer.localClose();
}
consumers.clear();
@@ -1213,6 +1231,27 @@
DeliveryRecord rec = (DeliveryRecord)entry.getValue();
+ //for a suck delivery, we need to update the state back to 'C'
+ if (rec.del.isSucked())
+ {
+ //we need to reverse the message (if still there). If reverse failed, we don't do
+ //cancel.
+ if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+ {
+ try
+ {
+ //now ask pm to do it.
+ pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+ }
+ catch (Exception e)
+ {
+ //if update failed, it must be a DB failure, we log the error and let others be canceled
+ log.error("Failed to update message " + rec.del.getReference() + " to state C", e);
+ continue;
+ }
+ }
+ }
+
/*
* https://jira.jboss.org/jira/browse/JBMESSAGING-1440
*/
@@ -1222,7 +1261,53 @@
}
channels.add(rec.del.getObserver());
+
+ if (isFromFailure)
+ {
+ failureCanceledDels.put(rec.deliveryID, rec.deliveryID);
+ }
}
+
+ if (isSuckerSession.get())
+ {
+ //here we handle rare cases where a sucker acked a message but then crashed.
+ //so the message won't be updated to target channel and also the session already
+ //forgets it. We take this chance here
+ //to load those messages into channels and redeliver
+ for (Channel ch : curChannels)
+ {
+ List<ReferenceInfo> refs = pm.claimMessagesInSuck(ch.getChannelID());
+
+ if (refs.size() > 0)
+ {
+ List<Long> mids = new ArrayList<Long>();
+ Map<Long, ReferenceInfo> refInfoMap = new HashMap<Long, ReferenceInfo>();
+
+ for (ReferenceInfo refInfo : refs)
+ {
+ mids.add(refInfo.getMessageId());
+ refInfoMap.put(refInfo.getMessageId(), refInfo);
+ }
+
+ List messages = pm.getMessages(mids);
+
+ iter = messages.iterator();
+
+ while (iter.hasNext())
+ {
+ Message m = (Message)iter.next();
+ MessageReference mref = ms.reference(m);
+
+ ReferenceInfo mInfo = refInfoMap.get(m.getMessageID());
+ mref.setDeliveryCount(mInfo.getDeliveryCount());
+ mref.setScheduledDeliveryTime(mInfo.getScheduledDelivery());
+
+ Delivery del = new SimpleDelivery(ch, mref, true, true);
+ del.cancel();
+ }
+ }
+ }
+ }
promptDelivery(channels);
@@ -1600,19 +1685,40 @@
synchronized (deliveries)
{
rec = (DeliveryRecord)deliveries.remove(new Long(cancel.getDeliveryId()));
+ if (rec == null)
+ {
+ //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
+ //and has failed over since recoverDeliveries won't have been called
+ if (trace)
+ {
+ log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
+ }
+ return null;
+ }
+ //now we check for suckers
+ if (rec.del.isSucked())
+ {
+ //we need to reverse the message (if still there). If reverse failed, we don't do
+ //cancel.
+ if (rec.del.getReference().getMessage().isReliable() && rec.getConsumer().getChannel().isRecoverable())
+ {
+ try
+ {
+ //now ask pm to do it.
+ pm.updateMessageState(rec.getConsumer().getChannelID(), rec.del.getReference(), "C");
+ }
+ catch (Exception e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to update message " + rec.del.getReference() + " to state C");
+ }
+ return null;
+ }
+ }
+ }
}
- if (rec == null)
- {
- //The delivery might not be found, if the session is not replicated (i.e. auto_ack or dups_ok)
- //and has failed over since recoverDeliveries won't have been called
- if (trace)
- {
- log.trace("Cannot find delivery to cancel, session probably failed over and is not replicated");
- }
- return null;
- }
-
//Note we check the flag *and* evaluate again, this is because the server and client clocks may
//be out of synch and don't want to send back to the client a message it thought it has sent to
//the expiry queue
@@ -1762,15 +1868,29 @@
synchronized (deliveries)
{
rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
+ if (rec == null)
+ {
+ //This can happen in one of the two cases:
+ //
+ //1. If an ack comes in after failover, or
+ //2. The session is closed due to server side connection failure notification processing.
+ //When a connection failure is detected at the server end, it will close all related server side
+ //sessions. As part of closing, any un-acked message will be canceled.
+ //if a normal client side ack comes in just after the session is thus being closed, and this ack
+ //has just been canceled, the client side ack will end up here.
+ //
+ //We treat the cases differently. For case 1, we can safely ignore it.
+ //For case 2, we must throw an exception to indicating that the ack failed and the message will be re-delivered.
+ if (failureCanceledDels.remove(ack.getDeliveryID()) != null)
+ {
+ //ack should fail
+ throw new JMSException("Message already canceled before this ack " + ack + " and the message will be redelivered.");
+ }
+ log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
+ return false;
+ }
}
- if (rec == null)
- {
- //This can happen if an ack comes in after failover
- log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
- return false;
- }
-
ServerConsumerEndpoint consumer = rec.getConsumer();
if (consumer != null && consumer.isRemote())
@@ -1856,6 +1976,8 @@
}
log.trace(this + " created and registered " + ep);
+
+ isSuckerSession.set(true);
return stub;
}
@@ -2409,4 +2531,9 @@
}
}
+ public PersistenceManager getPersistenceManager()
+ {
+ return pm;
+ }
+
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Delivery.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -53,4 +53,8 @@
* Mark if this delivery is with a prepared XA transaction.
*/
boolean isXAPrepared();
+
+ boolean isSucked();
+
+ void setSucked(boolean isSucked);
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -99,6 +99,12 @@
//if supports transaction creation time
boolean supportsTxAge();
+ //update the status of the message
+ void updateMessageState(long channelID, MessageReference ref, String state) throws Exception;
+
+ //update messages state to 'C' of the channel whose state is 'S', and return their messages ids
+ List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception;
+
// Interface value classes ----------------------------------------------------------------------
class MessageChannelPair
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -36,6 +36,7 @@
import org.jboss.messaging.util.StreamUtils;
import org.jboss.messaging.util.Util;
+import javax.jms.JMSException;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import javax.transaction.xa.Xid;
@@ -52,6 +53,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:hgao at jboss.org">Howard Gao</a>
*
* @version <tt>1.1</tt>
*
@@ -1636,6 +1638,12 @@
{
log.trace("Updated " + rows + " rows");
}
+
+ if (rows == 0)
+ {
+ //no message updated, should be canceled back already
+ throw new JMSException("Failed to move message " + ref.getMessage().getMessageID());
+ }
return null;
}
@@ -2946,6 +2954,11 @@
map.put("SELECT_TRANSACTION_START_TIME_EXTRA", "SELECT TRANSACTION_ID, START_TIME FROM JBM_TX_EX");
map.put("INSERT_TRANSACTION_EXTRA", "INSERT INTO JBM_TX_EX (TRANSACTION_ID, START_TIME) VALUES(?,?)");
map.put("DELETE_TRANSACTION_EXTRA", "DELETE FROM JBM_TX_EX WHERE TRANSACTION_ID=?");
+
+ //sucker use
+ map.put("UPDATE_MESSAGE_STATE", "UPDATE JBM_MSG_REF SET STATE = ? WHERE MESSAGE_ID = ? AND CHANNEL_ID = ?");
+ map.put("CLAIM_MESSAGE_IN_SUCK", "UPDATE JBM_MSG_REF SET STATE='C' WHERE CHANNEL_ID = ? AND STATE='S'");
+ map.put("LOAD_REFS_IN_SUCK", "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'S' AND CHANNEL_ID = ? ORDER BY ORD");
return map;
}
@@ -3308,4 +3321,96 @@
return supportsTxAge;
}
+ public void updateMessageState(final long channelID, final MessageReference ref, final String c) throws Exception
+ {
+ class UpdateMessageStateRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement psReference = null;
+
+ try
+ {
+ psReference = conn
+ .prepareStatement(getSQLStatement("UPDATE_MESSAGE_STATE"));
+
+ psReference.setString(1, c);
+
+ psReference.setLong(2, ref.getMessage().getMessageID());
+
+ psReference.setLong(3, channelID);
+
+ int rows = psReference.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Updated " + rows + " rows");
+ }
+
+ if (rows != 1)
+ {
+ throw new JMSException("Failed to update message " + ref.getMessage().getMessageID() + " to state " + c);
+ }
+
+ return null;
+ }
+ finally
+ {
+ closeStatement(psReference);
+ }
+ }
+ }
+
+ new UpdateMessageStateRunner().executeWithRetry();
+ }
+
+ public List<ReferenceInfo> claimMessagesInSuck(final long channelID) throws Exception
+ {
+ final List<ReferenceInfo> msgIDs = new ArrayList<ReferenceInfo>();
+
+ class MessageClaimRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ PreparedStatement ps2 = null;
+ ResultSet rs = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS_IN_SUCK"));
+ ps.setLong(1, channelID);
+ rs = ps.executeQuery();
+
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+ int deliveryCount = rs.getInt(2);
+ long sched = rs.getLong(3);
+
+ msgIDs.add(new ReferenceInfo(msgId, deliveryCount, sched));
+ }
+
+ ps2 = conn.prepareStatement(getSQLStatement("CLAIM_MESSAGE_IN_SUCK"));
+ ps2.setLong(1, channelID);
+ int rows = ps2.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Message in suck claimed " + rows + " rows");
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ return msgIDs;
+ }
+ }
+ new MessageClaimRunner().executeWithRetry();
+ return msgIDs;
+ }
+
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -231,6 +231,8 @@
synchronized (lock)
{
flushDownCache();
+
+ pm.claimMessagesInSuck(theChannelID);
PersistenceManager.InitialLoadInfo ili =
pm.mergeAndLoad(theChannelID, this.channelID, fullSize - messageRefs.size(),
@@ -502,6 +504,11 @@
return recoverDeliveriesTimeout;
}
+ public void load() throws Exception
+ {
+ super.load(clustered);
+ }
+
//testing only
public Map getRecoveryArea()
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -228,6 +228,15 @@
{
}
+ public void updateMessageState(long channelID, MessageReference ref, String state) throws Exception
+ {
+ }
+
+ public List<ReferenceInfo> claimMessagesInSuck(long channelID) throws Exception
+ {
+ return Collections.emptyList();
+ }
+
}
class IDCounter
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/PagingChannelSupport.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -194,7 +194,7 @@
}
}
- public void load() throws Exception
+ public void load(boolean isClustered) throws Exception
{
synchronized (lock)
{
@@ -207,6 +207,12 @@
unload();
+ if (isClustered)
+ {
+ //claim possible sucked messages
+ // pm.claimMessagesInSuck(channelID);
+ }
+
//Load the unpaged references
InitialLoadInfo ili = pm.loadFromStart(channelID, fullSize);
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/SimpleDelivery.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -51,6 +51,7 @@
private MessageReference reference;
private boolean recovered;
private Transaction tx;
+ private boolean isSucked;
private boolean trace = log.isTraceEnabled();
@@ -136,6 +137,16 @@
return false;
}
+ public void setSucked(boolean sucked)
+ {
+ isSucked = sucked;
+ }
+
+ public boolean isSucked()
+ {
+ return isSucked;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -27,7 +27,6 @@
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
-import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.jboss.jms.client.JBossSession;
@@ -217,6 +216,8 @@
consumer = null;
clientConsumer = null;
+
+ if (trace) { log.trace(this + " suspended"); }
}
@@ -310,8 +311,6 @@
public void onMessage(Message msg)
{
- Transaction tx = null;
-
try
{
if (trace) { log.trace(this + " sucked message " + msg + " JMSDestination - " + msg.getJMSDestination()); }
@@ -358,15 +357,6 @@
catch (Exception e)
{
log.error("Failed to forward message", e);
-
- try
- {
- if (tx != null) tm.rollback();
- }
- catch (Throwable t)
- {
- if (trace) { log.trace("Failed to rollback tx", t); }
- }
}
}
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2010-11-01 11:57:14 UTC (rev 8113)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2010-11-01 12:42:21 UTC (rev 8114)
@@ -22,17 +22,23 @@
package org.jboss.test.messaging.jms.clustering;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.test.messaging.tools.ServerManagement;
/**
@@ -382,6 +388,133 @@
}
}
+ //https://jira.jboss.org/browse/JBMESSAGING-1822
+ //send 1000 messages to node 0, receive on node 1 as such
+ // 1. when receiving, kill node1
+ // 2. restart node1 and continue receive
+ // 3. repeat step 1 and 2 for 5 times
+ // check all messages are received.
+ public void testSuckFailureHandling() throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+
+ Map<String, TextMessage> msgs = new ConcurrentHashMap<String, TextMessage>();
+ JBossConnectionFactory cf0;
+
+ try
+ {
+ //we need to kill node 2, only leave two nodes.
+ //otherwise messages may be merged to node2 and never sucked again.
+ ServerManagement.kill(2);
+
+ cf0 = (JBossConnectionFactory)ic[0].lookup("/ConnectionFactory");
+
+ conn0 = cf0.createConnection();
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ final int NUM_MESSAGES = 1000;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("suckmsg-" + i);
+ prod0.send(tm);
+ }
+
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+ startReceive(msgs);
+
+ JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+ conn1 = cf1.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ conn1.start();
+
+ TextMessage rm = (TextMessage)cons1.receive(5000);
+
+ log.info("message num sucked: " + msgs.size());
+ while (rm != null)
+ {
+ rm.acknowledge();
+ log.info("message sucked: " + rm + " text=" + rm.getText());
+ msgs.put(rm.getText(), rm);
+ rm = (TextMessage)cons1.receive(5000);
+ }
+
+ log.info("all received: " + msgs.size());
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ if (msgs.get("suckmsg-" + i) == null)
+ {
+ log.error("==== missing: " + "suckmsg-" + i);
+ }
+ }
+
+ assertEquals(NUM_MESSAGES, msgs.size());
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+ private void startReceive(final Map<String, TextMessage> msgs) throws Exception
+ {
+ JBossConnectionFactory cf1 = (JBossConnectionFactory)ic[1].lookup("/ConnectionFactory");
+ Connection conn1 = cf1.createConnection();
+ Session sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+ cons1.setMessageListener(new MessageListener() {
+ public void onMessage(Message m)
+ {
+ try
+ {
+ log.error("receiving " + m);
+ TextMessage tm = (TextMessage)m;
+ log.info("message sucked: " + tm + " text: " + tm);
+ msgs.put(tm.getText(), tm);
+ m.acknowledge();
+ }
+ catch (JMSException e)
+ {
+ log.error("failed to ack " + m, e);
+ }
+ }
+ });
+ conn1.start();
+
+ try
+ {
+ Thread.sleep(800);
+ }
+ catch (InterruptedException e)
+ {
+ //ignore
+ }
+
+ //kill server 1
+ ServerManagement.kill(1);
+
+ ServerManagement.start(1, "all+http", false);
+ ServerManagement.deployQueue("testDistributedQueue", 1);
+ ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+ queue[1] = (Queue)ic[1].lookup("queue/testDistributedQueue");
+
+ conn1.close();
+ }
+
// Package private ---------------------------------------------
// protected ----------------------------------------------------
More information about the jboss-cvs-commits
mailing list