Author: timfox
Date: 2009-08-27 11:22:39 -0400 (Thu, 27 Aug 2009)
New Revision: 7924
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
Log:
Fixed various issues with pre-ack
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-27
14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -101,8 +101,6 @@
private boolean stopped = false;
- private final boolean preAcknowledge;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -112,8 +110,7 @@
final int ackBatchSize,
final TokenBucketLimiter rateLimiter,
final Executor executor,
- final Channel channel,
- final boolean preAcknowledge)
+ final Channel channel)
{
this.id = id;
@@ -128,8 +125,6 @@
this.clientWindowSize = clientWindowSize;
this.ackBatchSize = ackBatchSize;
-
- this.preAcknowledge = preAcknowledge;
}
// ClientConsumer implementation
@@ -214,7 +209,8 @@
if (m != null)
{
// if we have already pre acked we cant expire
- boolean expired = !preAcknowledge && m.isExpired();
+ boolean expired = m.isExpired();
+
flowControlBeforeConsumption(m);
if (expired)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-27 14:22:20
UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-08-27 15:22:39
UTC (rev 7924)
@@ -232,7 +232,7 @@
{
internalCreateQueue(address, queueName, null, false, false);
}
-
+
public void createQueue(final SimpleString address, final SimpleString queueName,
final boolean durable) throws HornetQException
{
internalCreateQueue(address, queueName, null, durable, false);
@@ -321,12 +321,12 @@
{
return createConsumer(queueName, filterString, consumerWindowSize, consumerMaxRate,
false);
}
-
+
public void createQueue(final String address, final String queueName) throws
HornetQException
{
internalCreateQueue(toSimpleString(address), toSimpleString(queueName), null,
false, false);
}
-
+
public ClientConsumer createConsumer(final String queueName, final String
filterString) throws HornetQException
{
return createConsumer(toSimpleString(queueName), toSimpleString(filterString));
@@ -618,9 +618,13 @@
{
checkClosed();
- SessionExpiredMessage message = new SessionExpiredMessage(consumerID, messageID);
+ //We don't send expiries for pre-ack since message will already have been acked on server
+ if (!preAcknowledge)
+ {
+ SessionExpiredMessage message = new SessionExpiredMessage(consumerID,
messageID);
- channel.send(message);
+ channel.send(message);
+ }
}
public void addConsumer(final ClientConsumerInternal consumer)
@@ -798,7 +802,7 @@
{
channel.returnBlocking();
}
-
+
public ConnectionManager getConnectionManager()
{
return connectionManager;
@@ -1193,8 +1197,7 @@
false)
:
null,
executor,
- channel,
- preAcknowledge);
+ channel);
addConsumer(consumer);
@@ -1204,8 +1207,6 @@
if (windowSize != 0)
{
- log.info("Sending " + windowSize + " initial credits");
-
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 14:22:20 UTC
(rev 7923)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java 2009-08-27 15:22:39 UTC
(rev 7924)
@@ -585,12 +585,12 @@
else
{
if (packet.isResponse())
- {
- response = packet;
-
+ {
confirm(packet);
lock.lock();
+
+ response = packet;
try
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-27
14:22:20 UTC (rev 7923)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -232,16 +232,14 @@
failureCheckThread.close();
- // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
+ // We need to stop them accepting first so no new connections are accepted after we send the disconnect message
for (Acceptor acceptor : acceptors)
{
acceptor.pause();
}
-
- log.info("there are " + connections.size() + " connections to close on server close");
+
for (ConnectionEntry entry : connections.values())
- {
- log.info("sending disconnect message");
+ {
entry.connection.getChannel(0, -1, false).sendAndFlush(new
PacketImpl(DISCONNECT));
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 14:22:20 UTC
(rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-08-27 15:22:39 UTC
(rev 7924)
@@ -146,6 +146,8 @@
private final Map<Consumer, Iterator<MessageReference>> iterators = new
HashMap<Consumer, Iterator<MessageReference>>();
private ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
+
+ private final SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
final SimpleString address,
@@ -190,6 +192,15 @@
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
+
+ if (addressSettingsRepository != null)
+ {
+ expiryAddress =
addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
+ }
+ else
+ {
+ expiryAddress = null;
+ }
}
// Bindable implementation
-------------------------------------------------------------------------------------
@@ -737,35 +748,17 @@
messageReferences.addFirst(reference, reference.getMessage().getPriority());
}
}
- }
+ }
public void expire(final MessageReference ref) throws Exception
- {
- SimpleString expiryAddress =
addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
-
+ {
+ log.info("expiring ref " + this.expiryAddress);
if (expiryAddress != null)
{
- Bindings bindingList = postOffice.getBindingsForAddress(expiryAddress);
-
- if (bindingList.getBindings().isEmpty())
- {
- if (log.isDebugEnabled())
- {
- log.debug("Message " + ref + " has expired without any binding for expiry address " + expiryAddress + ", dropping it");
- }
- }
- else
- {
- move(expiryAddress, ref, true);
- }
+ move(expiryAddress, ref, true);
}
else
- {
- if (log.isDebugEnabled())
- {
- log.debug("Message " + ref + " has expired without any expiry address configured for " + name + ", dropping it");
- }
-
+ {
acknowledge(ref);
}
}
@@ -1312,6 +1305,7 @@
Iterator<MessageReference> iterator = null;
+ //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
Set<Consumer> nullReferences = new HashSet<Consumer>();
@@ -1335,6 +1329,7 @@
else
{
reference = null;
+
if (consumer.getFilter() != null)
{
// we have iterated on the whole queue for
@@ -1361,6 +1356,32 @@
else
{
nullReferences.remove(consumer);
+
+ if (reference.getMessage().isExpired())
+ {
+ //We expire messages on the server too
+ if (iterator == null)
+ {
+ messageReferences.removeFirst();
+ }
+ else
+ {
+ iterator.remove();
+ }
+
+ referenceHandled();
+
+ try
+ {
+ expire(reference);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to expire ref", e);
+ }
+
+ continue;
+ }
}
initPagingStore(reference.getMessage().getDestination());
@@ -1629,7 +1650,7 @@
// TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
// the Address for the Queue
- PagingStore store = null;
+ PagingStore store;
if (pagingManager != null)
{
@@ -1637,13 +1658,14 @@
store.addSize(-ref.getMemoryEstimate());
}
+ else
+ {
+ store = null;
+ }
- if (message.decrementRefCount() == 0)
+ if (message.decrementRefCount() == 0 && store != null)
{
- if (store != null)
- {
- store.addSize(-ref.getMessage().getMemoryEstimate());
- }
+ store.addSize(-ref.getMessage().getMemoryEstimate());
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-27
14:22:20 UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -397,7 +397,7 @@
{
return null;
}
-
+
// Expiries can come in out of sequence with respect to delivery order
Iterator<MessageReference> iter = deliveringRefs.iterator();
@@ -418,15 +418,6 @@
}
}
- if (ref == null)
- {
- throw new IllegalStateException("Could not find reference with id " +
messageID +
- " backup " +
- messageQueue.isBackup() +
- " closed " +
- closed);
- }
-
return ref;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-08-27 14:22:20
UTC (rev 7923)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-08-27 15:22:39
UTC (rev 7924)
@@ -1622,7 +1622,6 @@
{
MessageReference ref =
consumers.get(packet.getConsumerID()).getExpired(packet.getMessageID());
- // Null implies a browser
if (ref != null)
{
ref.getQueue().expire(ref);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-08-27 14:22:20 UTC
(rev 7923)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2009-08-27 15:22:39 UTC
(rev 7924)
@@ -248,8 +248,8 @@
* <p/>
* $Id$
*/
-public class HornetQConnection implements Connection, QueueConnection, TopicConnection,
XAConnection, XAQueueConnection,
- XATopicConnection
+public class HornetQConnection implements Connection, QueueConnection, TopicConnection,
XAConnection,
+ XAQueueConnection, XATopicConnection
{
// Constants
------------------------------------------------------------------------------------
@@ -304,18 +304,18 @@
private final int transactionBatchSize;
private ClientSession initialSession;
-
+
private final Exception creationStack;
// Constructors
---------------------------------------------------------------------------------
public HornetQConnection(final String username,
- final String password,
- final int connectionType,
- final String clientID,
- final int dupsOKBatchSize,
- final int transactionBatchSize,
- final ClientSessionFactory sessionFactory)
+ final String password,
+ final int connectionType,
+ final String clientID,
+ final int dupsOKBatchSize,
+ final int transactionBatchSize,
+ final ClientSessionFactory sessionFactory)
{
this.username = username;
@@ -334,7 +334,7 @@
this.dupsOKBatchSize = dupsOKBatchSize;
this.transactionBatchSize = transactionBatchSize;
-
+
this.creationStack = new Exception();
}
@@ -622,19 +622,18 @@
{
if (!closed)
{
- log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " +
- "before letting them go out of scope!");
-
+ log.warn("I'm closing a JMS connection you left open. Please make sure you close all JMS connections explicitly " + "before letting them go out of scope!");
+
log.warn("The JMS connection you didn't close was created here:",
creationStack);
-
+
close();
}
}
protected HornetQSession createSessionInternal(final boolean transacted,
- int acknowledgeMode,
- final boolean isXA,
- final int type) throws JMSException
+ int acknowledgeMode,
+ final boolean isXA,
+ final int type) throws JMSException
{
if (transacted)
{
@@ -647,19 +646,43 @@
if (acknowledgeMode == Session.SESSION_TRANSACTED)
{
- session = sessionFactory.createSession(username, password, isXA, false,
false, false, transactionBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ false,
+ false,
+ sessionFactory.isPreAcknowledge(),
+ transactionBatchSize);
}
else if (acknowledgeMode == Session.AUTO_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true,
false, 0);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ true,
+ sessionFactory.isPreAcknowledge(),
+ 0);
}
else if (acknowledgeMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, true,
false, dupsOKBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ true,
+ sessionFactory.isPreAcknowledge(),
+ dupsOKBatchSize);
}
else if (acknowledgeMode == Session.CLIENT_ACKNOWLEDGE)
{
- session = sessionFactory.createSession(username, password, isXA, true, false,
false, transactionBatchSize);
+ session = sessionFactory.createSession(username,
+ password,
+ isXA,
+ true,
+ false,
+ sessionFactory.isPreAcknowledge(),
+ transactionBatchSize);
}
else if (acknowledgeMode == HornetQSession.PRE_ACKNOWLEDGE)
{
@@ -722,33 +745,33 @@
private static class JMSFailureListener implements FailureListener
{
private WeakReference<HornetQConnection> connectionRef;
-
+
JMSFailureListener(final HornetQConnection connection)
{
connectionRef = new WeakReference<HornetQConnection>(connection);
}
-
+
public synchronized void connectionFailed(final HornetQException me)
{
if (me == null)
{
return;
}
-
+
HornetQConnection conn = connectionRef.get();
-
+
if (conn != null)
{
try
{
final ExceptionListener exceptionListener = conn.getExceptionListener();
-
+
if (exceptionListener != null)
{
final JMSException je = new JMSException(me.toString());
-
+
je.initCause(me);
-
+
new Thread(new Runnable()
{
public void run()
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-27
14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -212,11 +212,7 @@
server.start();
ClientSessionFactory sf = createInVMFactory();
-
- session = sf.createSession(false, false, false);
-
- session.createQueue(ADDRESS, ADDRESS, true);
-
+
SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -228,6 +224,11 @@
server.getAddressSettingsRepository().addMatch("*", addressSettings);
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
@@ -346,21 +347,21 @@
server = createServer(true);
server.start();
+
+ AddressSettings addressSettings = new AddressSettings();
+ SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+ addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
ClientSessionFactory sf = createInVMFactory();
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
-
- SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
-
- AddressSettings addressSettings = new AddressSettings();
-
- addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
-
- server.getAddressSettingsRepository().addMatch("*", addressSettings);
-
+
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
ClientProducer producer = session.createProducer(ADDRESS);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-08-27
14:22:20 UTC (rev 7923)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageExpirationTest.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -22,6 +22,7 @@
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
@@ -77,16 +78,71 @@
consumer.close();
session.deleteQueue(queue);
}
+
+ public void testMessageExpirationOnServer() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message = session.createClientMessage(false);
+ message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+ producer.send(message);
+
+ Thread.sleep(EXPIRATION * 2);
+
+ session.start();
+
+ Thread.sleep(500);
+
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+
+ ClientMessage message2 = consumer.receive(500);
+ assertNull(message2);
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
+ public void testMessageExpirationOnClient() throws Exception
+ {
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ session.createQueue(address, queue, false);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createClientMessage(false);
+ message.setExpiration(System.currentTimeMillis() + EXPIRATION);
+ producer.send(message);
+
+ session.start();
+
+ Thread.sleep(EXPIRATION * 2);
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ ClientMessage message2 = consumer.receive(500);
+ assertNull(message2);
+
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queue).getBindable()).getDeliveringCount());
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queue).getBindable()).getMessageCount());
+
+ consumer.close();
+ session.deleteQueue(queue);
+ }
+
public void testMessageExpiredWithExpiryAddress() throws Exception
{
SimpleString address = randomSimpleString();
SimpleString queue = randomSimpleString();
final SimpleString expiryAddress = randomSimpleString();
SimpleString expiryQueue = randomSimpleString();
-
- session.createQueue(address, queue, false);
- session.createQueue(expiryAddress, expiryQueue, false);
+
server.getAddressSettingsRepository().addMatch(address.toString(), new
AddressSettings()
{
@Override
@@ -96,6 +152,10 @@
}
});
+ session.createQueue(address, queue, false);
+ session.createQueue(expiryAddress, expiryQueue, false);
+
+
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createClientMessage(false);
message.setExpiration(System.currentTimeMillis() + EXPIRATION);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-08-27
14:22:20 UTC (rev 7923)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/consumer/ConsumerTest.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -74,13 +74,42 @@
Message m = consumer.receive(500);
assertNotNull(m);
}
- // assert that all the messages are there and none have been acked
+
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX +
Q_NAME);
assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
conn.close();
}
+
+ public void testPreCommitAcksSetOnConnectionFactory() throws Exception
+ {
+ ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+ Connection conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = new HornetQQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 100;
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ producer.send(session.createTextMessage("m" + i));
+ }
+ conn.start();
+ for (int i = 0; i < noOfMessages; i++)
+ {
+ Message m = consumer.receive(500);
+ assertNotNull(m);
+ }
+
+ //Messages should all have been acked since we set pre ack on the cf
+ SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX +
Q_NAME);
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ conn.close();
+ }
+
public void testPreCommitAcksWithMessageExpiry() throws Exception
{
Connection conn = cf.createConnection();
@@ -95,14 +124,41 @@
producer.setTimeToLive(1);
producer.send(textMessage);
}
+
+ Thread.sleep(2);
conn.start();
+ Message m = consumer.receive(500);
+ assertNull(m);
+
+ SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX +
Q_NAME);
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
+ conn.close();
+ }
+
+ public void testPreCommitAcksWithMessageExpirySetOnConnectionFactory() throws
Exception
+ {
+ ((HornetQConnectionFactory)cf).setPreAcknowledge(true);
+ Connection conn = cf.createConnection();
+ Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ jBossQueue = new HornetQQueue(Q_NAME);
+ MessageProducer producer = session.createProducer(jBossQueue);
+ MessageConsumer consumer = session.createConsumer(jBossQueue);
+ int noOfMessages = 1000;
for (int i = 0; i < noOfMessages; i++)
{
- Message m = consumer.receive(500);
- assertNotNull(m);
+ TextMessage textMessage = session.createTextMessage("m" + i);
+ producer.setTimeToLive(1);
+ producer.send(textMessage);
}
- // assert that all the messages are there and none have been acked
+
+ Thread.sleep(2);
+
+ conn.start();
+ Message m = consumer.receive(500);
+ assertNull(m);
+
SimpleString queueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX +
Q_NAME);
assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
assertEquals(0,
((Queue)server.getPostOffice().getBinding(queueName).getBindable()).getMessageCount());
@@ -112,7 +168,7 @@
public void testClearExceptionListener() throws Exception
{
Connection conn = cf.createConnection();
- Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = new HornetQQueue(Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
@@ -129,7 +185,7 @@
public void testCantReceiveWhenListenerIsSet() throws Exception
{
Connection conn = cf.createConnection();
- Session session = conn.createSession(false, HornetQSession.PRE_ACKNOWLEDGE);
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
jBossQueue = new HornetQQueue(Q_NAME);
MessageConsumer consumer = session.createConsumer(jBossQueue);
consumer.setMessageListener(new MessageListener()
Modified: trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27
14:22:20 UTC (rev 7923)
+++ trunk/tests/src/org/hornetq/tests/integration/server/ExpiryRunnerTest.java 2009-08-27
15:22:39 UTC (rev 7924)
@@ -67,17 +67,7 @@
}
Thread.sleep(1600);
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
- assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
+ assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
}
public void testExpireFromMultipleQueues() throws Exception
@@ -102,16 +92,6 @@
Thread.sleep(1600);
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages * 2; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
public void testExpireHalf() throws Exception
@@ -131,16 +111,6 @@
Thread.sleep(1600);
assertEquals(numMessages / 2,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- ClientConsumer consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 0; i < numMessages; i += 2)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
public void testExpireConsumeHalf() throws Exception
@@ -167,24 +137,16 @@
Thread.sleep(2100);
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getMessageCount());
assertEquals(0,
((Queue)server.getPostOffice().getBinding(qName).getBindable()).getDeliveringCount());
-
- consumer = clientSession.createConsumer(expiryQueue);
- clientSession.start();
- for (int i = 50; i < numMessages; i++)
- {
- ClientMessage cm = consumer.receive(500);
- assertNotNull(cm);
- //assertEquals("m" + i, cm.getBody().getString());
- }
- consumer.close();
}
- public void testExpireToMultipleQueues() throws Exception
- {
- clientSession.createQueue(qName, qName2, null, false);
+ public void testExpireToExpiryQueue() throws Exception
+ {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setExpiryAddress(expiryAddress);
server.getAddressSettingsRepository().addMatch(qName2.toString(),
addressSettings);
+ clientSession.deleteQueue(qName);
+ clientSession.createQueue(qName, qName, null, false);
+ clientSession.createQueue(qName, qName2, null, false);
ClientProducer producer = clientSession.createProducer(qName);
int numMessages = 100;
long expiration = System.currentTimeMillis();
@@ -222,7 +184,8 @@
CountDownLatch latch = new CountDownLatch(1);
DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(consumer,
latch);
clientSession.start();
- new Thread(dummyMessageHandler).start();
+ Thread thr = new Thread(dummyMessageHandler);
+ thr.start();
long expiration = System.currentTimeMillis() + 1000;
int numMessages = 0;
long sendMessagesUntil = System.currentTimeMillis() + 2000;
@@ -257,6 +220,7 @@
}
assertTrue(dummyMessageHandler.payloads.isEmpty());
consumer.close();
+ thr.join();
}
public static void main(String[] args) throws Exception