[jboss-cvs] JBoss Messaging SVN: r1372 - in trunk: src/etc/server/default/deploy src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/message src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core/message tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/jms/persistence tests/src/org/jboss/test/messaging/tools/jmx/rmi tests/src/org/jboss/test/messaging/util
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Sep 28 07:44:45 EDT 2006
Author: timfox
Date: 2006-09-28 07:44:25 -0400 (Thu, 28 Sep 2006)
New Revision: 1372
Modified:
trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java
trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
trunk/src/main/org/jboss/jms/message/JBossMessage.java
trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java
trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
trunk/src/main/org/jboss/jms/message/JBossTextMessage.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/messaging/core/Message.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java
trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java
trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java
trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java
trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
Log:
Fixed some issues with reference counting
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml 2006-09-28 11:44:25 UTC (rev 1372)
@@ -38,12 +38,14 @@
LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
-SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
+SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
+INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
+DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
MESSAGEID_COLUMN=MESSAGEID
+MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml 2006-09-28 11:44:25 UTC (rev 1372)
@@ -10,55 +10,55 @@
<mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
name="jboss.messaging:service=PersistenceManager"
- xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
+ xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
- <attribute name="UsingBatchUpdates">true</attribute>
+ <attribute name="UsingBatchUpdates">true</attribute>
<attribute name="MaxParams">500</attribute>
</mbean>
-
- <!-- Note that Hypersonic CANNOT be used for clustered post offices -->
-
+
+ <!-- Note that Hypersonic CANNOT be used for clustered post offices -->
+
<mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
name="jboss.messaging:service=QueuePostOffice"
- xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
- <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
+ xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
+ <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="PostOfficeName">Queue</attribute>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="CreateTablesOnStartup">true</attribute>
</mbean>
-
+
<mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
name="jboss.messaging:service=TopicPostOffice"
- xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
+ xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
<depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="PostOfficeName">Topic</attribute>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
- </mbean>
-
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ </mbean>
+
<mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
name="jboss.messaging:service=JMSUserManager"
xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
- </mbean>
-
- <mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ </mbean>
+
+ <mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
name="jboss.messaging:service=ShutdownLogger"
xmbean-dd="xmdesc/JDBCShutdownLogger-xmbean.xml">
<depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
<depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
<attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
- </mbean>
-
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ </mbean>
+
</server>
\ No newline at end of file
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2006-09-28 11:44:25 UTC (rev 1372)
@@ -9,50 +9,52 @@
<server>
<mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
- name="jboss.messaging:service=PersistenceManager"
- xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
- <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
- <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
- <attribute name="DataSource">java:/DefaultDS</attribute>
- <attribute name="CreateTablesOnStartup">true</attribute>
- <attribute name="UsingBatchUpdates">true</attribute>
- <attribute name="SqlProperties"><![CDATA[
-CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
-CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)
-CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
-CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
-CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
-CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
-CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
-CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
-CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
-INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
-DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
-COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
-COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
-ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
-ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
-LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
-LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
-UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
-SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
-SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
-LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
-INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
-DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
-MESSAGEID_COLUMN=MESSAGEID
-INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
-DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
-SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
-UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
-SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
-INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
-SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
- ]]></attribute>
- <attribute name="MaxParams">500</attribute>
+ name="jboss.messaging:service=PersistenceManager"
+ xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
+ <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+ <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+ <attribute name="DataSource">java:/DefaultDS</attribute>
+ <attribute name="CreateTablesOnStartup">true</attribute>
+ <attribute name="UsingBatchUpdates">true</attribute>
+ <attribute name="SqlProperties"><![CDATA[
+ CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
+ CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)
+ CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
+ CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
+ CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
+ CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
+ CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
+ CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+ CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+ INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
+ UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
+ UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
+ COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
+ COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
+ ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
+ ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
+ LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+ LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
+ UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
+ SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
+ SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
+ LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
+ INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
+ DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
+ DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
+ MESSAGEID_COLUMN=MESSAGEID
+ MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
+ INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
+ DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
+ SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
+ UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
+ SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
+ INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
+ SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
+ ]]></attribute>
+ <attribute name="MaxParams">500</attribute>
</mbean>
<mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
@@ -65,10 +67,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
</mbean>
@@ -82,10 +84,10 @@
<attribute name="DataSource">java:/DefaultDS</attribute>
<attribute name="CreateTablesOnStartup">true</attribute>
<attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME = ?
]]></attribute>
</mbean>
Modified: trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -98,7 +98,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -107,7 +106,7 @@
HashMap jmsProperties)
{
super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
- persistentChannelCount,jmsType, correlationID, correlationIDBytes, destination, replyTo,
+ jmsType, correlationID, correlationIDBytes, destination, replyTo,
jmsProperties);
baos = new ByteArrayOutputStream();
Modified: trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMapMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossMapMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -88,7 +88,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -97,7 +96,6 @@
HashMap jmsProperties)
{
super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
- persistentChannelCount,
jmsType, correlationID, correlationIDBytes, destination, replyTo,
jmsProperties);
}
Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -243,12 +243,9 @@
public JBossMessage(long messageID)
{
this(messageID, true, 0, System.currentTimeMillis(), (byte)4,
- null, null, 0, null, null, null, null, null, null);
+ null, null, null, null, null, null, null, null);
}
- /*
- * This constructor is used to construct messages when retrieved from persistence storage
- */
public JBossMessage(long messageID,
boolean reliable,
long expiration,
@@ -256,7 +253,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -271,8 +267,7 @@
priority,
0,
coreHeaders,
- payloadAsByteArray,
- persistentChannelCount);
+ payloadAsByteArray);
this.jmsType = jmsType;
this.correlationID = correlationID;
Modified: trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -77,7 +77,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -86,7 +85,6 @@
HashMap jmsProperties)
{
super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
- persistentChannelCount,
jmsType, correlationID, correlationIDBytes, destination, replyTo,
jmsProperties);
}
Modified: trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -98,7 +98,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -107,7 +106,6 @@
HashMap jmsProperties)
{
super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
- persistentChannelCount,
jmsType, correlationID, correlationIDBytes, destination, replyTo,
jmsProperties);
}
Modified: trunk/src/main/org/jboss/jms/message/JBossTextMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossTextMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossTextMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -83,7 +83,6 @@
byte priority,
Map coreHeaders,
byte[] payloadAsByteArray,
- int persistentChannelCount,
String jmsType,
String correlationID,
byte[] correlationIDBytes,
@@ -92,7 +91,6 @@
HashMap jmsProperties)
{
super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
- persistentChannelCount,
jmsType, correlationID, correlationIDBytes, destination, replyTo,
jmsProperties);
}
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -212,9 +212,9 @@
//MBean dependencies
// Create the wired components
- messageIdManager = new IdManager("MESSAGE_ID", 8192, persistenceManager);
+ messageIdManager = new IdManager("MESSAGE_ID", 4096, persistenceManager);
channelIdManager = new IdManager("CHANNEL_ID", 10, persistenceManager);
- transactionIdManager = new IdManager("TRANSACTION_ID", 4096, persistenceManager);
+ transactionIdManager = new IdManager("TRANSACTION_ID", 1024, persistenceManager);
destinationJNDIMapper = new DestinationJNDIMapper(this);
connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
connectionManager = new SimpleConnectionManager();
Modified: trunk/src/main/org/jboss/messaging/core/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Message.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/Message.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -41,13 +41,11 @@
Serializable getPayload();
byte[] getPayloadAsByteArray();
+
+ boolean isPersisted();
- void incPersistentChannelCount();
+ void setPersisted(boolean persisted);
- void decPersistentChannelCount();
-
- int getPersistentChannelCount();
-
byte getType();
}
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -340,7 +340,7 @@
{
// Now we remove the references we loaded (only the non persistent or persistent in a non-recoverable store)
- pm.removeReferences(channelID, toRemove);
+ pm.removeDepagedReferences(channelID, toRemove);
}
if (loadedReliable)
@@ -551,7 +551,7 @@
if (!toAdd.isEmpty())
{
- pm.addReferences(channelID, toAdd, true);
+ pm.pageReferences(channelID, toAdd, true);
}
if (!toUpdate.isEmpty())
{
Modified: trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -53,10 +53,9 @@
long timestamp,
byte priority,
Map headers,
- byte[] payload,
- int persistentChannelCount)
+ byte[] payload)
{
- super(messageID, reliable, expiration, timestamp, priority, 0, headers, payload, persistentChannelCount);
+ super(messageID, reliable, expiration, timestamp, priority, 0, headers, payload);
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -82,74 +82,85 @@
return m;
}
-
- public static Message createJBossMessage(long messageID,
- boolean reliable,
- long expiration,
- long timestamp,
- byte priority,
- Map coreHeaders,
- byte[] payloadAsByteArray,
- int persistentChannelCount,
- byte type,
- String jmsType,
- String correlationID,
- byte[] correlationIDBytes,
- JBossDestination destination,
- JBossDestination replyTo,
- HashMap jmsProperties)
+ /*
+ * Create a message from persistent storage
+ */
+ public static Message createMessage(long messageID,
+ boolean reliable,
+ long expiration,
+ long timestamp,
+ byte priority,
+ Map coreHeaders,
+ byte[] payloadAsByteArray,
+ byte type,
+ String jmsType,
+ String correlationID,
+ byte[] correlationIDBytes,
+ JBossDestination destination,
+ JBossDestination replyTo,
+ HashMap jmsProperties)
{
-
Message m = null;
- if (type == JBossMessage.TYPE)
+ switch (type)
{
- m = new JBossMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
+ case JBossMessage.TYPE:
+ {
+ m = new JBossMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case JBossObjectMessage.TYPE:
+ {
+ m = new JBossObjectMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case JBossTextMessage.TYPE:
+ {
+ m = new JBossTextMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case JBossBytesMessage.TYPE:
+ {
+ m = new JBossBytesMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case JBossMapMessage.TYPE:
+ {
+ m = new JBossMapMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case JBossStreamMessage.TYPE:
+ {
+ m = new JBossStreamMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+ payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+ destination, replyTo, jmsProperties);
+ break;
+ }
+ case CoreMessage.TYPE:
+ {
+ m = new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray);
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Unknown type " + type);
+ }
}
- else if (type == JBossObjectMessage.TYPE)
- {
- m = new JBossObjectMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
- }
- else if (type == JBossTextMessage.TYPE)
- {
- m = new JBossTextMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
- }
- else if (type == JBossBytesMessage.TYPE)
- {
- m = new JBossBytesMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
- }
- else if (type == JBossMapMessage.TYPE)
- {
- m = new JBossMapMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
- }
- else if (type == JBossStreamMessage.TYPE)
- {
- m = new JBossStreamMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
- payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
- destination, replyTo, jmsProperties);
- }
- else if (type == CoreMessage.TYPE)
- {
- m = new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray, persistentChannelCount);
- }
- else
- {
- throw new IllegalArgumentException("Unknow type " + type);
- }
-
+
+ m.setPersisted(true);
+
return m;
-
}
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -56,17 +56,8 @@
// Must be hidden from subclasses
private byte[] payloadAsByteArray;
- // private transient boolean inStorage;
+ private transient boolean persisted;
- /*
- * We maintain a persistent channel count on the message itself.
- * This is the total number of channels whether loaded in memory or not that hold a reference to the
- * message and is needed to know when it is safe to remove the message from the db
- * A channel may not have all it's references loaded into memory at once, also not all
- * channels might be active at once so we can't rely on the in memory channel count.
- */
- private transient int persistentChannelCount;
-
// Constructors --------------------------------------------------
/**
@@ -124,12 +115,10 @@
byte priority,
int deliveryCount,
Map headers,
- byte[] payloadAsByteArray,
- int persistentChannelCount)
+ byte[] payloadAsByteArray)
{
super(messageID, reliable, expiration, timestamp, priority, deliveryCount, headers);
this.payloadAsByteArray = payloadAsByteArray;
- this.persistentChannelCount = persistentChannelCount;
}
protected MessageSupport(MessageSupport that)
@@ -238,21 +227,17 @@
this.payloadAsByteArray = null;
}
- public synchronized void decPersistentChannelCount()
+ public synchronized boolean isPersisted()
{
- persistentChannelCount--;
+ return persisted;
}
- public synchronized void incPersistentChannelCount()
+ public synchronized void setPersisted(boolean persisted)
{
- persistentChannelCount++;
+ this.persisted = persisted;
}
- public synchronized int getPersistentChannelCount()
- {
- return persistentChannelCount;
- }
-
+
// Public --------------------------------------------------------
public boolean equals(Object o)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -161,10 +161,7 @@
{
//TODO This will need locking (e.g. SELECT ... FOR UPDATE...) in the clustered case
- if (trace)
- {
- log.trace("Getting id block for counter: " + counterName + " ,size: " + size);
- }
+ if (trace) { log.trace("Getting id block for counter: " + counterName + " ,size: " + size); }
if (size <= 0)
{
@@ -201,11 +198,8 @@
int rows = ps.executeUpdate();
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("INSERT_COUNTER"), counterName)
- + " inserted " + rows + " rows");
- }
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("INSERT_COUNTER"), counterName)
+ + " inserted " + rows + " rows"); }
ps.close();
ps = null;
@@ -213,10 +207,7 @@
return 0;
}
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("SELECT_COUNTER"), counterName));
- }
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("SELECT_COUNTER"), counterName)); }
long nextId = rs.getLong(1);
@@ -233,12 +224,8 @@
int rows = ps.executeUpdate();
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_COUNTER"), new Long(nextId + size),
- counterName)
- + " updated " + rows + " rows");
- }
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_COUNTER"), new Long(nextId + size),
+ counterName) + " updated " + rows + " rows"); }
return nextId;
}
@@ -278,10 +265,8 @@
public void updateReliableReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
{
- if (trace)
- {
- log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd);
- }
+ if (trace) { log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
+
Connection conn = null;
PreparedStatement ps = null;
TransactionWrapper wrap = new TransactionWrapper();
@@ -308,12 +293,8 @@
{
int rows = ps.executeUpdate();
- if (trace)
- {
- log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
- new Long(orderStart), new Long(orderEnd))
- + " updated " + rows + " rows");
- }
+ if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
+ new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
if (tries > 0)
{
log.warn("Update worked after retry");
@@ -330,14 +311,19 @@
catch (SQLException e)
{
log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+
tries++;
+
if (tries == MAX_TRIES)
{
log.error("Retried " + tries + " times, now giving up");
+
throw new IllegalStateException("Failed to update references");
}
+
log.warn("Trying again after a pause");
- //Now we wait for a random amount of time to minimise risk of deadlock
+
+ //Now we wait for a random amount of time to minimise risk of deadlock occurring again
Thread.sleep((long)(Math.random() * 500));
}
}
@@ -384,10 +370,7 @@
*/
public List getMessages(List messageIds) throws Exception
{
- if (trace)
- {
- log.trace("Getting batch of messages for " + messageIds);
- }
+ if (trace) { log.trace("Getting batch of messages for " + messageIds); }
Connection conn = null;
PreparedStatement ps = null;
@@ -460,6 +443,8 @@
byte[] bytes = getBytes(rs, 6);
HashMap coreHeaders = bytesToMap(bytes);
byte[] payload = getBytes(rs, 7);
+
+ //TODO we don't need this
int persistentChannelCount = rs.getInt(8);
//TODO - We are mixing concerns here
@@ -507,16 +492,16 @@
}
}
- m = MessageFactory.createJBossMessage(messageId, reliable, expiration, timestamp, priority,
- coreHeaders, payload, persistentChannelCount,
+ m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
+ coreHeaders, payload,
type, jmsType, correlationID, correlationIDBytes,
dest, replyToDest,
jmsProperties);
}
else
{
- m = MessageFactory.createJBossMessage(messageId, reliable, expiration, timestamp, priority,
- coreHeaders, payload, persistentChannelCount, type,
+ m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
+ coreHeaders, payload, type,
null, null, null, null, null, null);
}
@@ -532,10 +517,7 @@
}
}
- if (trace)
- {
- log.trace("Loaded " + msgs.size() + " messages in total");
- }
+ if (trace) { log.trace("Loaded " + msgs.size() + " messages in total"); }
return msgs;
}
@@ -580,19 +562,21 @@
}
}
- public void addReferences(long channelID, List references, boolean paged) throws Exception
+ public void pageReferences(long channelID, List references, boolean paged) throws Exception
{
Connection conn = null;
PreparedStatement psInsertReference = null;
PreparedStatement psInsertMessage = null;
PreparedStatement psUpdateMessage = null;
+ PreparedStatement psMessageExists = null;
+ ResultSet rsMessageExists = null;
TransactionWrapper wrap = new TransactionWrapper();
//First we order the references in message order
orderReferences(references);
- List addsToReverse = new ArrayList();
-
+ log.info("adding references: " + references.size());
+
try
{
//Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
@@ -609,7 +593,7 @@
{
psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -647,31 +631,57 @@
if (!usingBatchUpdates)
{
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
//Maybe we need to persist the message itself
Message m = ref.getMessage();
- m.incPersistentChannelCount();
+ //In a paging situation, we cannot use the persisted flag on the messager to determine whether
+ //to insert the message or not.
+ //This is because a channel (possibly on another node) may be paging too and referencing
+ //the same message, and might have removed the message independently, the other
+ //channel will not know about this.
+ //Therefore we have to check if the message is already in the database and insert it if it isn't
- addsToReverse.add(ref);
-
+ //TODO This is a bit of a hassle -
+ //A cleaner and better solution here is to completely separate out the paging functionality from the
+ //standard persistence functionality since it complicates things considerably.
+ //We should define a paging store which is separate from the persistence store, and
+ //typically not using the database for the paging store - probably use a file based store
+ //e.g HOWL or some other logger
+
+ //Note when running this with two or more competing channels in the same process, then
+ //we do not need a FOR UPDATE on the select since we lock the messages in memory
+ //However for competing nodes, we do, therefore we require a database that supports
+ //this, this is another reason why we cannot use HSQL in a clustered environment
+ //since it does not have a for update equivalent
+
+ psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
+
+ psMessageExists.setLong(1, m.getMessageID());
+
+ rsMessageExists = psMessageExists.executeQuery();
+
boolean added;
- if (m.getPersistentChannelCount() == 1)
+
+ if (rsMessageExists.next())
{
- //Hasn't been persisted before so need to persist the message
- storeMessage(m, psInsertMessage);
+ //Message exists
- added = true;
+ // Update the message with the new channel count
+ incrementChannelCount(m, psUpdateMessage);
+
+ added = false;
+
}
else
{
- //Update the message with the new channel count
- updateMessageChannelCount(m, psUpdateMessage);
+ //Hasn't been persisted before so need to persist the message
+ storeMessage(m, psInsertMessage);
- added = false;
- }
+ added = true;
+ }
if (usingBatchUpdates)
{
@@ -717,28 +727,19 @@
{
int[] rowsReference = psInsertReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
if (messageInsertsInBatch)
{
int[] rowsMessage = psInsertMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
}
if (messageUpdatesInBatch)
{
int[] rowsMessage = psUpdateMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
}
psInsertReference.close();
@@ -747,7 +748,7 @@
psInsertMessage = null;
psUpdateMessage.close();
psUpdateMessage = null;
- }
+ }
}
catch (Exception e)
{
@@ -791,7 +792,6 @@
try
{
conn.close();
- //dwtwsbtwotl
}
catch (Throwable t)
{
@@ -803,19 +803,13 @@
}
finally
{
- if (wrap.isFailed())
- {
- //Reverse the incs
- this.decPersistentCounts(addsToReverse);
- }
-
//And then release locks
this.releaseLocks(references);
}
}
}
- public void removeReferences(long channelID, List references) throws Exception
+ public void removeDepagedReferences(long channelID, List references) throws Exception
{
if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
@@ -827,9 +821,7 @@
//We order the references
orderReferences(references);
-
- List removesToReverse = new ArrayList();
-
+
try
{
//We get locks on all the messages - since they are ordered we avoid deadlock
@@ -839,14 +831,11 @@
Iterator iter = references.iterator();
- boolean messageDeletionsInBatch = false;
- boolean messageUpdatesInBatch = false;
-
if (usingBatchUpdates)
{
psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -868,10 +857,7 @@
{
int rows = psDeleteReference.executeUpdate();
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
psDeleteReference.close();
psDeleteReference = null;
@@ -880,66 +866,35 @@
if (!usingBatchUpdates)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
Message m = ref.getMessage();
//Maybe we need to delete the message itself
- m.decPersistentChannelCount();
+ //Update the message with the new channel count
+ decrementChannelCount(m, psUpdateMessage);
- removesToReverse.add(ref);
-
- boolean removed;
- if (m.getPersistentChannelCount() == 0)
+ //Run the remove message update
+ removeMessage(m, psDeleteMessage);
+
+ if (usingBatchUpdates)
{
- //No more refs so remove the message
- removeMessage(m, psDeleteMessage);
+ psUpdateMessage.addBatch();
- removed = true;
+ psDeleteMessage.addBatch();
}
else
{
- //Update the message with the new channel count
- updateMessageChannelCount(m, psUpdateMessage);
+ int rows = psUpdateMessage.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
- removed = false;
- }
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
- if (usingBatchUpdates)
- {
- if (removed)
- {
- psDeleteMessage.addBatch();
- messageDeletionsInBatch = true;
- }
- else
- {
- psUpdateMessage.addBatch();
- messageUpdatesInBatch = true;
- }
- }
- else
- {
- if (removed)
- {
- int rows = psDeleteMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
- }
- else
- {
- int rows = psUpdateMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
- }
psDeleteMessage.close();
psDeleteMessage = null;
psUpdateMessage.close();
@@ -951,30 +906,16 @@
{
int[] rowsReference = psDeleteReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
- if (messageDeletionsInBatch)
- {
- int[] rowsMessage = psDeleteMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsMessage, "deleted");
- }
- }
- if (messageUpdatesInBatch)
- {
- int[] rowsMessage = psUpdateMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
- }
- }
+ rowsReference = psUpdateMessage.executeBatch();
+ if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rowsReference, "updated"); }
+
+ rowsReference = psDeleteMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsReference, "deleted"); }
+
psDeleteReference.close();
psDeleteReference = null;
psDeleteMessage.close();
@@ -1036,12 +977,6 @@
}
finally
{
- if (wrap.isFailed())
- {
- //Reverse the decs
- this.incPersistentCounts(removesToReverse);
- }
-
//And then release locks
this.releaseLocks(references);
}
@@ -1050,10 +985,7 @@
public List getPagedReferenceInfos(long channelID, long orderStart, long number) throws Exception
{
- if (trace)
- {
- log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);
- }
+ if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number); }
List refs = new ArrayList();
@@ -1152,14 +1084,10 @@
*/
public InitialLoadInfo getInitialReferenceInfos(long channelID, int fullSize) throws Exception
{
- if (trace)
- {
- log.trace("loading initial reference infos for channel " + channelID);
- }
+ if (trace) { log.trace("loading initial reference infos for channel " + channelID); }
List refs = new ArrayList();
- List extraRefs = new ArrayList();
-
+
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
@@ -1272,10 +1200,7 @@
PreparedStatement psUpdateReference = null;
TransactionWrapper wrap = new TransactionWrapper();
- if (trace)
- {
- log.trace("Updating page order for channel:" + channelID);
- }
+ if (trace) { log.trace("Updating page order for channel:" + channelID); }
try
{
@@ -1311,10 +1236,7 @@
{
int rows = psUpdateReference.executeUpdate();
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
+ if (trace) { log.trace("Updated " + rows + " rows"); }
psUpdateReference.close();
psUpdateReference = null;
@@ -1325,10 +1247,7 @@
{
int[] rowsReference = psUpdateReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
psUpdateReference.close();
psUpdateReference = null;
@@ -1382,6 +1301,8 @@
{
//No tx so add the ref directly in the db
+ log.info("Adding ref: " + ref);
+
TransactionWrapper wrap = new TransactionWrapper();
PreparedStatement psReference = null;
@@ -1390,9 +1311,7 @@
Connection conn = ds.getConnection();
Message m = ref.getMessage();
-
- boolean incremented = false;
-
+
try
{
// Get lock on message
@@ -1405,34 +1324,35 @@
int rows = psReference.executeUpdate();
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
- m.incPersistentChannelCount();
- incremented = true;
-
- if (m.getPersistentChannelCount() == 1)
+ log.info("Inserted " + rows + " rows");
+
+ if (!m.isPersisted())
{
+ log.info(m + " Not persisted so inserting message");
// First time so persist the message
psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- storeMessage(m, psMessage);
+ storeMessage(m, psMessage);
+
+ m.setPersisted(true);
}
else
{
+ log.info("Already persisted so updating message");
+
//Update the message's channel count
- psMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
- updateMessageChannelCount(m, psMessage);
+ incrementChannelCount(m, psMessage);
}
rows = psMessage.executeUpdate();
- if (trace)
- {
- log.trace("Inserted/updated " + rows + " rows");
- }
+
+ if (trace) { log.trace("Inserted/updated " + rows + " rows"); }
+
+ log.trace("message Inserted/updated " + rows + " rows");
}
catch (Exception e)
{
@@ -1477,12 +1397,6 @@
}
finally
{
- if (wrap.isFailed() && incremented)
- {
- //reverse the inc
- m.decPersistentChannelCount();
- }
-
//Release Lock
LockMap.instance.releaseLock(m);
}
@@ -1507,14 +1421,13 @@
TransactionWrapper wrap = new TransactionWrapper();
PreparedStatement psReference = null;
+ PreparedStatement psUpdate = null;
PreparedStatement psMessage = null;
Connection conn = ds.getConnection();
Message m = ref.getMessage();
- boolean decremented = false;
-
try
{
//get lock on message
@@ -1522,6 +1435,8 @@
psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
+ log.info("Removing ref " + ref + " from channel " + channelID);
+
//Remove the message reference
removeReference(channelID, ref, psReference);
@@ -1532,40 +1447,27 @@
throw new IllegalStateException("Failed to remove row for: " + ref);
}
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
- m.decPersistentChannelCount();
- decremented = true;
-
- if (m.getPersistentChannelCount() == 0)
- {
- //No other channels have a reference so we can delete the message
- psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-
- removeMessage(m, psMessage);
- }
- else
- {
- //Other channel(s) still have hold references so update the channel count
- psMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
-
- updateMessageChannelCount(m, psMessage);
- }
-
- rows = psMessage.executeUpdate();
+ //Update the messages channel count
- if (rows != 1)
- {
- throw new IllegalStateException("Failed to update/delete message row for ref " + ref);
- }
+ psUpdate = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
- if (trace)
- {
- log.trace("Delete/updated " + rows + " rows");
- }
+ decrementChannelCount(m, psUpdate);
+
+ rows = psUpdate.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+
+ //Delete the message (if necessary)
+
+ psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+
+ removeMessage(m, psMessage);
+
+ rows = psMessage.executeUpdate();
+
+ if (trace) { log.trace("Delete " + rows + " rows"); }
}
catch (Exception e)
{
@@ -1584,6 +1486,16 @@
{
}
}
+ if (psUpdate != null)
+ {
+ try
+ {
+ psUpdate.close();
+ }
+ catch (Throwable t)
+ {
+ }
+ }
if (psMessage != null)
{
try
@@ -1610,11 +1522,6 @@
}
finally
{
- if (wrap.isFailed() && decremented)
- {
- //Reverse decrement
- m.incPersistentChannelCount();
- }
//release the lock
LockMap.instance.releaseLock(m);
}
@@ -1797,6 +1704,8 @@
protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
throws Exception
{
+ log.info("Before commit 1pc: I have " + refsToAdd.size() + " refs to add and " + refsToRemove.size() + " refs to remove");
+
//TODO - A slight optimisation - it's possible we have refs referring to the same message
//so we will end up acquiring the lock more than once which is unnecessary
//If find unique set of messages can avoid this
@@ -1825,13 +1734,11 @@
Connection conn = null;
PreparedStatement psReference = null;
PreparedStatement psInsertMessage = null;
- PreparedStatement psUpdateMessage = null;
+ PreparedStatement psIncMessage = null;
+ PreparedStatement psDecMessage = null;
PreparedStatement psDeleteMessage = null;
TransactionWrapper wrap = new TransactionWrapper();
- List addsToReverse = new ArrayList();
- List removesToReverse = new ArrayList();
-
try
{
conn = ds.getConnection();
@@ -1852,7 +1759,7 @@
{
psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psIncMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -1867,6 +1774,7 @@
}
//Now store the reference
+ log.info("Adding ref: " + ref + " in channel: " + pair.channelId);
addReference(pair.channelId, ref, psReference, false);
if (batch)
@@ -1877,10 +1785,7 @@
{
int rows = psReference.executeUpdate();
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
psReference.close();
psReference = null;
@@ -1891,25 +1796,26 @@
if (!batch)
{
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psIncMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
- m.incPersistentChannelCount();
- addsToReverse.add(ref);
-
boolean added;
- if (m.getPersistentChannelCount() == 1)
+ if (!m.isPersisted())
{
+ log.info("The message " + m.getMessageID() + " hasn't been persisted before so i'm gonna persist it");
//First time so add message
storeMessage(m, psInsertMessage);
-
+
added = true;
+
+ m.setPersisted(true);
}
else
{
+ log.info("The message " + m.getMessageID() + " has been peristed before so just updating it's channel count");
//Update message channel count
- updateMessageChannelCount(m, psUpdateMessage);
-
+ incrementChannelCount(m, psIncMessage);
+
added = false;
}
@@ -1918,11 +1824,13 @@
if (added)
{
psInsertMessage.addBatch();
+
messageInsertsInBatch = true;
}
else
- {
- psUpdateMessage.addBatch();
+ {
+ psIncMessage.addBatch();
+
messageUpdatesInBatch = true;
}
}
@@ -1932,24 +1840,18 @@
{
int rows = psInsertMessage.executeUpdate();
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
}
else
{
- int rows = psUpdateMessage.executeUpdate();
+ int rows = psIncMessage.executeUpdate();
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
+ if (trace) { log.trace("Updated " + rows + " rows"); }
}
psInsertMessage.close();
psInsertMessage = null;
- psUpdateMessage.close();
- psUpdateMessage = null;
+ psIncMessage.close();
+ psIncMessage = null;
}
}
@@ -1957,36 +1859,27 @@
{
int[] rowsReference = psReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
if (messageInsertsInBatch)
{
int[] rowsMessage = psInsertMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
}
if (messageUpdatesInBatch)
{
- int[] rowsMessage = psUpdateMessage.executeBatch();
+ int[] rowsMessage = psIncMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
}
psReference.close();
psReference = null;
psInsertMessage.close();
psInsertMessage = null;
- psUpdateMessage.close();
- psUpdateMessage = null;
+ psIncMessage.close();
+ psIncMessage = null;
}
//Now the removes
@@ -1994,17 +1887,15 @@
iter = refsToRemove.iterator();
batch = usingBatchUpdates && refsToRemove.size() > 0;
- boolean messageDeletionsInBatch = false;
- messageUpdatesInBatch = false;
+
psReference = null;
- psUpdateMessage = null;
psDeleteMessage = null;
if (batch)
{
psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psDecMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -2026,10 +1917,7 @@
{
int rows = psReference.executeUpdate();
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
psReference.close();
psReference = null;
@@ -2038,106 +1926,62 @@
if (!batch)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psDecMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
Message m = pair.ref.getMessage();
- m.decPersistentChannelCount();
+ //Update the channel count
- removesToReverse.add(pair.ref);
+ decrementChannelCount(m, psDecMessage);
- boolean removed;
+ //Delete the message (if necessary)
- if (m.getPersistentChannelCount() == 0)
+ removeMessage(m, psDeleteMessage);
+
+ if (batch)
{
- //No more refs - message can be deleted
- removeMessage(m, psDeleteMessage);
-
- removed = true;
- }
- else
- {
- //Update channel count for message
- updateMessageChannelCount(m, psUpdateMessage);
+ psDecMessage.addBatch();
- removed = false;
+ psDeleteMessage.addBatch();
}
-
- if (batch)
- {
- if (removed)
- {
- psDeleteMessage.addBatch();
- messageDeletionsInBatch = true;
- }
- else
- {
- psUpdateMessage.addBatch();
- messageUpdatesInBatch = true;
- }
- }
else
{
- if (removed)
- {
- int rows = psDeleteMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
- }
- else
- {
- int rows = psUpdateMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
- }
+ int rows = psDecMessage.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
+
psDeleteMessage.close();
psDeleteMessage = null;
- psUpdateMessage.close();
- psUpdateMessage = null;
+ psDecMessage.close();
+ psDecMessage = null;
}
}
if (batch)
{
- int[] rowsReference = psReference.executeBatch();
+ int[] rows = psReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
- if (messageDeletionsInBatch)
- {
- int[] rowsMessage = psDeleteMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsMessage, "deleted");
- }
- }
- if (messageUpdatesInBatch)
- {
- int[] rowsMessage = psUpdateMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
- }
- }
+ rows = psDecMessage.executeBatch();
+ if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+
+ rows = psDeleteMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+
psReference.close();
psReference = null;
psDeleteMessage.close();
psDeleteMessage = null;
- psUpdateMessage.close();
- psUpdateMessage = null;
+ psDecMessage.close();
+ psDecMessage = null;
}
}
catch (Exception e)
@@ -2168,16 +2012,26 @@
{
}
}
- if (psUpdateMessage != null)
+ if (psIncMessage != null)
{
try
{
- psUpdateMessage.close();
+ psIncMessage.close();
}
catch (Throwable t)
{
}
}
+ if (psDecMessage != null)
+ {
+ try
+ {
+ psDecMessage.close();
+ }
+ catch (Throwable t)
+ {
+ }
+ }
if (psDeleteMessage != null)
{
try
@@ -2203,14 +2057,7 @@
wrap.end();
}
finally
- {
- if (wrap.isFailed())
- {
- //Reverse any incs and decs we made
- this.decPersistentCounts(addsToReverse);
- this.incPersistentCounts(removesToReverse);
- }
-
+ {
//Release the locks
this.releaseLocks(allRefs);
}
@@ -2235,8 +2082,6 @@
orderReferences(refs);
- List removesToReverse = new ArrayList();
-
try
{
//get locks on all the refs
@@ -2254,13 +2099,11 @@
commitPreparedTransaction(tx, conn);
boolean batch = usingBatchUpdates && refsToRemove.size() > 0;
- boolean messageDeletionsInBatch = false;
- boolean messageUpdatesInBatch = false;
-
+
if (batch)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
iter = refsToRemove.iterator();
@@ -2273,67 +2116,37 @@
if (!batch)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
Message m = ref.getMessage();
//We may need to remove the message itself
- m.decPersistentChannelCount();
- removesToReverse.add(ref);
+ //Update the channel count
- boolean removed;
- if (m.getPersistentChannelCount() == 0)
- {
- //We can remove the message
- removeMessage(m, psDeleteMessage);
-
- removed = true;
- }
- else
- {
- //Decrement channel count
- updateMessageChannelCount(m, psUpdateMessage);
-
- removed = false;
- }
+ decrementChannelCount(m, psUpdateMessage);
+
+ //Remove the message (if necessary)
+
+ removeMessage(m, psDeleteMessage);
if (batch)
{
- if (removed)
- {
- psDeleteMessage.addBatch();
-
- messageDeletionsInBatch = true;
- }
- else
- {
- psUpdateMessage.addBatch();
-
- messageUpdatesInBatch = true;
- }
+ psUpdateMessage.addBatch();
+
+ psDeleteMessage.addBatch();
}
else
{
- if (removed)
- {
- int rows = psDeleteMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Deleted " + rows + " rows");
- }
- }
- else
- {
- int rows = psUpdateMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
- }
+ int rows = psUpdateMessage.executeUpdate();
+
+ if (trace) { log.trace("Updated " + rows + " rows"); }
+
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("Deleted " + rows + " rows"); }
+
psDeleteMessage.close();
psDeleteMessage = null;
psUpdateMessage.close();
@@ -2343,30 +2156,19 @@
if (batch)
{
- if (messageDeletionsInBatch)
- {
- int[] rows = psDeleteMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted");
- }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
- if (messageUpdatesInBatch)
- {
- int[] rows = psUpdateMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rows, "updated");
- }
-
- psUpdateMessage.close();
- psUpdateMessage = null;
- }
+ int[] rows = psUpdateMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+
+ psUpdateMessage.close();
+ psUpdateMessage = null;
+
+ rows = psDeleteMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+
+ psDeleteMessage.close();
+ psDeleteMessage = null;
}
}
catch (Exception e)
@@ -2412,11 +2214,6 @@
}
finally
{
- if (wrap.isFailed())
- {
- //reverse any decs
- this.incPersistentCounts(removesToReverse);
- }
//release the locks
this.releaseLocks(refs);
}
@@ -2438,8 +2235,6 @@
orderReferences(refs);
- List addsToReverse = new ArrayList();
-
//We insert a tx record and
//a row for each ref with +
//and update the row for each delivery with "-"
@@ -2472,7 +2267,7 @@
{
psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -2494,10 +2289,7 @@
{
int rows = psReference.executeUpdate();
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
psReference.close();
psReference = null;
@@ -2506,16 +2298,14 @@
if (!batch)
{
psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
}
Message m = pair.ref.getMessage();
+
+ boolean added;
- m.incPersistentChannelCount();
- addsToReverse.add(pair.ref);
-
- boolean added;
- if (m.getPersistentChannelCount() == 1)
+ if (!m.isPersisted())
{
//First time so persist the message
storeMessage(m, psInsertMessage);
@@ -2525,7 +2315,7 @@
else
{
//Update message channel count
- updateMessageChannelCount(m, psUpdateMessage);
+ incrementChannelCount(m, psUpdateMessage);
added = false;
}
@@ -2549,19 +2339,13 @@
{
int rows = psInsertMessage.executeUpdate();
- if (trace)
- {
- log.trace("Inserted " + rows + " rows");
- }
+ if (trace) { log.trace("Inserted " + rows + " rows"); }
}
else
{
int rows = psUpdateMessage.executeUpdate();
- if (trace)
- {
- log.trace("Updated " + rows + " rows");
- }
+ if (trace) { log.trace("Updated " + rows + " rows"); }
}
psInsertMessage.close();
psInsertMessage = null;
@@ -2574,28 +2358,19 @@
{
int[] rowsReference = psReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
if (messageInsertsInBatch)
{
int[] rowsMessage = psInsertMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
}
if (messageUpdatesInBatch)
{
int[] rowsMessage = psUpdateMessage.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
}
psReference.close();
@@ -2615,7 +2390,7 @@
{
psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
}
-
+
while (iter.hasNext())
{
ChannelRefPair pair = (ChannelRefPair) iter.next();
@@ -2635,10 +2410,7 @@
{
int rows = psReference.executeUpdate();
- if (trace)
- {
- log.trace("updated " + rows + " rows");
- }
+ if (trace) { log.trace("updated " + rows + " rows"); }
psReference.close();
psReference = null;
@@ -2649,10 +2421,7 @@
{
int[] rows = psReference.executeBatch();
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated");
- }
+ if (trace) { logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated"); }
psReference.close();
psReference = null;
@@ -2711,12 +2480,6 @@
}
finally
{
- if (wrap.isFailed())
- {
- //reverse any incs
- this.decPersistentCounts(addsToReverse);
- }
-
//release the locks
this.releaseLocks(refs);
@@ -2746,8 +2509,6 @@
orderReferences(refs);
- List removesToReverse = new ArrayList();
-
try
{
this.getLocks(refs);
@@ -2759,12 +2520,11 @@
iter = refsToAdd.iterator();
boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
- boolean messageDeletionsInBatch = false;
- boolean messageUpdatesInBatch = false;
+
if (batch)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
while (iter.hasNext())
@@ -2774,65 +2534,37 @@
if (!batch)
{
psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
- psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+ psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
}
Message m = pair.ref.getMessage();
//We may need to remove the message for messages added during the prepare stage
- m.decPersistentChannelCount();
- removesToReverse.add(pair.ref);
+ //update the channel count
- boolean removed;
- if (m.getPersistentChannelCount() == 0)
+ decrementChannelCount(m, psUpdateMessage);
+
+ //remove the message (if necessary)
+
+ removeMessage(m, psDeleteMessage);
+
+ if (batch)
{
- //remove message
- removeMessage(m, psDeleteMessage);
+ psUpdateMessage.addBatch();
- removed = true;
+ psDeleteMessage.addBatch();
}
else
{
- //update message channel count
- updateMessageChannelCount(m, psUpdateMessage);
+ int rows = psUpdateMessage.executeUpdate();
- removed = false;
- }
-
- if (batch)
- {
- if (removed)
- {
- psDeleteMessage.addBatch();
- messageDeletionsInBatch = true;
- }
- else
- {
- psUpdateMessage.addBatch();
- messageUpdatesInBatch = true;
- }
- }
- else
- {
- if (removed)
- {
- int rows = psDeleteMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("deleted " + rows + " rows");
- }
- }
- else
- {
- int rows = psUpdateMessage.executeUpdate();
-
- if (trace)
- {
- log.trace("updated " + rows + " rows");
- }
- }
+ if (trace) { log.trace("updated " + rows + " rows"); }
+
+ rows = psDeleteMessage.executeUpdate();
+
+ if (trace) { log.trace("deleted " + rows + " rows"); }
+
psDeleteMessage.close();
psDeleteMessage = null;
psUpdateMessage.close();
@@ -2842,30 +2574,19 @@
if (batch)
{
- if (messageDeletionsInBatch)
- {
- int[] rows = psDeleteMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted");
- }
-
- psDeleteMessage.close();
- psDeleteMessage = null;
- }
- if (messageUpdatesInBatch)
- {
- int[] rows = psUpdateMessage.executeBatch();
-
- if (trace)
- {
- logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rows, "updated");
- }
-
- psUpdateMessage.close();
- psUpdateMessage = null;
- }
+ int[] rows = psUpdateMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+
+ rows = psDeleteMessage.executeBatch();
+
+ if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+
+ psDeleteMessage.close();
+ psDeleteMessage = null;
+
+ psUpdateMessage.close();
+ psUpdateMessage = null;
}
}
catch (Exception e)
@@ -2911,11 +2632,6 @@
}
finally
{
- if (wrap.isFailed())
- {
- //reverse any removes
- this.incPersistentCounts(removesToReverse);
- }
//release locks
this.releaseLocks(refs);
}
@@ -3216,12 +2932,18 @@
return map;
}
- protected void updateMessageChannelCount(Message m, PreparedStatement ps) throws Exception
+
+ //TODO - combine these
+ protected void incrementChannelCount(Message m, PreparedStatement ps) throws Exception
{
- ps.setInt(1, m.getPersistentChannelCount());
- ps.setLong(2, m.getMessageID());
+ ps.setLong(1, m.getMessageID());
}
+ protected void decrementChannelCount(Message m, PreparedStatement ps) throws Exception
+ {
+ ps.setLong(1, m.getMessageID());
+ }
+
/**
* Stores the message in the MESSAGE table.
*/
@@ -3443,47 +3165,6 @@
}
}
- protected void incPersistentCounts(List refs)
- {
- Iterator iter = refs.iterator();
-
- while (iter.hasNext())
- {
- Object obj = iter.next();
- MessageReference ref;
- if (obj instanceof MessageReference)
- {
- ref = (MessageReference)obj;
- }
- else
- {
- ref = ((ChannelRefPair)obj).ref;
- }
- ref.getMessage().incPersistentChannelCount();
- }
- }
-
- protected void decPersistentCounts(List refs)
- {
- Iterator iter = refs.iterator();
-
- while (iter.hasNext())
- {
- Object obj = iter.next();
- MessageReference ref;
- if (obj instanceof MessageReference)
- {
- ref = (MessageReference)obj;
- }
- else
- {
- ref = ((ChannelRefPair)obj).ref;
- }
- ref.getMessage().decPersistentChannelCount();
- }
-
- }
-
protected void logBatchUpdate(String name, int[] rows, String action)
{
int count = 0;
@@ -3564,9 +3245,11 @@
"TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, " +
"CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" );
- map.put("UPDATE_MESSAGE_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?");
- map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?");
+ map.put("INC_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?");
+ map.put("DEC_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?");
+ map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=? AND CHANNELCOUNT = 0");
map.put("MESSAGEID_COLUMN", "MESSAGEID");
+ map.put("MESSAGE_EXISTS", "SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ?");
//Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -47,9 +47,9 @@
// Paging functionality - TODO we should split this out into its own interface
- void addReferences(long channelID, List references, boolean paged) throws Exception;
+ void pageReferences(long channelID, List references, boolean paged) throws Exception;
- void removeReferences(long channelID, List refs) throws Exception;
+ void removeDepagedReferences(long channelID, List refs) throws Exception;
void updatePageOrder(long channelID, List references) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -608,6 +608,12 @@
MessageReference ref = null;
try
{
+ if (message.isReliable())
+ {
+ // It will already have been persisted on the sender's side
+ message.setPersisted(true);
+ }
+
ref = ms.reference(message);
// We route on the condition
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -39,7 +39,6 @@
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.StreamUtils;
-import org.jgroups.Address;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
@@ -325,6 +324,12 @@
{
org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
+ if (msg.isReliable())
+ {
+ //It will alerady have been persisted on the other node
+ msg.setPersisted(true);
+ }
+
MessageReference ref = null;
try
@@ -346,10 +351,11 @@
ref.releaseMemoryReference();
}
}
+
+ //Acknowledge on the remote queue stub
+ Delivery del = new SimpleDelivery(theQueue, ref);
- Delivery del = new SimpleDelivery(this, ref);
-
- acknowledgeInternal(del, tx, true, true);
+ del.acknowledge(tx);
}
tx.commit();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -116,6 +116,7 @@
//If the message is persistent and we are recoverable then we persist here, *before*
//the message is sent across the network
+ //This will increment any channelcount on the message in storage
pm.addReference(id, reference, tx);
}
catch (Exception e)
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/build.xml 2006-09-28 11:44:25 UTC (rev 1372)
@@ -609,7 +609,7 @@
<antcall target="crash-test">
<param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTwoConnectionsTest"/>
</antcall>
-
+
<antcall target="start-rmi-server"/>
<antcall target="crash-test">
Modified: trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -56,8 +56,8 @@
protected void setUp() throws Exception
{
- rs = (MessageSupport)MessageFactory.createJBossMessage(0, false, 0, 0, (byte)4, null,
- null, 0, JBossMessage.TYPE,
+ rs = (MessageSupport)MessageFactory.createMessage(0, false, 0, 0, (byte)4, null,
+ null, JBossMessage.TYPE,
null, null, null, null, null, null);
super.setUp();
log.debug("setup done");
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -291,8 +291,10 @@
//Now consume them all
+ log.info("Consuming them all from 1");
this.consume(queue1, 0, refs1, 150);
+ log.info("Consuming them all from 2");
this.consume(queue2, 0, refs2, 150);
// Queue1
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -335,7 +335,7 @@
refs.add(ref9);
refs.add(ref10);
- pm.addReferences(channel1.getChannelID(), refs, false);
+ pm.pageReferences(channel1.getChannelID(), refs, false);
refs = new ArrayList();
refs.add(ref11);
@@ -344,7 +344,7 @@
refs.add(ref14);
refs.add(ref15);
- pm.addReferences(channel2.getChannelID(), refs, false);
+ pm.pageReferences(channel2.getChannelID(), refs, false);
List refIds = getReferenceIds(channel1.getChannelID());
assertNotNull(refIds);
@@ -405,7 +405,7 @@
refs.add(ref13);
refs.add(ref14);
refs.add(ref15);
- pm.removeReferences(channel2.getChannelID(), refs);
+ pm.removeDepagedReferences(channel2.getChannelID(), refs);
refIds = getReferenceIds(channel2.getChannelID());
assertNotNull(refIds);
@@ -433,7 +433,7 @@
refs.add(ref1);
refs.add(ref2);
refs.add(ref3);
- pm.removeReferences(channel1.getChannelID(), refs);
+ pm.removeDepagedReferences(channel1.getChannelID(), refs);
refIds = getReferenceIds(channel1.getChannelID());
assertNotNull(refIds);
@@ -463,7 +463,7 @@
refs = new ArrayList();
refs.add(ref11);
- pm.removeReferences(channel2.getChannelID(), refs);
+ pm.removeDepagedReferences(channel2.getChannelID(), refs);
refs = new ArrayList();
refs.add(ref4);
@@ -473,7 +473,7 @@
refs.add(ref8);
refs.add(ref9);
refs.add(ref10);
- pm.removeReferences(channel1.getChannelID(), refs);
+ pm.removeDepagedReferences(channel1.getChannelID(), refs);
ms = getMessageIds();
assertNotNull(ms);
@@ -512,7 +512,7 @@
refs.add(ref9);
refs.add(ref10);
- pm.addReferences(channel.getChannelID(), refs, false);
+ pm.pageReferences(channel.getChannelID(), refs, false);
ref1.setPagingOrder(0);
ref2.setPagingOrder(1);
@@ -680,7 +680,7 @@
refs.add(ref9);
refs.add(ref10);
- pm.addReferences(channel.getChannelID(), refs, false);
+ pm.pageReferences(channel.getChannelID(), refs, false);
//First load exactly 10
PersistenceManager.InitialLoadInfo info = pm.getInitialReferenceInfos(channel.getChannelID(), 10);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -72,7 +72,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -130,7 +130,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -72,7 +72,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
i,
coreHeaders,
null,
- 0,
i % 2 == 0 ? new GUID().toString() : null,
genCorrelationID(i),
i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -233,6 +233,9 @@
String databaseType = sc.getDatabaseType();
String persistenceConfigFile =
"server/default/deploy/" + databaseType + "-persistence-service.xml";
+
+ log.info("********* LOADING CONFIG FILE: " + persistenceConfigFile);
+
URL persistenceConfigFileURL = getClass().getClassLoader().getResource(persistenceConfigFile);
if (persistenceConfigFileURL == null)
{
Modified: trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java 2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java 2006-09-28 11:44:25 UTC (rev 1372)
@@ -41,14 +41,14 @@
public static CoreMessage createCoreMessage(long messageID)
{
- return createCoreMessage(messageID, false, 0, 0, (byte)4, null, null, 0);
+ return createCoreMessage(messageID, false, 0, 0, (byte)4, null, null);
}
public static CoreMessage createCoreMessage(long messageID,
boolean reliable,
Serializable payload)
{
- return createCoreMessage(messageID, reliable, 0, 0, (byte)4, null, payload, 0);
+ return createCoreMessage(messageID, reliable, 0, 0, (byte)4, null, payload);
}
public static CoreMessage createCoreMessage(long messageID,
@@ -57,27 +57,14 @@
long timestamp,
byte priority,
Map coreHeaders,
- Serializable payload,
- int persistentChannelCount)
+ Serializable payload)
{
CoreMessage cm =
- new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null, persistentChannelCount);
+ new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null);
cm.setPayload(payload);
return cm;
}
-
- public static CoreMessage createCoreMessage(long messageID,
- boolean reliable,
- long expiration,
- long timestamp,
- byte priority,
- Map coreHeaders,
- Serializable payload)
- {
- return createCoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payload, 0);
- }
-
-
+
// Attributes ----------------------------------------------------
// Constructors --------------------------------------------------
More information about the jboss-cvs-commits
mailing list