[jboss-cvs] JBoss Messaging SVN: r7568 - in trunk: tests/src/org/jboss/messaging/tests/integration/jms/consumer and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 14 05:53:22 EDT 2009
Author: ataylor
Date: 2009-07-14 05:53:22 -0400 (Tue, 14 Jul 2009)
New Revision: 7568
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1679 - only expire messages from client when not in preack mode
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-07-14 09:53:22 UTC (rev 7568)
@@ -103,6 +103,8 @@
private boolean stopped = false;
+ private final boolean preAcknowledge;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -113,7 +115,8 @@
final TokenBucketLimiter rateLimiter,
final Executor executor,
final Channel channel,
- final File directory)
+ final File directory,
+ final boolean preAcknowledge)
{
this.id = id;
@@ -130,6 +133,8 @@
this.ackBatchSize = ackBatchSize;
this.directory = directory;
+
+ this.preAcknowledge = preAcknowledge;
}
// ClientConsumer implementation
@@ -213,8 +218,8 @@
if (m != null)
{
- boolean expired = m.isExpired();
-
+ // if the message has already been pre-acknowledged we can't expire it on the client
+ boolean expired = !preAcknowledge && m.isExpired();
flowControlBeforeConsumption(m);
if (expired)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-14 09:53:22 UTC (rev 7568)
@@ -1174,7 +1174,8 @@
: null,
executor,
channel,
- directory);
+ directory,
+ preAcknowledge);
addConsumer(consumer);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java 2009-07-14 07:32:02 UTC (rev 7567)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/consumer/ConsumerTest.java 2009-07-14 09:53:22 UTC (rev 7568)
@@ -40,6 +40,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -122,4 +123,32 @@
assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
session.close();
}
+
+ public void testPreCommitAcksWithMessageExpiry() throws Exception
+ {
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, JBossSession.PRE_ACKNOWLEDGE); // non-transacted session in pre-acknowledge mode
+ jBossQueue = new JBossQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ producer.setTimeToLive(1); // shortest non-zero time-to-live, so messages can already be expired when the client sees them
+ producer.send(textMessage);
+ }
+
+ conn.start(); // delivery starts only now, after every message has been sent
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ Message m = consumer.receive(500);
+ assertNotNull(m); // every message must still reach the consumer; none may be dropped as expired
+ }
+ // assert that the queue holds no remaining messages and has none still in delivery
+ SimpleString queueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + Q_NAME);
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ session.close();
+ }
}
More information about the jboss-cvs-commits
mailing list