[jboss-cvs] JBoss Messaging SVN: r7568 - in trunk: tests/src/org/jboss/messaging/tests/integration/jms/consumer and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 14 05:53:22 EDT 2009


Author: ataylor
Date: 2009-07-14 05:53:22 -0400 (Tue, 14 Jul 2009)
New Revision: 7568

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1679 - only expire messages from client when not in preack mode

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-07-14 09:53:22 UTC (rev 7568)
@@ -103,6 +103,8 @@
 
    private boolean stopped = false;
 
+   private final boolean preAcknowledge;
+
    // Constructors
    // ---------------------------------------------------------------------------------
 
@@ -113,7 +115,8 @@
                              final TokenBucketLimiter rateLimiter,
                              final Executor executor,
                              final Channel channel,
-                             final File directory)
+                             final File directory,
+                             final boolean preAcknowledge)
    {
       this.id = id;
 
@@ -130,6 +133,8 @@
       this.ackBatchSize = ackBatchSize;
 
       this.directory = directory;
+
+      this.preAcknowledge = preAcknowledge;
    }
 
    // ClientConsumer implementation
@@ -213,8 +218,8 @@
 
             if (m != null)
             {
-               boolean expired = m.isExpired();
-
+               // if the message has already been pre-acked we can't expire it on the client
+               boolean expired = !preAcknowledge && m.isExpired();
                flowControlBeforeConsumption(m);
 
                if (expired)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2009-07-14 09:53:22 UTC (rev 7568)
@@ -1174,7 +1174,8 @@
                                                                                   : null,
                                                                executor,
                                                                channel,
-                                                               directory);
+                                                               directory,
+                                                               preAcknowledge);
 
       addConsumer(consumer);
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java	2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java	2009-07-14 09:53:22 UTC (rev 7568)
@@ -40,6 +40,7 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -122,4 +123,32 @@
       assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
       session.close();
    }
+
+   public void testPreCommitAcksWithMessageExpiry() throws Exception
+   {
+      Connection conn = cf.createConnection();
+      Session session = conn.createSession(false, JBossSession.PRE_ACKNOWLEDGE);
+      jBossQueue = new JBossQueue(Q_NAME);
+      MessageProducer producer = session.createProducer(jBossQueue);
+      MessageConsumer consumer = session.createConsumer(jBossQueue);
+      int noOfMessages = 1000;
+      for (int i = 0; i < noOfMessages; i++)
+      {
+         TextMessage textMessage = session.createTextMessage("m" + i);
+         producer.setTimeToLive(1);
+         producer.send(textMessage);
+      }
+                                                                    
+      conn.start();
+      for (int i = 0; i < noOfMessages; i++)
+      {
+         Message m = consumer.receive(500);
+         assertNotNull(m);
+      }
+      // assert that no messages remain in the queue or in delivery after being received
+      SimpleString queueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+      assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+      session.close();
+   }
 }




More information about the jboss-cvs-commits mailing list