[jboss-cvs] JBoss Messaging SVN: r1372 - in trunk: src/etc/server/default/deploy src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/message src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core/message tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/jms/persistence tests/src/org/jboss/test/messaging/tools/jmx/rmi tests/src/org/jboss/test/messaging/util
    jboss-cvs-commits at lists.jboss.org 
    jboss-cvs-commits at lists.jboss.org
       
    Thu Sep 28 07:44:45 EDT 2006
    
    
  
Author: timfox
Date: 2006-09-28 07:44:25 -0400 (Thu, 28 Sep 2006)
New Revision: 1372
Modified:
   trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
   trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
   trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
   trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java
   trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
   trunk/src/main/org/jboss/jms/message/JBossMessage.java
   trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java
   trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
   trunk/src/main/org/jboss/jms/message/JBossTextMessage.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/messaging/core/Message.java
   trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java
   trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java
   trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
   trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
Log:
Fixed some issues with reference counting
Modified: trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/clustered-mysql-persistence-service.xml	2006-09-28 11:44:25 UTC (rev 1372)
@@ -38,12 +38,14 @@
 LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
 UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
 SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
-SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
+SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
+INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
+DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
 DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
 MESSAGEID_COLUMN=MESSAGEID
+MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
 INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
 DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
 SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-28 11:44:25 UTC (rev 1372)
@@ -10,55 +10,55 @@
 
    <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
       name="jboss.messaging:service=PersistenceManager"
-      xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">      
+      xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
-      <attribute name="UsingBatchUpdates">true</attribute>            
+      <attribute name="UsingBatchUpdates">true</attribute>
       <attribute name="MaxParams">500</attribute>
    </mbean>
-       
-   <!-- Note that Hypersonic CANNOT be used for clustered post offices -->    
-       
+
+   <!-- Note that Hypersonic CANNOT be used for clustered post offices -->
+
    <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
       name="jboss.messaging:service=QueuePostOffice"
-      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml"> 
-      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends> 
+      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
+      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="PostOfficeName">Queue</attribute>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>     
+      <attribute name="CreateTablesOnStartup">true</attribute>
    </mbean>
-   
+
    <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
       name="jboss.messaging:service=TopicPostOffice"
-      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml"> 
+      xmbean-dd="xmdesc/DefaultPostOffice-xmbean.xml">
       <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="PostOfficeName">Topic</attribute>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>           
-   </mbean> 
-   
+      <attribute name="CreateTablesOnStartup">true</attribute>
+   </mbean>
+
    <mbean code="org.jboss.jms.server.plugin.JDBCJMSUserManagerService"
       name="jboss.messaging:service=JMSUserManager"
       xmbean-dd="xmdesc/JMSUserManager-xmbean.xml">
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>            
-   </mbean>    
-   
-	<mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
+      <attribute name="CreateTablesOnStartup">true</attribute>
+   </mbean>
+
+   <mbean code="org.jboss.messaging.core.plugin.JDBCShutdownLoggerService"
       name="jboss.messaging:service=ShutdownLogger"
       xmbean-dd="xmdesc/JDBCShutdownLogger-xmbean.xml">
       <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
       <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
       <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>      
-   </mbean>  
-   
+      <attribute name="CreateTablesOnStartup">true</attribute>
+   </mbean>
+
 </server>
\ No newline at end of file
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-09-28 11:44:25 UTC (rev 1372)
@@ -9,50 +9,52 @@
 <server>
 
    <mbean code="org.jboss.messaging.core.plugin.JDBCPersistenceManagerService"
-      name="jboss.messaging:service=PersistenceManager"
-      xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
-      <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
-      <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
-      <attribute name="DataSource">java:/DefaultDS</attribute>
-      <attribute name="CreateTablesOnStartup">true</attribute>
-      <attribute name="UsingBatchUpdates">true</attribute>
-      <attribute name="SqlProperties"><![CDATA[
-CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
-CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)
-CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
-CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
-CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
-CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
-CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
-CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
-CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
-INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
-DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
-UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
-COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
-COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
-ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
-ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
-LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
-LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
-UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
-SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
-SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?");
-LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
-INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-UPDATE_MESSAGE_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?
-DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
-MESSAGEID_COLUMN=MESSAGEID
-INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
-DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
-SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
-UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
-SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
-INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
-SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
-      ]]></attribute>
-      <attribute name="MaxParams">500</attribute>
+         name="jboss.messaging:service=PersistenceManager"
+         xmbean-dd="xmdesc/JDBCPersistenceManager-xmbean.xml">
+         <depends>jboss.jca:service=DataSourceBinding,name=DefaultDS</depends>
+         <depends optional-attribute-name="TransactionManager">jboss:service=TransactionManager</depends>
+         <attribute name="DataSource">java:/DefaultDS</attribute>
+         <attribute name="CreateTablesOnStartup">true</attribute>
+         <attribute name="UsingBatchUpdates">true</attribute>
+         <attribute name="SqlProperties"><![CDATA[
+   CREATE_MESSAGE_REFERENCE=CREATE TABLE JMS_MESSAGE_REFERENCE (CHANNELID BIGINT, MESSAGEID BIGINT, TRANSACTIONID BIGINT, STATE CHAR(1), ORD BIGINT, PAGE_ORD BIGINT, DELIVERYCOUNT INTEGER, RELIABLE CHAR(1), LOADED CHAR(1), PRIMARY KEY(CHANNELID, MESSAGEID))
+   CREATE_IDX_MESSAGE_REF_TX=CREATE INDEX JMS_MESSAGE_REF_TX ON JMS_MESSAGE_REFERENCE (TRANSACTIONID)
+   CREATE_IDX_MESSAGE_REF_ORD=CREATE INDEX JMS_MESSAGE_REF_ORD ON JMS_MESSAGE_REFERENCE (ORD)
+   CREATE_IDX_MESSAGE_REF_PAGE_ORD=CREATE INDEX JMS_MESSAGE_REF_LOADED ON JMS_MESSAGE_REFERENCE (PAGE_ORD)
+   CREATE_IDX_MESSAGE_REF_MESSAGEID=CREATE INDEX JMS_MESSAGE_REF_MESSAGEID ON JMS_MESSAGE_REFERENCE (MESSAGEID)
+   CREATE_IDX_MESSAGE_REF_RELIABLE=CREATE INDEX JMS_MESSAGE_REF_RELIABLE ON JMS_MESSAGE_REFERENCE (RELIABLE)
+   CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION VARCHAR(255), REPLYTO VARCHAR(255), JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
+   CREATE_TRANSACTION=CREATE TABLE JMS_TRANSACTION (TRANSACTIONID BIGINT, BRANCH_QUAL VARBINARY(254), FORMAT_ID INTEGER, GLOBAL_TXID VARBINARY(254), PRIMARY KEY (TRANSACTIONID))
+   CREATE_COUNTER=CREATE TABLE JMS_COUNTER (NAME VARCHAR(255), NEXT_ID BIGINT, PRIMARY KEY(NAME))
+   INSERT_MESSAGE_REF=INSERT INTO JMS_MESSAGE_REFERENCE (CHANNELID, MESSAGEID, TRANSACTIONID, STATE, ORD, PAGE_ORD, DELIVERYCOUNT, RELIABLE) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+   DELETE_MESSAGE_REF=DELETE FROM JMS_MESSAGE_REFERENCE WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
+   UPDATE_MESSAGE_REF=UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'
+   UPDATE_PAGE_ORDER=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = ? WHERE MESSAGEID=? AND CHANNELID=?
+   COMMIT_MESSAGE_REF1=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'
+   COMMIT_MESSAGE_REF2=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='-'
+   ROLLBACK_MESSAGE_REF1=DELETE FROM JMS_MESSAGE_REFERENCE WHERE TRANSACTIONID=? AND STATE='+'
+   ROLLBACK_MESSAGE_REF2=UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='-'
+   LOAD_PAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, PAGE_ORD, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND PAGE_ORD BETWEEN ? AND ? ORDER BY PAGE_ORD
+   LOAD_UNPAGED_REFS=SELECT MESSAGEID, DELIVERYCOUNT, RELIABLE FROM JMS_MESSAGE_REFERENCE WHERE PAGE_ORD IS NULL and CHANNELID = ? ORDER BY ORD
+   UPDATE_RELIABLE_REFS_NOT_PAGED=UPDATE JMS_MESSAGE_REFERENCE SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNELID=?
+   SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ?
+   SELECT_EXISTS_REF=SELECT MESSAGEID FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID = ? AND MESSAGEID = ?
+   LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES FROM JMS_MESSAGE
+   INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+   INC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?
+   DEC_CHANNELCOUNT=UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?
+   DELETE_MESSAGE=DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?
+   MESSAGEID_COLUMN=MESSAGEID
+   MESSAGE_EXISTS=SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ? FOR UPDATE
+   INSERT_TRANSACTION=INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) VALUES(?, ?, ?, ?)
+   DELETE_TRANSACTION=DELETE FROM JMS_TRANSACTION WHERE TRANSACTIONID = ?
+   SELECT_PREPARED_TRANSACTIONS=SELECT TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID FROM JMS_TRANSACTION
+   UPDATE_COUNTER=UPDATE JMS_COUNTER SET NEXT_ID = ? WHERE NAME=?
+   SELECT_COUNTER=SELECT NEXT_ID FROM JMS_COUNTER WHERE NAME=? FOR UPDATE
+   INSERT_COUNTER=INSERT INTO JMS_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
+   SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNELID) FROM JMS_MESSAGE_REFERENCE
+         ]]></attribute>
+         <attribute name="MaxParams">500</attribute>
    </mbean>
 
    <mbean code="org.jboss.messaging.core.plugin.DefaultPostOfficeService"
@@ -65,10 +67,10 @@
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
       ]]></attribute>
    </mbean>
 
@@ -82,10 +84,10 @@
       <attribute name="DataSource">java:/DefaultDS</attribute>
       <attribute name="CreateTablesOnStartup">true</attribute>
       <attribute name="SqlProperties"><![CDATA[
-CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), CONDITION VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
-INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
+CREATE_POSTOFFICE_TABLE=CREATE TABLE JMS_POSTOFFICE (POSTOFFICE_NAME VARCHAR(255), NODE_ID VARCHAR(255), QUEUE_NAME VARCHAR(1023), COND VARCHAR(1023), SELECTOR VARCHAR(1023), CHANNEL_ID BIGINT)
+INSERT_BINDING=INSERT INTO JMS_POSTOFFICE (POSTOFFICE_NAME, NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID) VALUES (?, ?, ?, ?, ?, ?)
 DELETE_BINDING=DELETE FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME=? AND NODE_ID=? AND QUEUE_NAME=?
-LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, CONDITION, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
+LOAD_BINDINGS=SELECT NODE_ID, QUEUE_NAME, COND, SELECTOR, CHANNEL_ID FROM JMS_POSTOFFICE WHERE POSTOFFICE_NAME  = ?
       ]]></attribute>
    </mbean>
 
Modified: trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossBytesMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -98,7 +98,6 @@
                             byte priority,
                             Map coreHeaders,
                             byte[] payloadAsByteArray,
-                            int persistentChannelCount,
                             String jmsType,
                             String correlationID,
                             byte[] correlationIDBytes,
@@ -107,7 +106,7 @@
                             HashMap jmsProperties)
    {
       super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
-            persistentChannelCount,jmsType, correlationID, correlationIDBytes, destination, replyTo,
+            jmsType, correlationID, correlationIDBytes, destination, replyTo,
             jmsProperties);
       
       baos = new ByteArrayOutputStream();
Modified: trunk/src/main/org/jboss/jms/message/JBossMapMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMapMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossMapMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -88,7 +88,6 @@
          byte priority,
          Map coreHeaders,
          byte[] payloadAsByteArray,
-         int persistentChannelCount,
          String jmsType,
          String correlationID,
          byte[] correlationIDBytes,
@@ -97,7 +96,6 @@
          HashMap jmsProperties)
    {
       super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
-            persistentChannelCount,
             jmsType, correlationID, correlationIDBytes, destination, replyTo, 
             jmsProperties);
    }
Modified: trunk/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -243,12 +243,9 @@
    public JBossMessage(long messageID)
    {
       this(messageID, true, 0, System.currentTimeMillis(), (byte)4,
-           null, null, 0, null, null, null, null, null, null);
+           null, null, null, null, null, null, null, null);
    }
 
-   /*
-    * This constructor is used to construct messages when retrieved from persistence storage
-    */
    public JBossMessage(long messageID,
                        boolean reliable,
                        long expiration,
@@ -256,7 +253,6 @@
                        byte priority,    
                        Map coreHeaders,
                        byte[] payloadAsByteArray,
-                       int persistentChannelCount,
                        String jmsType,
                        String correlationID,
                        byte[] correlationIDBytes,
@@ -271,8 +267,7 @@
             priority,
             0,
             coreHeaders,
-            payloadAsByteArray,
-            persistentChannelCount);
+            payloadAsByteArray);
 
       this.jmsType = jmsType;      
       this.correlationID = correlationID;
Modified: trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossObjectMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -77,7 +77,6 @@
          byte priority,
          Map coreHeaders,
          byte[] payloadAsByteArray,         
-         int persistentChannelCount,
          String jmsType,
          String correlationID,
          byte[] correlationIDBytes,
@@ -86,7 +85,6 @@
          HashMap jmsProperties)
    {
       super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
-            persistentChannelCount,
             jmsType, correlationID, correlationIDBytes, destination, replyTo, 
             jmsProperties);
    }
Modified: trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossStreamMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -98,7 +98,6 @@
          byte priority,
          Map coreHeaders,
          byte[] payloadAsByteArray,         
-         int persistentChannelCount,
          String jmsType,
          String correlationID,
          byte[] correlationIDBytes,
@@ -107,7 +106,6 @@
          HashMap jmsProperties)
    {
       super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
-            persistentChannelCount,
             jmsType, correlationID, correlationIDBytes, destination, replyTo, 
             jmsProperties);
    }
Modified: trunk/src/main/org/jboss/jms/message/JBossTextMessage.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/JBossTextMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/message/JBossTextMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -83,7 +83,6 @@
          byte priority,
          Map coreHeaders,
          byte[] payloadAsByteArray,         
-         int persistentChannelCount,
          String jmsType,
          String correlationID,
          byte[] correlationIDBytes,
@@ -92,7 +91,6 @@
          HashMap jmsProperties)
    {
       super(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray,
-            persistentChannelCount,
             jmsType, correlationID, correlationIDBytes, destination, replyTo, 
             jmsProperties);
    }
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -212,9 +212,9 @@
          //MBean dependencies
             
          // Create the wired components
-         messageIdManager = new IdManager("MESSAGE_ID", 8192, persistenceManager);
+         messageIdManager = new IdManager("MESSAGE_ID", 4096, persistenceManager);
          channelIdManager = new IdManager("CHANNEL_ID", 10, persistenceManager);
-         transactionIdManager = new IdManager("TRANSACTION_ID", 4096, persistenceManager);
+         transactionIdManager = new IdManager("TRANSACTION_ID", 1024, persistenceManager);
          destinationJNDIMapper = new DestinationJNDIMapper(this);
          connFactoryJNDIMapper = new ConnectionFactoryJNDIMapper(this);
          connectionManager = new SimpleConnectionManager();
Modified: trunk/src/main/org/jboss/messaging/core/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Message.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/Message.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -41,13 +41,11 @@
    Serializable getPayload();
    
    byte[] getPayloadAsByteArray();
+    
+   boolean isPersisted();
    
-   void incPersistentChannelCount();
+   void setPersisted(boolean persisted);
    
-   void decPersistentChannelCount();
-   
-   int getPersistentChannelCount();
-   
    byte getType();
    
 }
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -340,7 +340,7 @@
       {
          // Now we remove the references we loaded (only the non persistent or persistent in a non-recoverable store)
          
-         pm.removeReferences(channelID, toRemove);
+         pm.removeDepagedReferences(channelID, toRemove);
       }
 
       if (loadedReliable)
@@ -551,7 +551,7 @@
 
       if (!toAdd.isEmpty())
       {
-         pm.addReferences(channelID, toAdd, true);
+         pm.pageReferences(channelID, toAdd, true);
       }
       if (!toUpdate.isEmpty())
       {
Modified: trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/CoreMessage.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -53,10 +53,9 @@
                       long timestamp,
                       byte priority,
                       Map headers,
-                      byte[] payload,
-                      int persistentChannelCount)
+                      byte[] payload)
    {
-      super(messageID, reliable, expiration, timestamp, priority, 0, headers, payload, persistentChannelCount);
+      super(messageID, reliable, expiration, timestamp, priority, 0, headers, payload);
    }
 
    // Public --------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageFactory.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -82,74 +82,85 @@
       return m;
    }
    
-   
-   public static Message createJBossMessage(long messageID,
-                                          boolean reliable, 
-                                          long expiration, 
-                                          long timestamp,
-                                          byte priority,
-                                          Map coreHeaders,
-                                          byte[] payloadAsByteArray,                                          
-                                          int persistentChannelCount,
-                                          byte type,
-                                          String jmsType,                                       
-                                          String correlationID,
-                                          byte[] correlationIDBytes,
-                                          JBossDestination destination,
-                                          JBossDestination replyTo,                                                                              
-                                          HashMap jmsProperties)
+   /*
+    * Create a message from persistent storage
+    */
+   public static Message createMessage(long messageID,
+                                       boolean reliable, 
+                                       long expiration, 
+                                       long timestamp,
+                                       byte priority,
+                                       Map coreHeaders,
+                                       byte[] payloadAsByteArray,                                                                                    
+                                       byte type,
+                                       String jmsType,                                       
+                                       String correlationID,
+                                       byte[] correlationIDBytes,
+                                       JBossDestination destination,
+                                       JBossDestination replyTo,                                                                              
+                                       HashMap jmsProperties)
 
    {
-
       Message m = null;
       
-      if (type == JBossMessage.TYPE)
+      switch (type)
       {
-         m = new JBossMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-                              payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-                              destination, replyTo, jmsProperties);
+         case JBossMessage.TYPE:
+         {
+            m = new JBossMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case JBossObjectMessage.TYPE:
+         {
+            m = new JBossObjectMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case JBossTextMessage.TYPE:
+         {
+            m = new JBossTextMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case JBossBytesMessage.TYPE:
+         {
+            m = new JBossBytesMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case JBossMapMessage.TYPE:
+         {
+            m = new JBossMapMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case JBossStreamMessage.TYPE:
+         {
+            m = new JBossStreamMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
+                     payloadAsByteArray, jmsType, correlationID, correlationIDBytes,
+                     destination, replyTo, jmsProperties);
+            break;
+         }
+         case CoreMessage.TYPE:
+         {
+            m = new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray);
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Unknown type " + type);       
+         }
       }
-      else if (type == JBossObjectMessage.TYPE)
-      {
-         m = new JBossObjectMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-               payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-               destination, replyTo, jmsProperties);
-      }
-      else if (type == JBossTextMessage.TYPE)
-      {
-         m = new JBossTextMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-               payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-               destination, replyTo, jmsProperties);
-      }
-      else if (type == JBossBytesMessage.TYPE)
-      {
-         m = new JBossBytesMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-               payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-               destination, replyTo, jmsProperties);
-      }
-      else if (type == JBossMapMessage.TYPE)
-      {
-         m = new JBossMapMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-               payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-               destination, replyTo, jmsProperties);
-      }
-      else if (type == JBossStreamMessage.TYPE)
-      {
-         m = new JBossStreamMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders,
-               payloadAsByteArray, persistentChannelCount, jmsType, correlationID, correlationIDBytes,
-               destination, replyTo, jmsProperties);
-      }
-      else if (type == CoreMessage.TYPE)
-      {
-         m = new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payloadAsByteArray, persistentChannelCount);
-      }
-      else
-      {
-          throw new IllegalArgumentException("Unknow type " + type);                       
-      }
-
+      
+      m.setPersisted(true);
+      
       return m;
-
    }
 
    // Attributes ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/message/MessageSupport.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -56,17 +56,8 @@
    // Must be hidden from subclasses
    private byte[] payloadAsByteArray;
    
-  // private transient boolean inStorage;
+   private transient boolean persisted;
    
-   /* 
-   * We maintain a persistent channel count on the message itself.
-   * This is the total number of channels whether loaded in memory or not that hold a reference to the
-   * message and is needed to know when it is safe to remove the message from the db
-   * A channel may not have all it's references loaded into memory at once, also not all
-   * channels might be active at once so we can't rely on the in memory channel count.   
-   */
-   private transient int persistentChannelCount;
-
    // Constructors --------------------------------------------------
 
    /**
@@ -124,12 +115,10 @@
                          byte priority,
                          int deliveryCount,                        
                          Map headers,
-                         byte[] payloadAsByteArray,
-                         int persistentChannelCount)
+                         byte[] payloadAsByteArray)
    {
       super(messageID, reliable, expiration, timestamp, priority, deliveryCount, headers);
       this.payloadAsByteArray = payloadAsByteArray;
-      this.persistentChannelCount = persistentChannelCount;
    }
 
    protected MessageSupport(MessageSupport that)
@@ -238,21 +227,17 @@
       this.payloadAsByteArray = null;
    }
    
-   public synchronized void decPersistentChannelCount()
+   public synchronized boolean isPersisted()
    {
-      persistentChannelCount--;
+      return persisted;
    }
    
-   public synchronized void incPersistentChannelCount()
+   public synchronized void setPersisted(boolean persisted)
    {
-      persistentChannelCount++;
+      this.persisted = persisted;
    }
    
-   public synchronized int getPersistentChannelCount()
-   {
-      return persistentChannelCount;
-   }
-   
+
    // Public --------------------------------------------------------
 
    public boolean equals(Object o)
Modified: trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -161,10 +161,7 @@
    {
       //TODO This will need locking (e.g. SELECT ... FOR UPDATE...) in the clustered case
        
-      if (trace)
-      {
-         log.trace("Getting id block for counter: " + counterName + " ,size: " + size);
-      }
+      if (trace) { log.trace("Getting id block for counter: " + counterName + " ,size: " + size); }
       
       if (size <= 0)
       {
@@ -201,11 +198,8 @@
             
             int rows = ps.executeUpdate();
             
-            if (trace)
-            {
-               log.trace(JDBCUtil.statementToString(getSQLStatement("INSERT_COUNTER"), counterName)
-                     + " inserted " + rows + " rows");
-            }  
+            if (trace) {  log.trace(JDBCUtil.statementToString(getSQLStatement("INSERT_COUNTER"), counterName)
+                           + " inserted " + rows + " rows"); }  
             
             ps.close();            
             ps = null;
@@ -213,10 +207,7 @@
             return 0;
          }
          
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("SELECT_COUNTER"), counterName));
-         }
+         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("SELECT_COUNTER"), counterName)); }
          
          long nextId = rs.getLong(1);
          
@@ -233,12 +224,8 @@
          
          int rows = ps.executeUpdate();
          
-         if (trace)
-         {
-            log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_COUNTER"), new Long(nextId + size),
-                  counterName)
-                  + " updated " + rows + " rows");
-         }        
+         if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_COUNTER"), new Long(nextId + size),
+                               counterName) + " updated " + rows + " rows"); }        
          
          return nextId;
       }
@@ -278,10 +265,8 @@
       
    public void updateReliableReferencesNotPagedInRange(long channelID, long orderStart, long orderEnd, long num) throws Exception
    {
-      if (trace)
-      {
-         log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd);
-      }
+      if (trace) { log.trace("Updating reliable references for channel " + channelID + " between " + orderStart + " and " + orderEnd); }
+      
       Connection conn = null;
       PreparedStatement ps = null;
       TransactionWrapper wrap = new TransactionWrapper();
@@ -308,12 +293,8 @@
             {
                int rows = ps.executeUpdate();
                  
-               if (trace)
-               {
-                  log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
-                        new Long(orderStart), new Long(orderEnd))
-                        + " updated " + rows + " rows");
-               }
+               if (trace) { log.trace(JDBCUtil.statementToString(getSQLStatement("UPDATE_RELIABLE_REFS_NOT_PAGED"), new Long(channelID),
+                                      new Long(orderStart), new Long(orderEnd)) + " updated " + rows + " rows"); }
                if (tries > 0)
                {
                   log.warn("Update worked after retry");
@@ -330,14 +311,19 @@
             catch (SQLException e)
             {
                log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
+               
                tries++;
+               
                if (tries == MAX_TRIES)
                {
                   log.error("Retried " + tries + " times, now giving up");
+                  
                   throw new IllegalStateException("Failed to update references");
                }
+               
                log.warn("Trying again after a pause");
-               //Now we wait for a random amount of time to minimise risk of deadlock
+               
+               //Now we wait for a random amount of time to minimise risk of deadlock occurring again
                Thread.sleep((long)(Math.random() * 500));
             }  
          }
@@ -384,10 +370,7 @@
     */
    public List getMessages(List messageIds) throws Exception
    {
-      if (trace)
-      {
-         log.trace("Getting batch of messages for " + messageIds);
-      }
+      if (trace) { log.trace("Getting batch of messages for " + messageIds); }
       
       Connection conn = null;
       PreparedStatement ps = null;
@@ -460,6 +443,8 @@
                   byte[] bytes = getBytes(rs, 6);
                   HashMap coreHeaders = bytesToMap(bytes);
                   byte[] payload = getBytes(rs, 7);
+                  
+                  //TODO we don't need this
                   int persistentChannelCount = rs.getInt(8);
                   
                   //TODO - We are mixing concerns here
@@ -507,16 +492,16 @@
                         }
                      }
                          
-                     m = MessageFactory.createJBossMessage(messageId, reliable, expiration, timestamp, priority,
-                                                      coreHeaders, payload, persistentChannelCount,
+                     m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
+                                                      coreHeaders, payload,
                                                       type, jmsType, correlationID, correlationIDBytes,
                                                       dest, replyToDest,
                                                       jmsProperties);
                   }
                   else
                   {
-                     m = MessageFactory.createJBossMessage(messageId, reliable, expiration, timestamp, priority,
-                                                          coreHeaders, payload, persistentChannelCount, type,
+                     m = MessageFactory.createMessage(messageId, reliable, expiration, timestamp, priority,
+                                                          coreHeaders, payload, type,
                                                           null, null, null, null, null, null);
                   }
                   
@@ -532,10 +517,7 @@
             }
          }
          
-         if (trace)
-         {
-            log.trace("Loaded " + msgs.size() + " messages in total");
-         }
+         if (trace) { log.trace("Loaded " + msgs.size() + " messages in total"); }
 
          return msgs;
       }
@@ -580,19 +562,21 @@
       }
    }  
                
-   public void addReferences(long channelID, List references, boolean paged) throws Exception
+   public void pageReferences(long channelID, List references, boolean paged) throws Exception
    {  
       Connection conn = null;
       PreparedStatement psInsertReference = null;  
       PreparedStatement psInsertMessage = null;
       PreparedStatement psUpdateMessage = null;
+      PreparedStatement psMessageExists = null;
+      ResultSet rsMessageExists = null;
       TransactionWrapper wrap = new TransactionWrapper();
             
       //First we order the references in message order
       orderReferences(references);
       
-      List addsToReverse = new ArrayList();
-                        
+      log.info("adding references: " + references.size());
+                       
       try
       {
          //Now we get a lock on all the messages. Since we have ordered the refs we should avoid deadlock
@@ -609,7 +593,7 @@
          {
             psInsertReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
             psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
          }
          
          while (iter.hasNext())
@@ -647,31 +631,57 @@
             if (!usingBatchUpdates)
             {
                psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
             }
                                                                                      
             //Maybe we need to persist the message itself
             Message m = ref.getMessage();
             
-            m.incPersistentChannelCount();
+            //In a paging situation, we cannot use the persisted flag on the message to determine whether
+            //to insert the message or not.
+            //This is because a channel (possibly on another node) may be paging too and referencing
+            //the same message, and might have removed the message independently, the other
+            //channel will not know about this.
+            //Therefore we have to check if the message is already in the database and insert it if it isn't
             
-            addsToReverse.add(ref);
-  
+            //TODO This is a bit of a hassle -
+            //A cleaner and better solution here is to completely separate out the paging functionality from the
+            //standard persistence functionality since it complicates things considerably.
+            //We should define a paging store which is separate from the persistence store, and
+            //typically not using the database for the paging store - probably use a file based store
+            //e.g HOWL or some other logger
+            
+            //Note: when two or more competing channels run in the same process, we do not
+            //need a FOR UPDATE on the select, since we lock the messages in memory.
+            //For competing nodes, however, we do need it, so we require a database that supports
+            //it. This is another reason why we cannot use HSQL in a clustered environment,
+            //since it has no FOR UPDATE equivalent.
+            
+            psMessageExists = conn.prepareStatement(getSQLStatement("MESSAGE_EXISTS"));
+            
+            psMessageExists.setLong(1, m.getMessageID());
+            
+            rsMessageExists = psMessageExists.executeQuery();
+            
             boolean added;
-            if (m.getPersistentChannelCount() == 1)
+            
+            if (rsMessageExists.next())
             {
-               //Hasn't been persisted before so need to persist the message
-               storeMessage(m, psInsertMessage);
+               //Message exists
                
-               added = true;
+               // Update the message with the new channel count
+               incrementChannelCount(m, psUpdateMessage);
+               
+               added = false;
+               
             }
             else
             {
-               //Update the message with the new channel count
-               updateMessageChannelCount(m, psUpdateMessage);
+               //Hasn't been persisted before so need to persist the message
+               storeMessage(m, psInsertMessage);
                
-               added = false;
-            }
+               added = true;
+            }            
             
             if (usingBatchUpdates)
             {
@@ -717,28 +727,19 @@
          {
             int[] rowsReference = psInsertReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
             
             if (messageInsertsInBatch)
             {
                int[] rowsMessage = psInsertMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
             }
             if (messageUpdatesInBatch)
             {
                int[] rowsMessage = psUpdateMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
             }
             
             psInsertReference.close();
@@ -747,7 +748,7 @@
             psInsertMessage = null;
             psUpdateMessage.close();
             psUpdateMessage = null;
-         }         
+         }        
       }
       catch (Exception e)
       {
@@ -791,7 +792,6 @@
             try
             {
                conn.close();
-               //dwtwsbtwotl
             }
             catch (Throwable t)
             {
@@ -803,19 +803,13 @@
          }
          finally
          {            
-            if (wrap.isFailed())
-            {
-               //Reverse the incs
-               this.decPersistentCounts(addsToReverse);
-            }
-            
             //And then release locks
             this.releaseLocks(references);
          }         
       }      
    }
          
-   public void removeReferences(long channelID, List references) throws Exception
+   public void removeDepagedReferences(long channelID, List references) throws Exception
    {
       if (trace) { log.trace(this + " Removing " + references.size() + " refs from channel " + channelID); }
           
@@ -827,9 +821,7 @@
       
       //We order the references
       orderReferences(references);
-      
-      List removesToReverse = new ArrayList();
-           
+             
       try
       {
          //We get locks on all the messages - since they are ordered we avoid deadlock
@@ -839,14 +831,11 @@
          
          Iterator iter = references.iterator();
          
-         boolean messageDeletionsInBatch = false;
-         boolean messageUpdatesInBatch = false;
-         
          if (usingBatchUpdates)
          {
             psDeleteReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
             psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
          }
          
          while (iter.hasNext())
@@ -868,10 +857,7 @@
             {
                int rows = psDeleteReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("Deleted " + rows + " rows");
-               }
+               if (trace) { log.trace("Deleted " + rows + " rows"); }
                
                psDeleteReference.close();
                psDeleteReference = null;
@@ -880,66 +866,35 @@
             if (!usingBatchUpdates)
             {
                psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
             }
                
             Message m = ref.getMessage();
                                     
             //Maybe we need to delete the message itself
             
-            m.decPersistentChannelCount();
+            //Update the message with the new channel count
+            decrementChannelCount(m, psUpdateMessage);
             
-            removesToReverse.add(ref);
-  
-            boolean removed;
-            if (m.getPersistentChannelCount() == 0)
+            //Run the remove message update
+            removeMessage(m, psDeleteMessage);
+                        
+            if (usingBatchUpdates)
             {
-               //No more refs so remove the message
-               removeMessage(m, psDeleteMessage);
+               psUpdateMessage.addBatch();
                
-               removed = true;
+               psDeleteMessage.addBatch();
             }
             else
             {
-               //Update the message with the new channel count
-               updateMessageChannelCount(m, psUpdateMessage);
+               int rows = psUpdateMessage.executeUpdate();
+                              
+               if (trace) { log.trace("Updated " + rows + " rows"); }
                
-               removed = false;
-            }
+               rows = psDeleteMessage.executeUpdate();
+               
+               if (trace) { log.trace("Deleted " + rows + " rows"); }
             
-            if (usingBatchUpdates)
-            {
-               if (removed)
-               {
-                  psDeleteMessage.addBatch();
-                  messageDeletionsInBatch = true;
-               }
-               else
-               {
-                  psUpdateMessage.addBatch();
-                  messageUpdatesInBatch = true;
-               }
-            }
-            else
-            {
-               if (removed)
-               {
-                  int rows = psDeleteMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Deleted " + rows + " rows");
-                  }
-               }
-               else
-               {
-                  int rows = psUpdateMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Updated " + rows + " rows");
-                  }
-               }
                psDeleteMessage.close();
                psDeleteMessage = null;
                psUpdateMessage.close();
@@ -951,30 +906,16 @@
          {
             int[] rowsReference = psDeleteReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted"); }
             
-            if (messageDeletionsInBatch)
-            {
-               int[] rowsMessage = psDeleteMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsMessage, "deleted");
-               }
-            }
-            if (messageUpdatesInBatch)
-            {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
-               }
-            }
+            rowsReference = psUpdateMessage.executeBatch();
             
+            if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rowsReference, "updated"); }
+            
+            rowsReference = psDeleteMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsReference, "deleted"); }
+                                    
             psDeleteReference.close();
             psDeleteReference = null;
             psDeleteMessage.close();
@@ -1036,12 +977,6 @@
          }
          finally
          {     
-            if (wrap.isFailed())
-            {
-               //Reverse the decs
-               this.incPersistentCounts(removesToReverse);
-            }
-            
             //And then release locks
             this.releaseLocks(references);
          }         
@@ -1050,10 +985,7 @@
       
    public List getPagedReferenceInfos(long channelID, long orderStart, long number) throws Exception
    {
-      if (trace)
-      {
-         log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);
-      }
+      if (trace) { log.trace("loading message reference info for channel " + channelID + " from " + orderStart + " number " + number);      }
                     
       List refs = new ArrayList();
       
@@ -1152,14 +1084,10 @@
     */
    public InitialLoadInfo getInitialReferenceInfos(long channelID, int fullSize) throws Exception
    {
-      if (trace)
-      {
-         log.trace("loading initial reference infos for channel " + channelID);
-      }
+      if (trace) { log.trace("loading initial reference infos for channel " + channelID);  }
                     
       List refs = new ArrayList();
-      List extraRefs = new ArrayList();
-      
+ 
       Connection conn = null;
       PreparedStatement ps = null;
       ResultSet rs = null;
@@ -1272,10 +1200,7 @@
       PreparedStatement psUpdateReference = null;  
       TransactionWrapper wrap = new TransactionWrapper();
       
-      if (trace)
-      {
-         log.trace("Updating page order for channel:" + channelID);
-      }
+      if (trace) { log.trace("Updating page order for channel:" + channelID); }
         
       try
       {
@@ -1311,10 +1236,7 @@
             {
                int rows = psUpdateReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("Updated " + rows + " rows");
-               }
+               if (trace) { log.trace("Updated " + rows + " rows"); }
                
                psUpdateReference.close();
                psUpdateReference = null;
@@ -1325,10 +1247,7 @@
          {
             int[] rowsReference = psUpdateReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_PAGE_ORDER"), rowsReference, "updated"); }
                         
             psUpdateReference.close();
             psUpdateReference = null;
@@ -1382,6 +1301,8 @@
       {         
          //No tx so add the ref directly in the db
          
+         log.info("Adding ref: " + ref);
+         
          TransactionWrapper wrap = new TransactionWrapper();
          
          PreparedStatement psReference = null;
@@ -1390,9 +1311,7 @@
          Connection conn = ds.getConnection();
          
          Message m = ref.getMessage();     
-         
-         boolean incremented = false;
-         
+           
          try
          {            
             // Get lock on message
@@ -1405,34 +1324,35 @@
             
             int rows = psReference.executeUpdate();            
             
-            if (trace)
-            {
-               log.trace("Inserted " + rows + " rows");
-            }
+            if (trace) { log.trace("Inserted " + rows + " rows"); }
             
-            m.incPersistentChannelCount();
-            incremented = true;
-                           
-            if (m.getPersistentChannelCount() == 1)
+            log.info("Inserted " + rows + " rows");
+               
+            if (!m.isPersisted())
             {
+               log.info(m + " Not persisted so inserting message");
                // First time so persist the message
                psMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
                
-               storeMessage(m, psMessage);        
+               storeMessage(m, psMessage);
+               
+               m.setPersisted(true);
             }
             else
             {
+               log.info("Already persisted so updating message");
+               
                //Update the message's channel count
-               psMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
                
-               updateMessageChannelCount(m, psMessage);
+               incrementChannelCount(m, psMessage);
             }
                            
             rows = psMessage.executeUpdate();
-            if (trace)
-            {
-               log.trace("Inserted/updated " + rows + " rows");
-            }                                     
+            
+            if (trace) { log.trace("Inserted/updated " + rows + " rows"); }     
+            
+            log.trace("message Inserted/updated " + rows + " rows");
          }
          catch (Exception e)
          {
@@ -1477,12 +1397,6 @@
             }
             finally
             {   
-               if (wrap.isFailed() && incremented)
-               {
-                  //reverse the inc                  
-                  m.decPersistentChannelCount();                  
-               }
-               
                //Release Lock
                LockMap.instance.releaseLock(m);
             }
@@ -1507,14 +1421,13 @@
          TransactionWrapper wrap = new TransactionWrapper();
          
          PreparedStatement psReference = null;
+         PreparedStatement psUpdate = null;
          PreparedStatement psMessage = null;
          
          Connection conn = ds.getConnection();
          
          Message m = ref.getMessage();         
          
-         boolean decremented = false;
-         
          try
          {
             //get lock on message
@@ -1522,6 +1435,8 @@
                               
             psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
             
+            log.info("Removing ref " + ref + " from channel " + channelID);
+            
             //Remove the message reference
             removeReference(channelID, ref, psReference);
             
@@ -1532,40 +1447,27 @@
                throw new IllegalStateException("Failed to remove row for: " + ref);
             }
             
-            if (trace)
-            {
-               log.trace("Deleted " + rows + " rows");
-            }
+            if (trace) { log.trace("Deleted " + rows + " rows"); }
             
-            m.decPersistentChannelCount();
-            decremented = true;
-                           
-            if (m.getPersistentChannelCount() == 0)
-            {
-               //No other channels have a reference so we can delete the message
-               psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-                                 
-               removeMessage(m, psMessage);
-            }
-            else
-            {
-               //Other channel(s) still have hold references so update the channel count
-               psMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
-               
-               updateMessageChannelCount(m, psMessage);
-            }
-                              
-            rows = psMessage.executeUpdate();
+            //Update the messages channel count
             
-            if (rows != 1)
-            {
-               throw new IllegalStateException("Failed to update/delete message row for ref " + ref);
-            }
+            psUpdate = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
             
-            if (trace)
-            {
-               log.trace("Delete/updated " + rows + " rows");
-            }                           
+            decrementChannelCount(m, psUpdate);
+            
+            rows = psUpdate.executeUpdate();
+            
+            if (trace) { log.trace("Updated " + rows + " rows"); } 
+            
+            //Delete the message (if necessary)
+            
+            psMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
+            
+            removeMessage(m, psMessage);
+                       
+            rows = psMessage.executeUpdate();
+            
+            if (trace) { log.trace("Delete " + rows + " rows"); }                           
          }
          catch (Exception e)
          {
@@ -1584,6 +1486,16 @@
                {
                }
             }
+            if (psUpdate != null)
+            {
+               try
+               {
+                  psUpdate.close();
+               }
+               catch (Throwable t)
+               {
+               }
+            }
             if (psMessage != null)
             {
                try
@@ -1610,11 +1522,6 @@
             }
             finally
             {      
-               if (wrap.isFailed() && decremented)
-               {
-                  //Reverse decrement
-                  m.incPersistentChannelCount();
-               }
                //release the lock
                LockMap.instance.releaseLock(m);
             }
@@ -1797,6 +1704,8 @@
    protected void handleBeforeCommit1PC(List refsToAdd, List refsToRemove, Transaction tx)
       throws Exception
    {
+      log.info("Before commit 1pc: I have " + refsToAdd.size() + " refs to add and " + refsToRemove.size() + " refs to remove");
+      
       //TODO - A slight optimisation - it's possible we have refs referring to the same message
       //so we will end up acquiring the lock more than once which is unnecessary
       //If find unique set of messages can avoid this
@@ -1825,13 +1734,11 @@
       Connection conn = null;
       PreparedStatement psReference = null;
       PreparedStatement psInsertMessage = null;
-      PreparedStatement psUpdateMessage = null;
+      PreparedStatement psIncMessage = null;
+      PreparedStatement psDecMessage = null;
       PreparedStatement psDeleteMessage = null;
       TransactionWrapper wrap = new TransactionWrapper();
       
-      List addsToReverse = new ArrayList();
-      List removesToReverse = new ArrayList();
-      
       try
       {
          conn = ds.getConnection();
@@ -1852,7 +1759,7 @@
          {
             psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
             psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psIncMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
          }
          
          while (iter.hasNext())
@@ -1867,6 +1774,7 @@
             }
             
             //Now store the reference
+            log.info("Adding ref: " + ref + " in channel: " + pair.channelId);
             addReference(pair.channelId, ref, psReference, false);
               
             if (batch)
@@ -1877,10 +1785,7 @@
             {
                int rows = psReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("Inserted " + rows + " rows");
-               }
+               if (trace) { log.trace("Inserted " + rows + " rows"); }                              
                
                psReference.close();
                psReference = null;
@@ -1891,25 +1796,26 @@
             if (!batch)
             {
                psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psIncMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
             }
                          
-            m.incPersistentChannelCount();
-            addsToReverse.add(ref);
-            
             boolean added;
-            if (m.getPersistentChannelCount() == 1)
+            if (!m.isPersisted())
             {
+               log.info("The message " + m.getMessageID() + " hasn't been persisted before so i'm gonna persist it");
                //First time so add message
                storeMessage(m, psInsertMessage);
-
+               
                added = true;
+               
+               m.setPersisted(true);
             }
             else
             {
+               log.info("The message " + m.getMessageID() + " has been peristed before so just updating it's channel count");
                //Update message channel count
-               updateMessageChannelCount(m, psUpdateMessage);
- 
+               incrementChannelCount(m, psIncMessage);
+               
                added = false;
             }
             
@@ -1918,11 +1824,13 @@
                if (added)
                {
                   psInsertMessage.addBatch();
+                  
                   messageInsertsInBatch = true;
                }
                else
-               {
-                  psUpdateMessage.addBatch();
+               { 
+                  psIncMessage.addBatch();
+                  
                   messageUpdatesInBatch = true;
                }
             }
@@ -1932,24 +1840,18 @@
                {
                   int rows = psInsertMessage.executeUpdate();
                   
-                  if (trace)
-                  {
-                     log.trace("Inserted " + rows + " rows");
-                  }
+                  if (trace) { log.trace("Inserted " + rows + " rows"); }
                }
                else
                {
-                  int rows = psUpdateMessage.executeUpdate();
+                  int rows = psIncMessage.executeUpdate();
                   
-                  if (trace)
-                  {
-                     log.trace("Updated " + rows + " rows");
-                  }
+                  if (trace) { log.trace("Updated " + rows + " rows"); }
                }
                psInsertMessage.close();
                psInsertMessage = null;
-               psUpdateMessage.close();
-               psUpdateMessage = null;
+               psIncMessage.close();
+               psIncMessage = null;
             }
          }         
          
@@ -1957,36 +1859,27 @@
          {
             int[] rowsReference = psReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
             
             if (messageInsertsInBatch)
             {
                int[] rowsMessage = psInsertMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
             }
             if (messageUpdatesInBatch)
             {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
+               int[] rowsMessage = psIncMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
             }
             
             psReference.close();
             psReference = null;
             psInsertMessage.close();
             psInsertMessage = null;
-            psUpdateMessage.close();
-            psUpdateMessage = null;
+            psIncMessage.close();
+            psIncMessage = null;
          }
          
          //Now the removes
@@ -1994,17 +1887,15 @@
          iter = refsToRemove.iterator();
          
          batch = usingBatchUpdates && refsToRemove.size() > 0;
-         boolean messageDeletionsInBatch = false;
-         messageUpdatesInBatch = false;
+
          psReference = null;
-         psUpdateMessage = null;
          psDeleteMessage = null;
          
          if (batch)
          {
             psReference = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE_REF"));
             psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psDecMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
          }
          
          while (iter.hasNext())
@@ -2026,10 +1917,7 @@
             {
                int rows = psReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("Deleted " + rows + " rows");
-               }
+               if (trace) { log.trace("Deleted " + rows + " rows"); }
                
                psReference.close();
                psReference = null;
@@ -2038,106 +1926,62 @@
             if (!batch)
             {
                psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psDecMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
             }
             
             Message m = pair.ref.getMessage();
                                 
-            m.decPersistentChannelCount();
+            //Update the channel count
             
-            removesToReverse.add(pair.ref);
+            decrementChannelCount(m, psDecMessage);
             
-            boolean removed;
+            //Delete the message (if necessary)
             
-            if (m.getPersistentChannelCount() == 0)
+            removeMessage(m, psDeleteMessage);
+                       
+            if (batch)
             {
-               //No more refs - message can be deleted
-               removeMessage(m, psDeleteMessage);
-                  
-               removed = true;
-            }
-            else
-            {
-               //Update channel count for message
-               updateMessageChannelCount(m, psUpdateMessage);
+               psDecMessage.addBatch();
                
-               removed = false;
+               psDeleteMessage.addBatch();                              
             }
-            
-            if (batch)
-            {
-               if (removed)
-               {
-                  psDeleteMessage.addBatch();
-                  messageDeletionsInBatch = true;
-               }
-               else
-               {
-                  psUpdateMessage.addBatch();
-                  messageUpdatesInBatch = true;                        
-               }
-            }
             else
             {
-               if (removed)
-               {
-                  int rows = psDeleteMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Deleted " + rows + " rows");
-                  }
-               }
-               else
-               {
-                  int rows = psUpdateMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Updated " + rows + " rows");
-                  }
-               }
+               int rows = psDecMessage.executeUpdate();
+               
+               if (trace) { log.trace("Updated " + rows + " rows"); }
+               
+               rows = psDeleteMessage.executeUpdate();
+               
+               if (trace) { log.trace("Deleted " + rows + " rows"); }
+
                psDeleteMessage.close();
                psDeleteMessage = null;
-               psUpdateMessage.close();
-               psUpdateMessage = null;
+               psDecMessage.close();
+               psDecMessage = null;
             }            
          }
          
          if (batch)
          {
-            int[] rowsReference = psReference.executeBatch();
+            int[] rows = psReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rowsReference, "deleted");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE_REF"), rows, "deleted"); }
             
-            if (messageDeletionsInBatch)
-            {
-               int[] rowsMessage = psDeleteMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rowsMessage, "deleted");
-               }
-            }
-            if (messageUpdatesInBatch)
-            {
-               int[] rowsMessage = psUpdateMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
-               }
-            }
+            rows = psDecMessage.executeBatch();
             
+            if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+
+            rows = psDeleteMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+
             psReference.close();
             psReference = null;
             psDeleteMessage.close();
             psDeleteMessage = null;
-            psUpdateMessage.close();
-            psUpdateMessage = null;
+            psDecMessage.close();
+            psDecMessage = null;
          }         
       }
       catch (Exception e)
@@ -2168,16 +2012,26 @@
             {
             }
          }
-         if (psUpdateMessage != null)
+         if (psIncMessage != null)
          {
             try
             {
-               psUpdateMessage.close();
+               psIncMessage.close();
             }
             catch (Throwable t)
             {
             }
          }
+         if (psDecMessage != null)
+         {
+            try
+            {
+               psDecMessage.close();
+            }
+            catch (Throwable t)
+            {
+            }
+         }
          if (psDeleteMessage != null)
          {
             try
@@ -2203,14 +2057,7 @@
             wrap.end();                        
          }
          finally
-         {
-            if (wrap.isFailed())
-            {
-               //Reverse any incs and decs we made
-               this.decPersistentCounts(addsToReverse);
-               this.incPersistentCounts(removesToReverse);
-            }
-            
+         {  
             //Release the locks
             this.releaseLocks(allRefs);
          }
@@ -2235,8 +2082,6 @@
             
       orderReferences(refs);      
       
-      List removesToReverse = new ArrayList();
-      
       try
       {
          //get locks on all the refs
@@ -2254,13 +2099,11 @@
          commitPreparedTransaction(tx, conn);
          
          boolean batch = usingBatchUpdates && refsToRemove.size() > 0;
-         boolean messageDeletionsInBatch = false;
-         boolean messageUpdatesInBatch = false;
-      
+
          if (batch)
          {
             psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
          }
                   
          iter = refsToRemove.iterator();
@@ -2273,67 +2116,37 @@
             if (!batch)
             {
                psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
             }
             
             Message m = ref.getMessage();
                                    
             //We may need to remove the message itself
             
-            m.decPersistentChannelCount();
-            removesToReverse.add(ref);
+            //Update the channel count
             
-            boolean removed;
-            if (m.getPersistentChannelCount() == 0)
-            {
-               //We can remove the message
-               removeMessage(m, psDeleteMessage);
-               
-               removed = true;
-            }
-            else
-            {
-               //Decrement channel count
-               updateMessageChannelCount(m, psUpdateMessage);
-               
-               removed = false;
-            }
+            decrementChannelCount(m, psUpdateMessage);
+            
+            //Remove the message (if necessary)
+            
+            removeMessage(m, psDeleteMessage);          
                            
             if (batch)
             {
-               if (removed)
-               {
-                  psDeleteMessage.addBatch();
-                  
-                  messageDeletionsInBatch = true;
-               }
-               else
-               {
-                  psUpdateMessage.addBatch();
-                  
-                  messageUpdatesInBatch = true;
-               }
+               psUpdateMessage.addBatch();
+                
+               psDeleteMessage.addBatch(); 
             }
             else
             {
-               if (removed)
-               {
-                  int rows = psDeleteMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Deleted " + rows + " rows");
-                  }
-               }
-               else
-               {
-                  int rows = psUpdateMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("Updated " + rows + " rows");
-                  }
-               }
+               int rows = psUpdateMessage.executeUpdate();
+               
+               if (trace) { log.trace("Updated " + rows + " rows"); }
+               
+               rows = psDeleteMessage.executeUpdate();
+               
+               if (trace) { log.trace("Deleted " + rows + " rows"); }
+               
                psDeleteMessage.close();
                psDeleteMessage = null;
                psUpdateMessage.close();
@@ -2343,30 +2156,19 @@
          
          if (batch)
          {
-            if (messageDeletionsInBatch)
-            {
-               int[] rows = psDeleteMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted");
-               }
-               
-               psDeleteMessage.close();
-               psDeleteMessage = null;
-            }
-            if (messageUpdatesInBatch)
-            {
-               int[] rows = psUpdateMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rows, "updated");
-               }
-               
-               psUpdateMessage.close();
-               psUpdateMessage = null;
-            }
+            int[] rows = psUpdateMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+            
+            psUpdateMessage.close();
+            psUpdateMessage = null;
+            
+            rows = psDeleteMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+            
+            psDeleteMessage.close();
+            psDeleteMessage = null;                           
          }         
       }
       catch (Exception e)
@@ -2412,11 +2214,6 @@
          }
          finally
          {
-            if (wrap.isFailed())
-            {
-               //reverse any decs
-               this.incPersistentCounts(removesToReverse);
-            }
             //release the locks
             this.releaseLocks(refs);
          }
@@ -2438,8 +2235,6 @@
       
       orderReferences(refs);
       
-      List addsToReverse = new ArrayList();
-      
       //We insert a tx record and
       //a row for each ref with +
       //and update the row for each delivery with "-"
@@ -2472,7 +2267,7 @@
          {
             psReference = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE_REF"));
             psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
          }
          
          while (iter.hasNext())
@@ -2494,10 +2289,7 @@
             {
                int rows = psReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("Inserted " + rows + " rows");
-               }
+               if (trace) { log.trace("Inserted " + rows + " rows"); }
                
                psReference.close();
                psReference = null;
@@ -2506,16 +2298,14 @@
             if (!batch)
             {
                psInsertMessage = conn.prepareStatement(getSQLStatement("INSERT_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psUpdateMessage = conn.prepareStatement(getSQLStatement("INC_CHANNELCOUNT"));
             }
             
             Message m = pair.ref.getMessage();
+                   
+            boolean added;         
             
-            m.incPersistentChannelCount();
-            addsToReverse.add(pair.ref);
-                        
-            boolean added;
-            if (m.getPersistentChannelCount() == 1)
+            if (!m.isPersisted())
             {
                //First time so persist the message
                storeMessage(m, psInsertMessage);
@@ -2525,7 +2315,7 @@
             else
             {
                //Update message channel count
-               updateMessageChannelCount(m, psUpdateMessage);
+               incrementChannelCount(m, psUpdateMessage);
                
                added = false;
             }
@@ -2549,19 +2339,13 @@
                {
                   int rows = psInsertMessage.executeUpdate();
                   
-                  if (trace)
-                  {
-                     log.trace("Inserted " + rows + " rows");
-                  }
+                  if (trace) { log.trace("Inserted " + rows + " rows"); }
                }
                else
                {
                   int rows = psUpdateMessage.executeUpdate();
                   
-                  if (trace)
-                  {
-                     log.trace("Updated " + rows + " rows");
-                  }
+                  if (trace) { log.trace("Updated " + rows + " rows"); }
                }
                psInsertMessage.close();
                psInsertMessage = null;
@@ -2574,28 +2358,19 @@
          {
             int[] rowsReference = psReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE_REF"), rowsReference, "inserted"); }
             
             if (messageInsertsInBatch)
             {
                int[] rowsMessage = psInsertMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INSERT_MESSAGE"), rowsMessage, "inserted"); }
             }
             if (messageUpdatesInBatch)
             {
                int[] rowsMessage = psUpdateMessage.executeBatch();
                
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rowsMessage, "updated");
-               }
+               if (trace) { logBatchUpdate(getSQLStatement("INC_CHANNELCOUNT"), rowsMessage, "updated"); }
             }
             
             psReference.close();
@@ -2615,7 +2390,7 @@
          {
             psReference = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_REF"));
          }
-         
+
          while (iter.hasNext())
          {
             ChannelRefPair pair = (ChannelRefPair) iter.next();
@@ -2635,10 +2410,7 @@
             {
                int rows = psReference.executeUpdate();
                
-               if (trace)
-               {
-                  log.trace("updated " + rows + " rows");
-               }
+               if (trace) { log.trace("updated " + rows + " rows"); }
                
                psReference.close();
                psReference = null;
@@ -2649,10 +2421,7 @@
          {
             int[] rows = psReference.executeBatch();
             
-            if (trace)
-            {
-               logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated");
-            }
+            if (trace) { logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_REF"), rows, "updated"); }
             
             psReference.close();
             psReference = null;
@@ -2711,12 +2480,6 @@
          }
          finally
          {
-            if (wrap.isFailed())
-            {
-               //reverse any incs
-               this.decPersistentCounts(addsToReverse);
-            }
-            
             //release the locks
             
             this.releaseLocks(refs);
@@ -2746,8 +2509,6 @@
       
       orderReferences(refs);
       
-      List removesToReverse = new ArrayList();
-      
       try
       {
          this.getLocks(refs);
@@ -2759,12 +2520,11 @@
          iter = refsToAdd.iterator();
          
          boolean batch = usingBatchUpdates && refsToAdd.size() > 1;
-         boolean messageDeletionsInBatch = false;
-         boolean messageUpdatesInBatch = false;
+
          if (batch)
          {
             psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-            psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+            psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
          }
                                  
          while (iter.hasNext())
@@ -2774,65 +2534,37 @@
             if (!batch)
             {
                psDeleteMessage = conn.prepareStatement(getSQLStatement("DELETE_MESSAGE"));
-               psUpdateMessage = conn.prepareStatement(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"));
+               psUpdateMessage = conn.prepareStatement(getSQLStatement("DEC_CHANNELCOUNT"));
             }
             
             Message m = pair.ref.getMessage();
                                          
             //We may need to remove the message for messages added during the prepare stage
                         
-            m.decPersistentChannelCount();
-            removesToReverse.add(pair.ref);
+            //update the channel count
             
-            boolean removed;
-            if (m.getPersistentChannelCount() == 0)
+            decrementChannelCount(m, psUpdateMessage);
+            
+            //remove the message (if necessary)
+            
+            removeMessage(m, psDeleteMessage);
+                                        
+            if (batch)
             {
-               //remove message
-               removeMessage(m, psDeleteMessage);
+               psUpdateMessage.addBatch();
                
-               removed = true;                    
+               psDeleteMessage.addBatch();
             }
             else
             {
-               //update message channel count
-               updateMessageChannelCount(m, psUpdateMessage);
+               int rows = psUpdateMessage.executeUpdate();
                
-               removed = false;
-            }
-                              
-            if (batch)
-            {
-               if (removed)
-               {
-                  psDeleteMessage.addBatch();
-                  messageDeletionsInBatch = true;
-               }
-               else
-               {
-                  psUpdateMessage.addBatch();
-                  messageUpdatesInBatch = true;
-               }
-            }
-            else
-            {
-               if (removed)
-               {
-                  int rows = psDeleteMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("deleted " + rows + " rows");
-                  }
-               }
-               else
-               {
-                  int rows = psUpdateMessage.executeUpdate();
-                  
-                  if (trace)
-                  {
-                     log.trace("updated " + rows + " rows");
-                  }
-               }
+               if (trace) { log.trace("updated " + rows + " rows"); }
+               
+               rows = psDeleteMessage.executeUpdate();
+               
+               if (trace) { log.trace("deleted " + rows + " rows"); }
+               
                psDeleteMessage.close();
                psDeleteMessage = null;
                psUpdateMessage.close();
@@ -2842,30 +2574,19 @@
          
          if (batch)
          {
-            if (messageDeletionsInBatch)
-            {
-               int[] rows = psDeleteMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted");
-               }
-               
-               psDeleteMessage.close();
-               psDeleteMessage = null;
-            }
-            if (messageUpdatesInBatch)
-            {
-               int[] rows = psUpdateMessage.executeBatch();
-               
-               if (trace)
-               {
-                  logBatchUpdate(getSQLStatement("UPDATE_MESSAGE_CHANNELCOUNT"), rows, "updated");
-               }
-               
-               psUpdateMessage.close();
-               psUpdateMessage = null;
-            }
+            int[] rows = psUpdateMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DEC_CHANNELCOUNT"), rows, "updated"); }
+            
+            rows = psDeleteMessage.executeBatch();
+            
+            if (trace) { logBatchUpdate(getSQLStatement("DELETE_MESSAGE"), rows, "deleted"); }
+            
+            psDeleteMessage.close();
+            psDeleteMessage = null;
+            
+            psUpdateMessage.close();
+            psUpdateMessage = null;            
          }
       }
       catch (Exception e)
@@ -2911,11 +2632,6 @@
          }
          finally
          {
-            if (wrap.isFailed())
-            {
-               //reverse any removes
-               this.incPersistentCounts(removesToReverse);
-            }
             //release locks
             this.releaseLocks(refs);
          }
@@ -3216,12 +2932,18 @@
       return map;
    }
    
-   protected void updateMessageChannelCount(Message m, PreparedStatement ps) throws Exception
+
+   //TODO - combine these
+   protected void incrementChannelCount(Message m, PreparedStatement ps) throws Exception
    {
-      ps.setInt(1, m.getPersistentChannelCount());
-      ps.setLong(2, m.getMessageID());
+      ps.setLong(1, m.getMessageID());
    }
    
+   protected void decrementChannelCount(Message m, PreparedStatement ps) throws Exception
+   {
+      ps.setLong(1, m.getMessageID());
+   }
+   
    /**
     * Stores the message in the MESSAGE table.
     */
@@ -3443,47 +3165,6 @@
       }
    }
    
-   protected void incPersistentCounts(List refs)
-   {
-      Iterator iter = refs.iterator();
-      
-      while (iter.hasNext())
-      {
-         Object obj = iter.next();
-         MessageReference ref;
-         if (obj instanceof MessageReference)
-         {
-            ref = (MessageReference)obj;
-         }
-         else
-         {
-            ref = ((ChannelRefPair)obj).ref;
-         }
-         ref.getMessage().incPersistentChannelCount();
-      }
-   }
-   
-   protected void decPersistentCounts(List refs)
-   {
-      Iterator iter = refs.iterator();
-      
-      while (iter.hasNext())
-      {
-         Object obj = iter.next();
-         MessageReference ref;
-         if (obj instanceof MessageReference)
-         {
-            ref = (MessageReference)obj;
-         }
-         else
-         {
-            ref = ((ChannelRefPair)obj).ref;
-         }
-         ref.getMessage().decPersistentChannelCount();
-      }
-      
-   }
-   
    protected void logBatchUpdate(String name, int[] rows, String action)
    {
       int count = 0;
@@ -3564,9 +3245,11 @@
               "TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, " +
               "CORRELATIONID_BYTES, DESTINATION, REPLYTO, JMSPROPERTIES) " +
               "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" );
-      map.put("UPDATE_MESSAGE_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT=? WHERE MESSAGEID=?");
-      map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=?");
+      map.put("INC_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT + 1 WHERE MESSAGEID=?");
+      map.put("DEC_CHANNELCOUNT", "UPDATE JMS_MESSAGE SET CHANNELCOUNT = CHANNELCOUNT - 1 WHERE MESSAGEID=?");
+      map.put("DELETE_MESSAGE", "DELETE FROM JMS_MESSAGE WHERE MESSAGEID=? AND CHANNELCOUNT = 0");
       map.put("MESSAGEID_COLUMN", "MESSAGEID");
+      map.put("MESSAGE_EXISTS", "SELECT MESSAGEID FROM JMS_MESSAGE WHERE MESSAGEID = ?");
       //Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JMS_TRANSACTION (TRANSACTIONID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) " +
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -47,9 +47,9 @@
       
    // Paging functionality - TODO we should split this out into its own interface
    
-   void addReferences(long channelID, List references, boolean paged) throws Exception;
+   void pageReferences(long channelID, List references, boolean paged) throws Exception;
    
-   void removeReferences(long channelID, List refs) throws Exception;
+   void removeDepagedReferences(long channelID, List refs) throws Exception;
     
    void updatePageOrder(long channelID, List references) throws Exception;
    
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -608,6 +608,12 @@
       MessageReference ref = null;
       try
       {
+         if (message.isReliable())
+         {
+            // It will already have been persisted on the sender's side
+            message.setPersisted(true);
+         }
+         
          ref = ms.reference(message);
               
          // We route on the condition
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -39,7 +39,6 @@
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.messaging.util.Future;
 import org.jboss.messaging.util.StreamUtils;
-import org.jgroups.Address;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -325,6 +324,12 @@
       {
          org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
          
+         if (msg.isReliable())
+         {
+            //It will already have been persisted on the other node
+            msg.setPersisted(true);
+         }
+         
          MessageReference ref = null;
          
          try
@@ -346,10 +351,11 @@
                ref.releaseMemoryReference();
             }
          }
+                 
+         //Acknowledge on the remote queue stub
+         Delivery del = new SimpleDelivery(theQueue, ref);
          
-         Delivery del = new SimpleDelivery(this, ref);
-         
-         acknowledgeInternal(del, tx, true, true);
+         del.acknowledge(tx);        
       }
       
       tx.commit();
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -116,6 +116,7 @@
             //If the message is persistent and we are recoverable then we persist here, *before*
             //the message is sent across the network
 
+            //This will increment any channelcount on the message in storage
             pm.addReference(id, reference, tx);
          }
          catch (Exception e)
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/build.xml	2006-09-28 11:44:25 UTC (rev 1372)
@@ -609,7 +609,7 @@
       <antcall target="crash-test">
          <param name="crash.test.name" value="org.jboss.test.messaging.jms.crash.ClientCrashTwoConnectionsTest"/>
       </antcall>
-   	
+
       <antcall target="start-rmi-server"/>
 
       <antcall target="crash-test">
Modified: trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/message/JBossMessageTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -56,8 +56,8 @@
 
    protected void setUp() throws Exception
    {
-      rs = (MessageSupport)MessageFactory.createJBossMessage(0, false, 0, 0, (byte)4, null,
-                                                             null, 0, JBossMessage.TYPE,
+      rs = (MessageSupport)MessageFactory.createMessage(0, false, 0, 0, (byte)4, null,
+                                                             null, JBossMessage.TYPE,
                                                              null, null, null, null, null, null);
       super.setUp();
       log.debug("setup done");
Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -291,8 +291,10 @@
       
       //Now consume them all
       
+      log.info("Consuming them all from 1");
       this.consume(queue1, 0, refs1, 150);
        
+      log.info("Consuming them all from 2");
       this.consume(queue2, 0, refs2, 150);
       
       //    Queue1
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -335,7 +335,7 @@
       refs.add(ref9);
       refs.add(ref10);
       
-      pm.addReferences(channel1.getChannelID(), refs, false);
+      pm.pageReferences(channel1.getChannelID(), refs, false);
       
       refs = new ArrayList();
       refs.add(ref11);
@@ -344,7 +344,7 @@
       refs.add(ref14);
       refs.add(ref15);
     
-      pm.addReferences(channel2.getChannelID(), refs, false);
+      pm.pageReferences(channel2.getChannelID(), refs, false);
                   
       List refIds = getReferenceIds(channel1.getChannelID());
       assertNotNull(refIds);
@@ -405,7 +405,7 @@
       refs.add(ref13);
       refs.add(ref14);
       refs.add(ref15);
-      pm.removeReferences(channel2.getChannelID(), refs);
+      pm.removeDepagedReferences(channel2.getChannelID(), refs);
       
       refIds = getReferenceIds(channel2.getChannelID());
       assertNotNull(refIds);
@@ -433,7 +433,7 @@
       refs.add(ref1);
       refs.add(ref2);
       refs.add(ref3);
-      pm.removeReferences(channel1.getChannelID(), refs);
+      pm.removeDepagedReferences(channel1.getChannelID(), refs);
       
       refIds = getReferenceIds(channel1.getChannelID());
       assertNotNull(refIds);
@@ -463,7 +463,7 @@
       
       refs = new ArrayList();
       refs.add(ref11);
-      pm.removeReferences(channel2.getChannelID(), refs);
+      pm.removeDepagedReferences(channel2.getChannelID(), refs);
       
       refs = new ArrayList();
       refs.add(ref4);
@@ -473,7 +473,7 @@
       refs.add(ref8);
       refs.add(ref9);
       refs.add(ref10);
-      pm.removeReferences(channel1.getChannelID(), refs);
+      pm.removeDepagedReferences(channel1.getChannelID(), refs);
       
       ms = getMessageIds();
       assertNotNull(ms);
@@ -512,7 +512,7 @@
       refs.add(ref9);
       refs.add(ref10);
       
-      pm.addReferences(channel.getChannelID(), refs, false); 
+      pm.pageReferences(channel.getChannelID(), refs, false); 
       
       ref1.setPagingOrder(0);
       ref2.setPagingOrder(1);
@@ -680,7 +680,7 @@
       refs.add(ref9);
       refs.add(ref10);
       
-      pm.addReferences(channel.getChannelID(), refs, false); 
+      pm.pageReferences(channel.getChannelID(), refs, false); 
       
       //First load exactly 10
       PersistenceManager.InitialLoadInfo info = pm.getInitialReferenceInfos(channel.getChannelID(), 10);
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/BytesMessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
          i,
          coreHeaders,
          null,
-         0,
          i % 2 == 0 ? new GUID().toString() : null,
          genCorrelationID(i),
          i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MapMessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -72,7 +72,6 @@
                i,
                coreHeaders,
                null,
-               0,
                i % 2 == 0 ? new GUID().toString() : null,
                genCorrelationID(i),
                i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/MessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -130,7 +130,6 @@
             i,
             coreHeaders,
             null,
-            0,
             i % 2 == 0 ? new GUID().toString() : null,
             genCorrelationID(i),
             i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/ObjectMessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
                i,
                coreHeaders,
                null,
-               0,
                i % 2 == 0 ? new GUID().toString() : null,
                genCorrelationID(i),
                i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/StreamMessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -72,7 +72,6 @@
                i,
                coreHeaders,
                null,
-               0,
                i % 2 == 0 ? new GUID().toString() : null,
                genCorrelationID(i),
                i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/jms/persistence/TextMessagePersistenceManagerTest.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -71,7 +71,6 @@
                i,
                coreHeaders,
                null,
-               0,
                i % 2 == 0 ? new GUID().toString() : null,
                genCorrelationID(i),
                i % 3 == 2 ? randByteArray(50) : null,
Modified: trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -233,6 +233,9 @@
          String databaseType = sc.getDatabaseType();
          String persistenceConfigFile =
             "server/default/deploy/" + databaseType + "-persistence-service.xml";
+         
+         log.info("********* LOADING CONFIG FILE: " + persistenceConfigFile);
+         
          URL persistenceConfigFileURL = getClass().getClassLoader().getResource(persistenceConfigFile);
          if (persistenceConfigFileURL == null)
          {
Modified: trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java	2006-09-28 02:09:05 UTC (rev 1371)
+++ trunk/tests/src/org/jboss/test/messaging/util/CoreMessageFactory.java	2006-09-28 11:44:25 UTC (rev 1372)
@@ -41,14 +41,14 @@
 
    public static CoreMessage createCoreMessage(long messageID)
    {
-      return createCoreMessage(messageID, false, 0, 0, (byte)4, null, null, 0);
+      return createCoreMessage(messageID, false, 0, 0, (byte)4, null, null);
    }
 
    public static CoreMessage createCoreMessage(long messageID,
                                                boolean reliable,
                                                Serializable payload)
    {
-      return createCoreMessage(messageID, reliable, 0, 0, (byte)4, null, payload, 0);
+      return createCoreMessage(messageID, reliable, 0, 0, (byte)4, null, payload);
    }
 
    public static CoreMessage createCoreMessage(long messageID,
@@ -57,27 +57,14 @@
                                                long timestamp,
                                                byte priority,
                                                Map coreHeaders,
-                                               Serializable payload,
-                                               int persistentChannelCount)
+                                               Serializable payload)
    {
       CoreMessage cm =
-         new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null, persistentChannelCount);
+         new CoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, null);
       cm.setPayload(payload);
       return cm;
    }
-   
-   public static CoreMessage createCoreMessage(long messageID,
-         boolean reliable,
-         long expiration,
-         long timestamp,
-         byte priority,
-         Map coreHeaders,
-         Serializable payload)
-   {
-      return createCoreMessage(messageID, reliable, expiration, timestamp, priority, coreHeaders, payload, 0);
-   }
-   
-   
+         
    // Attributes ----------------------------------------------------
    
    // Constructors --------------------------------------------------
    
    
More information about the jboss-cvs-commits
mailing list