[jboss-cvs] JBoss Messaging SVN: r1564 - in branches/Branch_1_0_CP1: . src/etc src/etc/server/default/deploy src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/message src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/message

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 16 04:12:24 EST 2006


Author: timfox
Date: 2006-11-16 04:11:57 -0500 (Thu, 16 Nov 2006)
New Revision: 1564

Modified:
   branches/Branch_1_0_CP1/.project
   branches/Branch_1_0_CP1/src/etc/aop-messaging-client.xml
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/mssql-persistence-service.xml
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/mysql-persistence-service.xml
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/oracle-persistence-service.xml
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/Branch_1_0_CP1/src/etc/server/default/deploy/sybase-persistence-service.xml
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossSession.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/AsfAspect.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/TransactionAspect.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/state/ConsumerState.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/JBossMessage.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/MessageProxy.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/AckInfo.java
   branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/ChannelSupport.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/Routable.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/message/RoutableSupport.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/SimpleMessageReference.java
   branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/base/MessageStoreTestBase.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/JMSTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/SessionTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
Log:
DLQ implementation and deliverycount refactoring



Modified: branches/Branch_1_0_CP1/.project
===================================================================
--- branches/Branch_1_0_CP1/.project	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/.project	2006-11-16 09:11:57 UTC (rev 1564)
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>jboss-messaging</name>
+	<name>jboss-messaging-10-CP1</name>
 	<comment></comment>
 	<projects>
 	</projects>

Modified: branches/Branch_1_0_CP1/src/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/aop-messaging-client.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/aop-messaging-client.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -111,7 +111,7 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->preDeliver(..))">
       <advice name="handlePreDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver())">
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">

Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/mssql-persistence-service.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/mssql-persistence-service.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -38,6 +38,7 @@
 UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?
 SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'
 DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
+UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS IMAGE, PAYLOAD IMAGE, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES IMAGE, DESTINATION_ID BIGINT, REPLYTO_ID BIGINT, JMSPROPERTIES IMAGE, PRIMARY KEY (MESSAGEID))
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/mysql-persistence-service.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -38,6 +38,7 @@
 UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?  
 SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'          
 DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
+UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY TINYINT, COREHEADERS MEDIUMBLOB, PAYLOAD LONGBLOB, CHANNELCOUNT INTEGER, TYPE TINYINT, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES VARBINARY(254), DESTINATION_ID BIGINT, REPLYTO_ID BIGINT, JMSPROPERTIES MEDIUMBLOB, PRIMARY KEY (MESSAGEID))
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/oracle-persistence-service.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/oracle-persistence-service.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -38,6 +38,7 @@
 UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?   
 SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'             
 DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
+UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID INTEGER, RELIABLE CHAR(1), EXPIRATION INTEGER, TIMESTAMP INTEGER, PRIORITY INTEGER, COREHEADERS BLOB, PAYLOAD BLOB, CHANNELCOUNT INTEGER, TYPE INTEGER, JMSTYPE VARCHAR2(255), CORRELATIONID VARCHAR2(255), CORRELATIONID_BYTES RAW(254), DESTINATION_ID INTEGER, REPLYTO_ID INTEGER, JMSPROPERTIES BLOB, PRIMARY KEY (MESSAGEID))
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/postgresql-persistence-service.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/postgresql-persistence-service.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -38,6 +38,7 @@
 UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?
 SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'
 DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
+UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID BIGINT, RELIABLE CHAR(1), EXPIRATION BIGINT, TIMESTAMP BIGINT, PRIORITY int2, COREHEADERS BYTEA, PAYLOAD BYTEA, CHANNELCOUNT INTEGER, TYPE int2, JMSTYPE VARCHAR(255), CORRELATIONID VARCHAR(255), CORRELATIONID_BYTES BYTEA, DESTINATION_ID BIGINT, REPLYTO_ID BIGINT, JMSPROPERTIES BYTEA, PRIMARY KEY (MESSAGEID))
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: branches/Branch_1_0_CP1/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_1_0_CP1/src/etc/server/default/deploy/sybase-persistence-service.xml	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/etc/server/default/deploy/sybase-persistence-service.xml	2006-11-16 09:11:57 UTC (rev 1564)
@@ -47,6 +47,7 @@
 UPDATE_RELIABLE_REFS_NOT_LOADED=UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE CHANNELID=?
 SELECT_MIN_ORDERING=SELECT MIN(ORD) FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? AND STATE <> '+' AND LOADED = 'N'
 DELETE_UNRELIABLE_REFS=DELETE FROM JMS_MESSAGE_REFERENCE WHERE RELIABLE = 'N'
+UPDATE_DELIVERYCOUNT=UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?
 CREATE_MESSAGE=CREATE TABLE JMS_MESSAGE (MESSAGEID INTEGER, RELIABLE CHAR(1) NULL, EXPIRATION INTEGER NULL, TIMESTAMP NUMERIC(20,0) NULL, PRIORITY INTEGER NULL, COREHEADERS IMAGE NULL, PAYLOAD IMAGE NULL, CHANNELCOUNT INTEGER NULL, TYPE INTEGER NULL, JMSTYPE VARCHAR(255) NULL, CORRELATIONID VARCHAR(255) NULL, CORRELATIONID_BYTES VARBINARY(254) NULL, DESTINATION_ID INTEGER NULL, REPLYTO_ID INTEGER NULL, JMSPROPERTIES IMAGE NULL, PRIMARY KEY (MESSAGEID))
 LOAD_MESSAGES=SELECT MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES FROM JMS_MESSAGE
 INSERT_MESSAGE=INSERT INTO JMS_MESSAGE (MESSAGEID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, COREHEADERS, PAYLOAD, CHANNELCOUNT, TYPE, JMSTYPE, CORRELATIONID, CORRELATIONID_BYTES, DESTINATION_ID, REPLYTO_ID, JMSPROPERTIES) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -102,6 +102,8 @@
    
    protected Object closeLock = new Object();      
    
+   protected int maxDeliveries;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -151,6 +153,8 @@
       ConsumerState state = (ConsumerState)((DelegateSupport)cons).getState();
 
       this.consumerID = state.getConsumerID();      
+      
+      this.maxDeliveries = state.getMaxDeliveries();
 
       id = threadId.increment();
       internalThread = new Thread(this, "Connection Consumer for dest " + destination + " id=" + id);
@@ -298,7 +302,7 @@
                for (int i = 0; i < mesList.size(); i++)
                {
                   MessageProxy m = (MessageProxy)mesList.get(i);
-                  session.addAsfMessage(m, consumerID, cons);
+                  session.addAsfMessage(m, consumerID, cons, maxDeliveries);
                   if (trace) { log.trace("added " + m + " to session"); }
                }
 

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossSession.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/JBossSession.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -502,9 +502,9 @@
     * This method is used by the JBossConnectionConsumer to load up the session
     * with messages to be processed by the session's run() method
     */
-   void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons)
+   void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons, int maxDeliveries)
    {
-      delegate.addAsfMessage(m, consumerID, cons);
+      delegate.addAsfMessage(m, consumerID, cons, maxDeliveries);
    }
       
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/AsfAspect.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/AsfAspect.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -130,6 +130,7 @@
       MessageProxy m = (MessageProxy)mi.getArguments()[0];
       int theConsumerID = ((Integer)mi.getArguments()[1]).intValue();
       ConsumerDelegate cons = (ConsumerDelegate)mi.getArguments()[2];
+      int maxDeliveries = ((Integer)mi.getArguments()[3]).intValue();
       
       if (m == null)
       {
@@ -140,6 +141,7 @@
       holder.msg = m;
       holder.consumerID = theConsumerID;
       holder.consumerDelegate = cons;
+      holder.maxDeliveries = maxDeliveries;
       
       msgs.add(holder);
 
@@ -163,7 +165,7 @@
          if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
          
          MessageCallbackHandler.callOnMessage(del, sessionListener, holder.consumerID, false,
-                                              holder.msg, ackMode);                          
+                                              holder.msg, ackMode, holder.maxDeliveries);                          
       }
       
       return null;
@@ -187,5 +189,6 @@
       MessageProxy msg;
       int consumerID;
       ConsumerDelegate consumerDelegate;
+      int maxDeliveries;
    }
 }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -79,11 +79,12 @@
       int consumerID = consumerState.getConsumerID();
       int prefetchSize = consumerState.getPrefetchSize();
       QueuedExecutor sessionExecutor = sessionState.getExecutor();
+      int maxDeliveries = consumerState.getMaxDeliveries();
       
       MessageCallbackHandler messageHandler =
          new MessageCallbackHandler(isCC, sessionState.getAcknowledgeMode(),
                                     sessionDelegate, consumerDelegate, consumerID,
-                                    prefetchSize, sessionExecutor);
+                                    prefetchSize, sessionExecutor, maxDeliveries);
       
       sessionState.addCallbackHandler(messageHandler);
       

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/SessionAspect.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -21,14 +21,13 @@
   */
 package org.jboss.jms.client.container;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Iterator;
-import java.util.ArrayList;
 
 import javax.jms.IllegalStateException;
 import javax.jms.Session;
-import javax.jms.JMSException;
 
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -69,10 +68,46 @@
 
    public Object handleClosing(Invocation invocation) throws Throwable
    {
-      // Send to the server all acknowledgments accumulated in toAck. This is useful, for example,
-      // when a message listener close the session from its onMessage().
-      acknowledgeOnClosing(invocation);
+      MethodInvocation mi = (MethodInvocation)invocation;
+      SessionState state = getState(invocation);
+      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+      
+      int ackMode = state.getAcknowledgeMode();
 
+      // select eligible acknowledgments
+      List acks = new ArrayList();
+      List cancels = new ArrayList();
+      for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
+      {
+         AckInfo ack = (AckInfo)i.next();
+         if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+             ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            acks.add(ack);            
+         }
+         else
+         {
+            cancels.add(ack);
+         }
+         i.remove();
+      }
+      
+      //On closing we acknowlege any client or dups ok, since the session might have closed
+      //before the onMessage had finished executing
+      
+      //And any client ack, or transactional we cancel, we do this explicitly so we can pass the
+      //updated delivery count information from client to server.
+      //We could just do this on the server but we would lose delivery count info
+
+      if (!acks.isEmpty())
+      {
+         del.acknowledgeBatch(acks);
+      }
+      if (!cancels.isEmpty())
+      {
+         del.cancelDeliveries(cancels);
+      }
+
       return invocation.invokeNext();
    }
 
@@ -111,7 +146,7 @@
          Object[] args = mi.getArguments();
          MessageProxy mp = (MessageProxy)args[0];
          int consumerID = ((Integer)args[1]).intValue();
-         AckInfo info = new AckInfo(mp, consumerID, ackMode);
+         AckInfo info = new AckInfo(mp, consumerID);
          
          state.getToAck().add(info);
          
@@ -144,6 +179,13 @@
       
       int ackMode = state.getAcknowledgeMode();
       
+      boolean cancel = ((Boolean)mi.getArguments()[0]).booleanValue();
+      
+      if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
+      {
+         throw new IllegalStateException("Ack mode must be auto ack or dups ok");
+      }
+      
       if (ackMode == Session.AUTO_ACKNOWLEDGE ||
           ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
           ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null)
@@ -156,10 +198,24 @@
          if (!state.isRecoverCalled())
          {
             if (trace) { log.trace("acknowledging NON-transactionally"); }
-
+                        
             List acks = state.getToAck();
+            
+            //Sanity check
+            if (acks.size() != 1)
+            {
+               throw new IllegalStateException("Should only be one entry in list. There are " + acks.size());
+            }
+            
             AckInfo ack = (AckInfo)acks.get(0);
-            sd.acknowledge(ack);
+            if (cancel)
+            {
+               sd.cancelDeliveries(acks);
+            }
+            else
+            {
+               sd.acknowledge(ack);
+            }
             state.getToAck().clear();
          }
          else
@@ -253,13 +309,8 @@
       for (int i = toRedeliver.size() - 1; i >= 0; i--)
       {
          AckInfo info = (AckInfo)toRedeliver.get(i);
-         MessageProxy proxy = info.getMessage();
-         proxy.setJMSRedelivered(true);
+         MessageProxy proxy = info.getMessage();        
          
-         //TODO delivery count although optional should be global so we need to send it back to the
-         //     server but this has performance hit so perhaps we just don't support it?
-         proxy.incDeliveryCount();
-         
          MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
               
          if (handler == null)
@@ -314,35 +365,6 @@
       return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
    }
 
-   /**
-    * The method sends to server all eligible acknowlegments (those that are NOT CLIIENT_ACKNOWLEDGE
-    * for example)
-    */
-   private void acknowledgeOnClosing(Invocation invocation) throws JMSException
-   {
-      MethodInvocation mi = (MethodInvocation)invocation;
-      SessionState state = getState(invocation);
-      SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
-      // select eligible acknowledgments
-      List acks = new ArrayList();
-      for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
-      {
-         AckInfo ack = (AckInfo)i.next();
-         if (ack.getAckMode() == Session.AUTO_ACKNOWLEDGE ||
-             ack.getAckMode() == Session.DUPS_OK_ACKNOWLEDGE)
-         {
-            acks.add(ack);
-            i.remove();
-         }
-      }
-
-      if (!acks.isEmpty())
-      {
-         del.acknowledgeBatch(acks);
-      }
-   }
-    
    // Inner Classes -------------------------------------------------
    
 }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -140,9 +140,12 @@
       int prefetchSize =
          ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE)).intValue();
       
+      int maxDeliveries = 
+         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES)).intValue();
+      
       ConsumerState consumerState =
          new ConsumerState(sessionState, consumerDelegate, dest, selector,
-                           noLocal, consumerID, connectionConsumer, prefetchSize);
+                           noLocal, consumerID, connectionConsumer, prefetchSize, maxDeliveries);
       
       delegate.setState(consumerState);
       return consumerDelegate;

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/container/TransactionAspect.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -190,7 +190,7 @@
          MethodInvocation mi = (MethodInvocation)invocation;
          MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
          int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-         AckInfo info = new AckInfo(proxy, consumerID, Session.SESSION_TRANSACTED);
+         AckInfo info = new AckInfo(proxy, consumerID);
          ConnectionState connState = (ConnectionState)state.getParent();
 
          if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -51,15 +51,18 @@
    // Attributes ----------------------------------------------------
    
    protected int bufferSize;
+   
+   protected int maxDeliveries;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ClientConsumerDelegate(int objectID, int bufferSize)
+   public ClientConsumerDelegate(int objectID, int bufferSize, int maxDeliveries)
    {
       super(objectID);
       this.bufferSize = bufferSize;
+      this.maxDeliveries = maxDeliveries;
    }
    
    public ClientConsumerDelegate()
@@ -167,6 +170,8 @@
                                 new Integer(id), PayloadKey.TRANSIENT);
       getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE,
                                 new Integer(bufferSize), PayloadKey.TRANSIENT);
+      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES,
+                                new Integer(maxDeliveries), PayloadKey.TRANSIENT);
    }
 
    public String toString()

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -302,7 +302,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void postDeliver() throws JMSException
+   public void postDeliver(boolean cancel) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
@@ -401,7 +401,7 @@
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */
-   public void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons)
+   public void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons, int maxDeliveries)
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -56,7 +56,7 @@
    
    // Static --------------------------------------------------------
    
-   private static boolean trace;
+   private static boolean trace;      
    
    static
    {
@@ -64,27 +64,29 @@
       trace = log.isTraceEnabled();
    }
      
-   // Hardcoded for now
-   private static final int MAX_REDELIVERIES = 10;
-      
    public static void callOnMessage(SessionDelegate sess,
                                     MessageListener listener,
                                     int consumerID,
                                     boolean isConnectionConsumer,
                                     MessageProxy m,
-                                    int ackMode)
+                                    int ackMode,
+                                    int maxDeliveries)
       throws JMSException
    {
       preDeliver(sess, consumerID, m, isConnectionConsumer);
                   
       int tries = 0;
       
+      boolean cancel = false;
+      
       while (true)
       {
          try
          {
             if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
-
+            
+            m.incDeliveryCount();
+            
             listener.onMessage(m);
 
             if (trace) { log.trace("listener's onMessage() finished"); }
@@ -101,24 +103,20 @@
    
             if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
             {
-               // We redeliver at certain number of times
-               if (tries < MAX_REDELIVERIES)
-               {
-                  m.setJMSRedelivered(true);
-                  
-                  // TODO delivery count although optional should be global so we need to send it
-                  // back to the server but this has performance hit so perhaps we just don't
-                  // support it?
-                  m.incDeliveryCount();
-                  
+               // We redeliver a certain number of times
+               if (tries < maxDeliveries)
+               {                            
                   tries++;
                }
                else
                {
                   log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
                   
-                  //TODO - Send to DLQ
+                  //postdeliver will do a cancel rather than an ack which will cause the ref to end
+                  //up in the dlq
                   
+                  cancel = true;
+                  
                   break;
                }
             }
@@ -136,7 +134,7 @@
       if (!sess.isClosed())
       {
          // postDeliver only if the session is not closed
-         postDeliver(sess, isConnectionConsumer);
+         postDeliver(sess, isConnectionConsumer, cancel);
       }
    }
    
@@ -154,14 +152,14 @@
       }         
    }
    
-   protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer)
+   protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer, boolean cancel)
       throws JMSException
    {
       // If this is the callback-handler for a connection consumer we don't want to acknowledge or
       // add anything to the tx for this session
       if (!isConnectionConsumer)
       {
-         sess.postDeliver();
+         sess.postDeliver(cancel);
       }         
    }
    
@@ -194,12 +192,15 @@
    private QueuedExecutor sessionExecutor;
    
    private boolean listenerRunning;
+   
+   private int maxDeliveries;
         
    // Constructors --------------------------------------------------
 
    public MessageCallbackHandler(boolean isCC, int ackMode,                                
                                  SessionDelegate sess, ConsumerDelegate cons, int consumerID,
-                                 int bufferSize, QueuedExecutor sessionExecutor)
+                                 int bufferSize, QueuedExecutor sessionExecutor,
+                                 int maxDeliveries)
    {
       if (bufferSize < 1)
       {
@@ -225,6 +226,8 @@
       mainLock = new Object();                  
       
       this.sessionExecutor = sessionExecutor;
+      
+      this.maxDeliveries = maxDeliveries;
    }
         
    // Public --------------------------------------------------------
@@ -453,7 +456,7 @@
                // message is acknowledged so it gets removed from the queue/subscription.
                preDeliver(sessionDelegate, consumerID, m, isConnectionConsumer);
                
-               postDeliver(sessionDelegate, isConnectionConsumer);
+               postDeliver(sessionDelegate, isConnectionConsumer, false);
                
                if (!m.getMessage().isExpired())
                {
@@ -488,6 +491,8 @@
          consumerDelegate.more();
       }
       
+      m.incDeliveryCount();
+      
       return m;
    }    
    
@@ -725,7 +730,7 @@
          {
             try
             {
-               callOnMessage(sessionDelegate, listener, consumerID, false, mp, ackMode);
+               callOnMessage(sessionDelegate, listener, consumerID, false, mp, ackMode, maxDeliveries);
             }
             catch (JMSException e)
             {

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -56,10 +56,12 @@
    private MessageCallbackHandler messageCallbackHandler;
    
    private int prefetchSize;
+   
+   private int maxDeliveries;
     
    public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
                         String selector, boolean noLocal, int consumerID, boolean isCC,
-                        int prefetchSize)
+                        int prefetchSize, int maxDeliveries)
    {
       super(parent, delegate);
       children = Collections.EMPTY_SET;
@@ -69,6 +71,7 @@
       this.consumerID = consumerID;
       this.isConnectionConsumer = isCC;
       this.prefetchSize = prefetchSize;
+      this.maxDeliveries = maxDeliveries;
    }
     
    public Destination getDestination()
@@ -115,6 +118,11 @@
    {
       return prefetchSize;
    }
+   
+   public int getMaxDeliveries()
+   {
+      return maxDeliveries;
+   }
     
 }
 

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/delegate/SessionDelegate.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -67,7 +67,7 @@
    
    void preDeliver(MessageProxy proxy, int consumerID) throws JMSException;
    
-   void postDeliver() throws JMSException;
+   void postDeliver(boolean cancel) throws JMSException;
    
    MessageListener getMessageListener() throws JMSException;
    
@@ -77,7 +77,7 @@
    
    XAResource getXAResource();
    
-   void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons);
+   void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons, int maxDeliveries);
    
    boolean getTransacted();
    

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/JBossMessage.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/JBossMessage.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/JBossMessage.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -183,7 +183,7 @@
             sb.append(name).append(" - ").append(m.headers.get(name)).append('\n');
          }
       }
-      sb.append("              redelivered:   ").append(m.redelivered).append('\n');
+      sb.append("              redelivered:   ").append(m.deliveryCount >= 1).append('\n');
       sb.append("              priority:      ").append(m.priority).append('\n');
       sb.append("              deliveryCount: ").append(m.deliveryCount).append('\n');
 
@@ -336,7 +336,7 @@
          setJMSDestination(foreign.getJMSDestination());
       }
       setJMSDeliveryMode(foreign.getJMSDeliveryMode());
-      setJMSRedelivered(foreign.getJMSRedelivered());
+      setDeliveryCount(foreign.getJMSRedelivered() ? 1 : 0);
       setJMSExpiration(foreign.getJMSExpiration());
       setJMSPriority(foreign.getJMSPriority());
       setJMSType(foreign.getJMSType());
@@ -480,12 +480,19 @@
 
    public boolean getJMSRedelivered() throws JMSException
    {
-      return isRedelivered();
+      return deliveryCount >= 2;
    }
 
    public void setJMSRedelivered(boolean redelivered) throws JMSException
    {
-      setRedelivered(redelivered);
+      if (deliveryCount == 1)
+      {
+         deliveryCount++;
+      }
+      else
+      {
+         //do nothing
+      }
    }
 
    /**

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/MessageProxy.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/message/MessageProxy.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -80,7 +80,7 @@
    protected transient boolean bodyReadOnly;
 
    protected int deliveryCount;
-   protected transient boolean jmsRedelivered;
+   //protected transient boolean jmsRedelivered;
 
    // Constructors --------------------------------------------------
 
@@ -177,13 +177,24 @@
    public boolean getJMSRedelivered() throws JMSException
    {
       //Always handled in the delegate
-      return jmsRedelivered;
+      //This is because when sending a message to a topic (for instance)
+      //with multiple subscriptions all in the same VM, then we don't copy the original
+      //message for performance reasons, unless necessary, but each reference might have
+      //it's own value for delivery count
+      return deliveryCount >= 2;
    }
 
    public void setJMSRedelivered(boolean redelivered) throws JMSException
    {
       //Always handled in the delegate
-      jmsRedelivered = redelivered;
+      if (deliveryCount == 1)
+      {
+         deliveryCount++;
+      }
+      else
+      {
+         //do nothing
+      }
    }
 
    public String getJMSType() throws JMSException
@@ -407,8 +418,6 @@
       propertiesReadOnly = true;
 
       bodyReadOnly = true;
-
-      this.jmsRedelivered = deliveryCount > 1;
    }
 
    public JBossMessage getMessage()
@@ -423,7 +432,7 @@
    
    public void incDeliveryCount()
    {
-      this.deliveryCount++;
+      this.deliveryCount++;            
    }
 
    public String toString()

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/ServerPeer.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -169,8 +169,7 @@
    public synchronized void startService() throws Exception
    {
       try
-      {
-         
+      {         
          log.info("starting serverpeer");
          
          if (started)

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -54,6 +54,7 @@
 import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.SingleReceiverDelivery;
 import org.jboss.messaging.core.local.CoreDestination;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.messaging.core.tx.TransactionRepository;
@@ -129,7 +130,7 @@
    private CoreDestination dlq;
    
    private TransactionRepository tr;
-
+   
    // Constructors --------------------------------------------------
 
    protected ServerConsumerEndpoint(int id, Channel channel,
@@ -260,16 +261,7 @@
          {
             return delivery;
          }
-            
-         try
-         {
-            checkDeliveryCount(delivery);
-         }
-         catch (Throwable t)
-         {
-            log.error("Failed to check delivery count", t);
-         }
-         
+                 
          if (delivery.isDone())
          {
             return delivery;
@@ -587,19 +579,56 @@
       channel.deliver(false);
    }
    
-   protected void cancelDelivery(Long messageID) throws Throwable
+   protected void sendToDLQ(Long messageID, Transaction tx) throws Throwable
    {
       SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      
       if (del != null)
-      {  
-          del.getReference().decrementDeliveryCount();    
-          del.cancel();
+      { 
+         log.warn(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
+         
+         if (dlq != null)
+         {         
+            //reset delivery count to zero
+            del.getReference().setDeliveryCount(0);
+            
+            dlq.handle(null, del.getReference(), tx);
+            
+            del.acknowledge(tx);           
+         }
+         else
+         {
+            log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
+            
+            del.acknowledge(tx);
+         }
       }
       else
       {
-          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
+         throw new IllegalStateException("Cannot find delivery to send to DLQ:" + id);
       }
+      
    }
+   
+   protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
+   {
+      SingleReceiverDelivery del = (SingleReceiverDelivery)deliveries.remove(messageID);
+      
+      if (del != null)
+      {                               
+         //Cancel back to the queue
+         
+         //Update the delivery count
+           
+         del.getReference().setDeliveryCount(deliveryCount);
+              
+         del.cancel();         
+      }
+      else
+      {
+         throw new IllegalStateException("Cannot find delivery to cancel:" + id);
+      }
+   }
                
    protected void start()
    {             
@@ -679,7 +708,7 @@
                
                long id = proxy.getMessage().getMessageID();
                
-               cancelDelivery(new Long(id));
+               cancelDelivery(new Long(id), proxy.getMessage().getDeliveryCount());
             }
          }
                  
@@ -705,41 +734,7 @@
          if (trace) { log.trace(this + " removed from the channel"); }
       }
    }
-     
-   private void checkDeliveryCount(SimpleDelivery del) throws Throwable
-   {
-      if (del.getReference().getDeliveryCount() > maxDeliveryAttempts)
-      {
-         log.info(del.getReference() + " has exceed maximum delivery attempts and will be sent to the DLQ");
-         
-         if (dlq != null)
-         {                
-            Transaction tx = tr.createTransaction();
-            
-            try
-            {         
-               dlq.handle(null, del.getReference(), tx);
-               
-               del.acknowledge(tx);
-                        
-               tx.commit(); 
-            }
-            catch (Throwable t)
-            {
-               tx.rollback();
-               
-               throw t;
-            }    
-         }
-         else
-         {
-            log.info("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-            
-            del.acknowledge(null);
-         }
-      }                 
-   }
-   
+        
    // Inner classes -------------------------------------------------   
    
    /*

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -21,6 +21,7 @@
   */
 package org.jboss.jms.server.endpoint;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -57,6 +58,7 @@
 import org.jboss.messaging.core.memory.MemoryManager;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 
 /**
@@ -267,7 +269,7 @@
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
             
          
-         ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize);
+         ClientConsumerDelegate stub = new ClientConsumerDelegate(consumerID, prefetchSize, maxDeliveryAttempts);
          
          if (subscription != null)
          {
@@ -467,7 +469,7 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
       }
    }      
-         
+        
    public void cancelDeliveries(List ackInfos) throws JMSException
    {
       try
@@ -476,6 +478,8 @@
           
          Set consumers = new HashSet();
          
+         List forDLQ = null;
+         
          for (int i = ackInfos.size() - 1; i >= 0; i--)
          {
             AckInfo ack = (AckInfo)ackInfos.get(i);
@@ -489,10 +493,58 @@
                throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
             }
             
-            consumer.cancelDelivery(new Long(ack.getMessageID()));
+            if (ack.getDeliveryCount() >= maxDeliveryAttempts)
+            {
+               if (forDLQ == null)
+               {
+                  forDLQ = new ArrayList();
+               }
+               
+               forDLQ.add(ack);
+            }
+            else
+            {            
+               consumer.cancelDelivery(new Long(ack.getMessageID()), ack.getDeliveryCount());
+            }
+            
             consumers.add(consumer);
          }
          
+         //Send stuff to DLQ
+         
+         if (forDLQ != null)
+         {
+            //We do this in a tx so we don't end up with the message in both the original queue
+            //and the dlq if it fails half way through
+            Transaction tx = tr.createTransaction();
+            
+            try
+            {               
+               for (int i = forDLQ.size() - 1; i >= 0; i--)
+               {
+                  AckInfo ack = (AckInfo)forDLQ.get(i);
+                  
+                  ServerConsumerEndpoint consumer =
+                     this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+         
+                  if (consumer == null)
+                  {
+                     throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+                  }
+                  
+                  consumer.sendToDLQ(new Long(ack.getMessageID()), tx);                              
+               }
+               
+               tx.commit();
+            }
+            catch (Throwable t)
+            {
+               tx.rollback();
+               
+               throw t;
+            }
+         }
+         
          // need to prompt delivery for all consumers
          
          for(Iterator i = consumers.iterator(); i.hasNext(); )

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -46,4 +46,6 @@
    public static final String VERSION_NUMBER = "VERSION_NUMBER";
    
    public static final String JMS_CLIENT_VM_ID = "JMS_CLIENT_VM_ID";
+   
+   public static final String MAX_DELIVERIES = "MAX_DELS";
 }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/AckInfo.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/AckInfo.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -47,10 +47,8 @@
    
    protected long messageID;
    protected int consumerID;
+   protected int deliveryCount;
 
-   // One of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
-   private int ackMode;
-   
    // The actual proxy must not get serialized
    protected transient MessageProxy msg;
    
@@ -64,28 +62,17 @@
 
    public AckInfo(MessageProxy proxy, int consumerID)
    {
-      this(proxy, consumerID, -1);
-   }
-
-   /**
-    * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
-    */
-   public AckInfo(MessageProxy proxy, int consumerID, int ackMode)
-   {
       this.msg = proxy;
       this.messageID = proxy.getMessage().getMessageID();
       this.consumerID = consumerID;
-      this.ackMode = ackMode;
    }
    
-   /**
-    * @param ackMode - one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE, etc.
-    */
-   public AckInfo(long messageID, int consumerID, int ackMode)
+   //Only used for testing
+   public AckInfo(long messageID, int consumerID, int deliveryCount)
    {
       this.messageID = messageID;
       this.consumerID = consumerID;
-      this.ackMode = ackMode;
+      this.deliveryCount = deliveryCount;
    }
 
    // Public --------------------------------------------------------
@@ -104,15 +91,17 @@
    {
       return msg;
    }
-
-   /**
-    *
-    * @return one of Session.AUTO_ACKNOWLEDGE, Session.CLIENT_ACKNOWLEDGE ..., or -1 if it has not
-    *         previously set
-    */
-   public int getAckMode()
+   
+   public int getDeliveryCount()
    {
-      return ackMode;
+      if (msg == null)
+      {
+         return deliveryCount;
+      }
+      else
+      {
+         return msg.getDeliveryCount();
+      }
    }
 
    public String toString()
@@ -126,12 +115,21 @@
    {
      out.writeLong(messageID);
      out.writeInt(consumerID);
+     if (msg != null)
+     {
+        out.writeInt(msg.getDeliveryCount());
+     }
+     else
+     {
+        out.writeInt(deliveryCount);
+     }
    }
 
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
    {
       messageID = in.readLong();
       consumerID = in.readInt();
+      deliveryCount = in.readInt();
    }
    
    // Class YYY overrides -------------------------------------------

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/ResourceManager.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/jms/tx/ResourceManager.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -317,6 +317,11 @@
          sess.redeliver(acks);
       }  
    }
+   
+   public int size()
+   {
+      return transactions.size();
+   }
 
    // Protected ------------------------------------------------------
    

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -295,6 +295,12 @@
       // log.warn("Thread interrupted", e);
       // }
 
+//      Exception e = new Exception();
+//      
+//      log.error("cancelling delivery: " + d, e);
+//      
+      
+      
       // TODO We should also consider executing cancels on the event queue
       cancelInternal(d);   
    }
@@ -692,9 +698,7 @@
                {
                   // Reference is not expired
 
-                  // Attempt to push the ref to a receiver, so increment the delivery count
-                  ref.incrementDeliveryCount();
-
+                  // Attempt to push the ref to a receiver
                   Delivery del = push(ref);
 
                   if (del == null)
@@ -705,19 +709,16 @@
 
                      if (trace) { log.trace(this + ": no delivery returned for message" + ref + " so no receiver got the message. Delivery is now complete"); }
 
-                     ref.decrementDeliveryCount();
                      receiversReady = false;
+                     
                      return;
                   }
                   else if (!del.isSelectorAccepted())
                   {
                      // No receiver accepted the message because no selectors matched, so we create
                      // an iterator (if we haven't already created it) to iterate through the refs
-                     // in the channel. No delivery was really performed, so we decrement the
-                     // delivery count
-
-                     ref.decrementDeliveryCount();
-
+                     // in the channel. 
+                     
                      // TODO Note that this is only a partial solution since if there are messages
                      // paged to storage it won't try those - i.e. it will only iterate through
                      // those refs in memory. Dealing with refs in storage is somewhat tricky since
@@ -757,8 +758,6 @@
                         // http://jira.jboss.com/jira/browse/JBMESSAGING-355
                         // This will make life a lot easier
 
-                        del.getReference().incrementDeliveryCount();                    
-
                         if (!del.isCancelled())
                         {
                            if (iter == null)
@@ -969,6 +968,12 @@
                refsInStorage++;
             }
          }
+         
+         //We may need to update the delivery count in the database
+         if (ref.isReliable())
+         {
+            pm.updateDeliveryCount(this.channelID, ref);
+         }
 
          if (trace) { log.trace(this + " added " + ref + " back into state"); }
       }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/Routable.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/Routable.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/Routable.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -81,13 +81,6 @@
    void setPriority(byte priority);
 
    /**
-    * @return true if the delivery of this message had to be repeated at least once.
-    */
-   boolean isRedelivered();
-
-   void setRedelivered(boolean redelivered);
-   
-   /**
     * @return the number of times delivery has been attempted for this routable
     */
    int getDeliveryCount();

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/message/RoutableSupport.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/message/RoutableSupport.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -349,7 +349,6 @@
    protected long expiration;
    protected long timestamp;
    protected Map headers;
-   protected boolean redelivered;
    protected byte priority;
    protected int deliveryCount;   
 
@@ -403,7 +402,6 @@
       this.timestamp = timestamp;
       this.priority = priority;
       this.deliveryCount = deliveryCount;
-      this.redelivered = deliveryCount >= 2;
       if (headers == null)
       {
          this.headers = new HashMap();
@@ -421,7 +419,6 @@
       this.expiration = other.expiration;
       this.timestamp = other.timestamp;
       this.headers = new HashMap(other.headers);
-      this.redelivered = other.redelivered;
       this.deliveryCount = other.deliveryCount;
       this.priority = other.priority;  
    }
@@ -447,17 +444,7 @@
    {
       return timestamp;
    }
-
-   public boolean isRedelivered()
-   {
-      return redelivered;
-   }
-
-   public void setRedelivered(boolean redelivered)
-   {
-      this.redelivered = redelivered;      
-   }
-   
+ 
    public void setReliable(boolean reliable)
    {
       this.reliable = reliable;
@@ -481,10 +468,6 @@
    public void setDeliveryCount(int deliveryCount)
    {
       this.deliveryCount = deliveryCount;
-      if (deliveryCount > 0)
-      {
-         this.redelivered = true;
-      }
    }
 
    public Serializable putHeader(String name, Serializable value)
@@ -536,7 +519,7 @@
       out.writeLong(expiration);
       out.writeLong(timestamp);
       writeMap(out, headers, true);
-      out.writeBoolean(redelivered);
+    //  out.writeBoolean(redelivered);
       out.writeByte(priority);
       out.writeInt(deliveryCount);
    }
@@ -556,7 +539,7 @@
       {
          headers = (HashMap)m;
       }
-      redelivered = in.readBoolean();
+     // redelivered = in.readBoolean();
       priority = in.readByte();
       deliveryCount = in.readInt();
    }

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/JDBCPersistenceManager.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -118,6 +118,8 @@
    protected String updateMessageRef = "UPDATE JMS_MESSAGE_REFERENCE SET TRANSACTIONID=?, STATE='-' "
       + "WHERE MESSAGEID=? AND CHANNELID=? AND STATE='C'";
    
+   protected String updateDeliveryCount = "UPDATE JMS_MESSAGE_REFERENCE SET DELIVERYCOUNT = ? WHERE CHANNELID = ? AND MESSAGEID = ?";
+   
    protected String updateMessageRefNotLoaded = "UPDATE JMS_MESSAGE_REFERENCE SET LOADED='N' WHERE MESSAGEID=? AND CHANNELID=?";
    
    protected String commitMessageRef1 = "UPDATE JMS_MESSAGE_REFERENCE SET STATE='C', TRANSACTIONID = NULL WHERE TRANSACTIONID=? AND STATE='+'";
@@ -1737,6 +1739,57 @@
       }
    }
    
+   public void updateDeliveryCount(long channelID, MessageReference ref) throws Exception
+   {
+      TransactionWrapper wrap = new TransactionWrapper();
+      
+      PreparedStatement psReference = null;
+      
+      Connection conn = ds.getConnection();
+       
+      try
+      {                                    
+         psReference = conn.prepareStatement(updateDeliveryCount);
+         
+         psReference.setInt(1, ref.getDeliveryCount());
+         
+         psReference.setLong(2, channelID);
+         
+         psReference.setLong(3, ref.getMessageID());
+         
+         int rows = psReference.executeUpdate();                         
+      }
+      catch (Exception e)
+      {
+         wrap.exceptionOccurred();
+         throw e;
+      }
+      finally
+      {
+         if (psReference != null)
+         {
+            try
+            {
+               psReference.close();
+            }
+            catch (Throwable t)
+            {
+            }
+         }         
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Throwable t)
+            {
+            }
+         }
+         wrap.end();                        
+      }  
+   }
+   
    public void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception
    {      
       if (tx != null)
@@ -2418,6 +2471,7 @@
       updateReliableRefsNotLoaded = sqlProperties.getProperty("UPDATE_RELIABLE_REFS_NOT_LOADED", updateReliableRefsNotLoaded);
       selectMinOrdering = sqlProperties.getProperty("SELECT_MIN_ORDERING", selectMinOrdering);
       deleteUnreliableRefs = sqlProperties.getProperty("DELETE_UNRELIABLE_REFS", deleteUnreliableRefs);
+      updateDeliveryCount = sqlProperties.getProperty("UPDATE_DELIVERYCOUNT", updateDeliveryCount);
         
       //Message
       createMessage = sqlProperties.getProperty("CREATE_MESSAGE", createMessage);

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/SimpleMessageReference.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/SimpleMessageReference.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/SimpleMessageReference.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -56,8 +56,6 @@
    
    private MessageHolder holder;
    
-   //private int deliveryCount;
-   
    private long ordering;
    
    private boolean released;
@@ -79,7 +77,7 @@
    {
       this(holder.getMessage().getMessageID(), holder.getMessage().isReliable(),
            holder.getMessage().getExpiration(), holder.getMessage().getTimestamp(),
-           holder.getMessage().getHeaders(), holder.getMessage().isRedelivered(),
+           holder.getMessage().getHeaders(), holder.getMessage().getDeliveryCount(),
            holder.getMessage().getPriority(), ms);
 
       this.holder = holder;
@@ -93,7 +91,7 @@
    public SimpleMessageReference(SimpleMessageReference other)
    {
       this(other.getMessageID(), other.isReliable(), other.getExpiration(),
-           other.getTimestamp(), other.getHeaders(), other.isRedelivered(),
+           other.getTimestamp(), other.getHeaders(), other.getDeliveryCount(),
            other.getPriority(), other.ms);
       
       this.headers = other.headers;
@@ -101,11 +99,11 @@
    }
    
    protected SimpleMessageReference(long messageID, boolean reliable, long expiration,
-                                    long timestamp, Map headers, boolean redelivered,
+                                    long timestamp, Map headers, int deliveryCount,
                                     byte priority, MessageStore ms)
    {
       super(messageID, reliable, expiration, timestamp, priority, 0, headers);
-      this.redelivered = redelivered;
+      this.deliveryCount = deliveryCount;
       this.ms = ms;
    }
 

Modified: branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/src/main/org/jboss/messaging/core/plugin/contract/PersistenceManager.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -47,8 +47,9 @@
    void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
 
    void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
-      
-
+   
+   void updateDeliveryCount(long channelID, MessageReference ref) throws Exception;
+   
    void addReferences(long channelID, List references, boolean loaded) throws Exception;
    
    void removeReferences(long channelID, List refs) throws Exception;

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -1102,7 +1102,7 @@
       assertEquals(m1.isExpired(), m2.isExpired());
       assertEquals(m1.getTimestamp(), m2.getTimestamp());
       assertEquals(m1.getPriority(), m2.getPriority());
-      assertEquals(m1.isRedelivered(), m2.isRedelivered());
+      assertEquals(m1.getDeliveryCount(), m2.getDeliveryCount());
       Map m1Headers = m1.getHeaders();
       Map m2Headers = m2.getHeaders();
       checkMapsEquivalent(m1Headers, m2Headers);

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/base/MessageStoreTestBase.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/base/MessageStoreTestBase.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/core/plugin/base/MessageStoreTestBase.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -70,7 +70,7 @@
       assertEquals(m.getExpiration(), ref.getExpiration());
       assertEquals(m.getTimestamp(), ref.getTimestamp());
       assertEquals(m.getPriority(), ref.getPriority());
-      assertFalse(ref.isRedelivered());
+      assertEquals(0, ref.getDeliveryCount());
 
       Map messageHeaders = m.getHeaders();
       Map refHeaders = ref.getHeaders();

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/DLQTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -25,6 +25,7 @@
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -157,8 +158,38 @@
       }            
    }
    
-   public void testSendToDLQWithMessageListener() throws Exception
+   public void testSendToDLQWithMessageListenerPersistent() throws Exception
    {
+      sendToDLQWithMessageListener(true);
+   }
+   
+   public void testSendToDLQWithMessageListenerNonPersistent() throws Exception
+   {
+      sendToDLQWithMessageListener(false);
+   }
+   
+   public void testSendToDLQWithReceivePersistent() throws Exception
+   {
+      sendToDLQWithReceive(true);
+   }
+   
+   public void testSendToDLQWithReceiveNonPersistent() throws Exception
+   {
+      sendToDLQWithReceive(false);
+   }
+   
+   public void testSendToDLQWithReceivePartialPersistent() throws Exception
+   {
+      sendToDLQWithReceivePartial(true);
+   }
+   
+   public void testSendToDLQWithReceivePartialNonPersistent() throws Exception
+   {
+      sendToDLQWithReceivePartial(false);
+   }
+   
+   public void sendToDLQWithMessageListener(boolean persistent) throws Exception
+   {
       Connection conn = null;
       
       ServerManagement.deployQueue("DLQ");
@@ -173,6 +204,8 @@
          
          MessageProducer prod = sess.createProducer(queue);
          
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
          for (int i = 0; i < 10; i++)
          {
             TextMessage tm = sess.createTextMessage("Message:" + i);
@@ -188,21 +221,102 @@
          
          Thread.sleep(4000);
          
-         QueueBrowser browser = sess.createBrowser(dlq);
+         cons.setMessageListener(null);         
          
-         Enumeration enumeration = browser.getEnumeration();
+         Message m = cons.receive(1000);
          
-         int i = 0;
-         while (enumeration.hasMoreElements())
+         assertNull(m);
+         
+         //Message should all be in the dlq - let's check
+         
+         MessageConsumer cons2 = sess.createConsumer(dlq);
+         
+         for (int i = 0; i < 10; i++)
          {
-            TextMessage tm = (TextMessage)enumeration.nextElement();
+            TextMessage tm = (TextMessage)cons2.receive(1000);
             
-            assertEquals("message:" + i++, tm.getText());
+            assertNotNull(tm);
+            
+            assertEquals("Message:" + i, tm.getText());
          }
          
-         log.info("YUP THAT WORKED");
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
          
+         if (conn != null) conn.close();
       }
+   }
+   
+   public void sendToDLQWithReceive(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+      
+      ServerManagement.deployQueue("DLQ");
+      
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+                 
+      try
+      {                  
+         conn = cf.createConnection();
+         
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue);
+         
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);         
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+            
+            prod.send(tm);
+         }
+         
+         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageConsumer cons = sess2.createConsumer(queue);
+         
+         conn.start();
+         
+         for (int i = 0; i < 10; i++)  // retries - default is 10
+         {
+            for (int j = 0; j < 10; j++)
+            {
+               TextMessage tm = (TextMessage)cons.receive(1000);
+               
+               assertNotNull(tm);
+               
+               assertEquals("Message:" + j, tm.getText());
+            }
+            
+            //rollback should cause redelivery
+            sess2.rollback();
+         }
+         
+         cons.close();
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue);
+         
+         Message m = cons2.receive(1000);
+         
+         assertNull(m);
+
+         //Message should all be in the dlq - let's check
+         
+         MessageConsumer cons3 = sess.createConsumer(dlq);
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("Message:" + i, tm.getText());
+         }
+         
+      }
       finally
       {
          ServerManagement.undeployQueue("DLQ");
@@ -211,7 +325,105 @@
       }
    }
    
+   public void sendToDLQWithReceivePartial(boolean persistent) throws Exception
+   {
+      Connection conn = null;
+      
+      ServerManagement.deployQueue("DLQ");
+      
+      Queue dlq = (Queue)ic.lookup("/queue/DLQ");
+                 
+      try
+      {                  
+         conn = cf.createConnection();
+         
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue);
+         
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);         
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+            
+            prod.send(tm);
+         }
+         
+         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageConsumer cons = sess2.createConsumer(queue);
+         
+         conn.start();
+         
+         for (int i = 0; i < 5; i++)  // retries - default is 10
+         {
+            for (int j = 0; j < 10; j++)
+            {
+               TextMessage tm = (TextMessage)cons.receive(1000);
+               
+               assertNotNull(tm);
+               
+               assertEquals("Message:" + j, tm.getText());
+            }
+            
+            //rollback should cause redelivery
+            sess2.rollback();
+         }
+         
+         //They should now be cancelled back to the server
+         cons.close();
+         
+         cons = sess2.createConsumer(queue);
+         
+         for (int i = 0; i < 5; i++)  // retries - default is 10
+         {
+            for (int j = 0; j < 10; j++)
+            {
+               TextMessage tm = (TextMessage)cons.receive(1000);
+               
+               assertNotNull(tm);
+               
+               assertEquals("Message:" + j, tm.getText());
+            }
+            
+            //rollback should cause redelivery
+            sess2.rollback();
+         }
+         
+         cons.close();
+         
+         //Now they should be in DLQ
+                  
+         MessageConsumer cons2 = sess2.createConsumer(queue);
+         
+         Message m = cons2.receive(1000);
+         
+         assertNull(m);
+
+         //Message should all be in the dlq - let's check
+         
+         MessageConsumer cons3 = sess.createConsumer(dlq);
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("Message:" + i, tm.getText());
+         }
+         
+      }
+      finally
+      {
+         ServerManagement.undeployQueue("DLQ");
+         
+         if (conn != null) conn.close();
+      }
+   }
    
+   
    class FailingMessageListener implements MessageListener
    {
 

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/JMSTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/JMSTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -24,18 +24,19 @@
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.Message;
-import javax.jms.MessageListener;
+import javax.management.ObjectName;
 import javax.naming.InitialContext;
-import javax.management.ObjectName;
 
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
+
 import EDU.oswego.cs.dl.util.concurrent.Slot;
 
 /**
@@ -86,7 +87,7 @@
 
       super.tearDown();
    }
-
+   
    public void test_NonPersistent_NonTransactional() throws Exception
    {
       ConnectionFactory cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -48,6 +48,7 @@
 import javax.naming.InitialContext;
 
 import org.jboss.jms.destination.JBossTopic;
+import org.jboss.jms.message.MessageProxy;
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -3019,14 +3020,12 @@
       //don't acknowledge it
       sess1.close();
 
-      log.debug("sess1 closed");
-
       Session sess2 = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
       MessageConsumer cons2 = sess2.createConsumer(queue);
       TextMessage tm3 = (TextMessage)cons2.receive(3000);
       assertNotNull(tm3);
       assertEquals("testRedeliveredDifferentSessions", tm3.getText());
-
+      
       assertTrue(tm3.getJMSRedelivered());
    }
 

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/SessionTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/SessionTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/SessionTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -141,6 +141,8 @@
       assertNotNull(xid);
       assertNull(cState.getResourceManager().getTx(xid));
       
+      assertEquals(0, cState.getResourceManager().size());
+      
    }
 
    public void testCreateProducer() throws Exception

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -588,9 +588,10 @@
          tm = (TextMessage)cons.receive();
 
          assertEquals("a message", tm.getText());
-
+         
+         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
+         
          assertTrue(tm.getJMSRedelivered());
-         assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
       }
       finally
       {

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -268,7 +268,8 @@
          
          long messageID = 123456;
          int consumerID = 65432;
-         AckInfo ack = new AckInfo(messageID, consumerID, -1);
+         int deliveryCount = 765;
+         AckInfo ack = new AckInfo(messageID, consumerID, deliveryCount);
          
          Object[] args = new Object[] { ack };
          
@@ -313,6 +314,7 @@
          
          assertEquals(ack.getMessageID(), ack2.getMessageID());
          assertEquals(ack.getConsumerID(), ack2.getConsumerID());
+         assertEquals(ack.getDeliveryCount(), ack2.getDeliveryCount());
          
          //Now eos
          try
@@ -358,9 +360,9 @@
          
          mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));   
          
-         AckInfo ackA = new AckInfo(1524, 71627, -1);
-         AckInfo ackB = new AckInfo(987987, 45354, -1);
-         AckInfo ackC = new AckInfo(32423, 4533, -1);
+         AckInfo ackA = new AckInfo(1524, 71627, 32);
+         AckInfo ackB = new AckInfo(987987, 45354, 21);
+         AckInfo ackC = new AckInfo(32423, 4533, 6);
          
          List acks = new ArrayList();
          acks.add(ackA);
@@ -413,6 +415,7 @@
          
          assertEquals(ackA.getMessageID(), ack.getMessageID());
          assertEquals(ackA.getConsumerID(), ack.getConsumerID());
+         assertEquals(ackA.getDeliveryCount(), ack.getDeliveryCount());
          
          ack = new AckInfo();
          
@@ -420,6 +423,7 @@
          
          assertEquals(ackB.getMessageID(), ack.getMessageID());
          assertEquals(ackB.getConsumerID(), ack.getConsumerID());
+         assertEquals(ackB.getDeliveryCount(), ack.getDeliveryCount());
          
          ack = new AckInfo();
          
@@ -427,6 +431,7 @@
          
          assertEquals(ackC.getMessageID(), ack.getMessageID());
          assertEquals(ackC.getConsumerID(), ack.getConsumerID());
+         assertEquals(ackC.getDeliveryCount(), ack.getDeliveryCount());
          
          
          //Now eos
@@ -736,7 +741,7 @@
          JBossMessage m = new JBossMessage(123);
          MessageTest.configureMessage(m);
          
-         AckInfo info = new AckInfo(123, 456, -1);
+         AckInfo info = new AckInfo(123, 456, 66);
          
          TxState state = new TxState();
          state.getMessages().add(m);
@@ -814,6 +819,7 @@
          
          assertEquals(info.getConsumerID(), info2.getConsumerID());
          assertEquals(info.getMessageID(), info2.getMessageID());
+         assertEquals(info.getDeliveryCount(), info2.getDeliveryCount());
          
          bis.reset();
          ois = new ObjectInputStream(bis);
@@ -854,8 +860,8 @@
          
          List ids = new ArrayList();
          
-         AckInfo ack1 = new AckInfo(1254, 78123, -1);
-         AckInfo ack2 = new AckInfo(786, 8979, -1);
+         AckInfo ack1 = new AckInfo(1254, 78123, 22);
+         AckInfo ack2 = new AckInfo(786, 8979, 461);
          ids.add(ack1);
          ids.add(ack2);
          
@@ -916,9 +922,14 @@
          
          assertEquals(ack1.getMessageID(), rack1.getMessageID());
          
+         assertEquals(ack1.getDeliveryCount(), rack1.getDeliveryCount());
+         
          assertEquals(ack2.getConsumerID(), rack2.getConsumerID());
          
          assertEquals(ack2.getMessageID(), rack2.getMessageID());
+         
+         assertEquals(ack2.getDeliveryCount(), rack2.getDeliveryCount());
+         
           
          //should be eos
                 

Modified: branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java
===================================================================
--- branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2006-11-15 04:27:31 UTC (rev 1563)
+++ branches/Branch_1_0_CP1/tests/src/org/jboss/test/messaging/jms/message/JMSXDeliveryCountTest.java	2006-11-16 09:11:57 UTC (rev 1564)
@@ -89,7 +89,7 @@
       super.tearDown();
    }
 
-   public void testSimpleJMSXDeliverCount() throws Exception
+   public void testSimpleJMSXDeliveryCount() throws Exception
    {
       Connection conn = cf.createConnection();
       Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);




More information about the jboss-cvs-commits mailing list