[jboss-cvs] JBoss Messaging SVN: r3687 - in trunk: src/main/org/jboss/messaging/core and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 8 13:08:20 EST 2008
Author: timfox
Date: 2008-02-08 13:08:20 -0500 (Fri, 08 Feb 2008)
New Revision: 3687
Removed:
trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/MessageReference.java
trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
trunk/tests/build.xml
Log:
Completed DLQ and Expiry Queue logic
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -88,6 +88,8 @@
private final boolean autoDeleteQueue;
private final boolean enableFlowControl;
+
+ private final PersistenceManager persistenceManager;
// Constructors ---------------------------------------------------------------------------------
@@ -113,6 +115,8 @@
this.enableFlowControl = enableFlowControl;
+ this.persistenceManager = sessionEndpoint.getConnectionEndpoint().getMessagingServer().getPersistenceManager();
+
// adding the consumer to the queue
messageQueue.addConsumer(this);
@@ -132,7 +136,7 @@
if (ref.getMessage().isExpired())
{
- sessionEndpoint.expireDelivery(ref);
+ ref.expire(persistenceManager);
return HandleStatus.HANDLED;
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -213,28 +213,6 @@
// Package protected
// ----------------------------------------------------------------------------
- void expireDelivery(MessageReference ref) throws Exception
- {
- Queue expiryQueue = ref.getQueue().getExpiryQueue();
-
- if (expiryQueue != null)
- {
- Message copy = makeCopyForDLQOrExpiry(true, ref);
-
- moveInTransaction(copy, ref, expiryQueue, true);
- }
- else
- {
- log.warn("No expiry queue has been configured so removing expired " + ref);
-
- // TODO - tidy up these references - ugly
- ref.acknowledge(this.getConnectionEndpoint().getMessagingServer()
- .getPersistenceManager());
- }
-
- // TODO
- }
-
void removeBrowser(String browserId) throws Exception
{
if (browsers.remove(browserId) == null)
@@ -497,8 +475,10 @@
}
else if (expired)
{
- if (deliveryID == -1) { throw new IllegalArgumentException(
- "Invalid delivery id"); }
+ if (deliveryID == -1)
+ {
+ throw new IllegalArgumentException("Invalid delivery id");
+ }
// Expire a single reference
@@ -508,8 +488,7 @@
if (delivery.getDeliveryID() == deliveryID)
{
- // TODO - send to expiry queue
- delivery.getReference().acknowledge(sp.getPersistenceManager());
+ delivery.getReference().expire(sp.getPersistenceManager());
iter.remove();
@@ -819,134 +798,6 @@
// Private
// --------------------------------------------------------------------------------------
- // private void cancelDeliveryInternal(Cancel cancel) throws Exception
- // {
- // DeliveryRecord rec =
- // (DeliveryRecord)deliveries.remove(cancel.getDeliveryId());
- //
- // if (rec == null)
- // {
- // //The delivery might not be found, if the session is not replicated (i.e.
- // auto_ack or dups_ok)
- // //and has failed over since recoverDeliveries won't have been called
- // if (trace)
- // {
- // log.trace("Cannot find delivery to cancel, session probably failed over
- // and is not replicated");
- // }
- // return;
- // }
- //
- // MessageReference ref = rec.ref;
- //
- // //Note we check the flag *and* evaluate again, this is because the server
- // and client clocks may
- // //be out of synch and don't want to send back to the client a message it
- // thought it has sent to
- // //the expiry queue
- // boolean expired = cancel.isExpired() || ref.getMessage().isExpired();
- //
- // //Note we check the flag *and* evaluate again, this is because the server
- // value of maxDeliveries
- // //might get changed after the client has sent the cancel - and we don't
- // want to end up cancelling
- // //back to the original queue
- // boolean reachedMaxDeliveryAttempts =
- // cancel.isReachedMaxDeliveryAttempts() || cancel.getDeliveryCount() >=
- // rec.maxDeliveryAttempts;
- //
- // if (!expired && !reachedMaxDeliveryAttempts)
- // {
- // //Normal cancel back to the queue
- //
- // ref.setDeliveryCount(cancel.getDeliveryCount());
- //
- // //Do we need to set a redelivery delay?
- //
- // if (rec.redeliveryDelay != 0)
- // {
- // ref.setScheduledDeliveryTime(System.currentTimeMillis() +
- // rec.redeliveryDelay);
- // }
- //
- // if (trace) { log.trace("Cancelling delivery " + cancel.getDeliveryId()); }
- //
- // ref.cancel(sp.getPersistenceManager());
- //
- // }
- // else
- // {
- // if (expired)
- // {
- // //Sent to expiry queue
- //
- // Message copy = makeCopyForDLQOrExpiry(true, ref);
- //
- // moveInTransaction(copy, ref, rec.expiryQueue, false);
- // }
- // else
- // {
- // //Send to DLQ
- //
- // Message copy = makeCopyForDLQOrExpiry(false, ref);
- //
- // moveInTransaction(copy, ref, rec.dlq, true);
- // }
- // }
- // }
-
- private Message makeCopyForDLQOrExpiry(boolean expiry, MessageReference ref)
- throws Exception
- {
- // We copy the message and send that to the dlq/expiry queue - this is
- // because
- // otherwise we may end up with a ref with the same message id in the
- // queue more than once
- // which would barf - this might happen if the same message had been
- // expire from multiple
- // subscriptions of a topic for example
- // We set headers that hold the original message destination, expiry time
- // and original message id
-
- if (trace)
- {
- log.trace("Making copy of message for DLQ or expiry " + ref);
- }
-
- Message msg = ref.getMessage();
-
- Message copy = msg.copy();
-
- long newMessageId = sp.getPersistenceManager().generateMessageID();
-
- copy.setMessageID(newMessageId);
-
- // reset expiry
- copy.setExpiration(0);
-
- if (expiry)
- {
- long actualExpiryTime = System.currentTimeMillis();
-
- copy.putHeader(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
- }
-
- return copy;
- }
-
- private void moveInTransaction(Message msg, MessageReference ref,
- Queue queue, boolean dlq) throws Exception
- {
- Transaction tx = new TransactionImpl();
-
- tx.addMessage(msg);
-
- tx.addAcknowledgement(ref);
-
- tx.commit(true, getConnectionEndpoint().getMessagingServer()
- .getPersistenceManager());
- }
-
private void addAddress(String address) throws Exception
{
if (postOffice.containsAllowableAddress(address)) { throw new MessagingException(
Modified: trunk/src/main/org/jboss/messaging/core/MessageReference.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/MessageReference.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/messaging/core/MessageReference.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -60,7 +60,9 @@
void acknowledge(PersistenceManager persistenceManager) throws Exception;
- void cancel(PersistenceManager persistenceManager) throws Exception;
+ void cancel(PersistenceManager persistenceManager) throws Exception;
+
+ void expire(PersistenceManager persistenceManager) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageReferenceImpl.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -136,8 +136,44 @@
}
queue.decrementDeliveringCount();
+
+ int maxDeliveries = queue.getMaxDeliveryAttempts();
+
+ if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
+ {
+ if (queue.getDLQ() != null)
+ {
+ Message copyMessage = makeCopyForDLQOrExpiry(false, persistenceManager);
+
+ moveInTransaction(queue.getDLQ(), copyMessage, persistenceManager);
+ }
+ else
+ {
+ //No DLQ
+
+ log.warn("Message has reached maximum delivery attempts, no DLQ is configured so dropping it");
+
+ acknowledge(persistenceManager);
+ }
+ }
}
+ /**
+ * Expires this reference.
+ *
+ * If the owning queue has an expiry queue configured, a copy of the message, stamped
+ * with the actual expiry time, is sent there and this reference is acknowledged in the
+ * same transaction. Otherwise a warning is logged and the reference is acknowledged,
+ * i.e. the message is dropped.
+ *
+ * @param persistenceManager used to generate the id of the copy and to commit the
+ * move or the acknowledgement
+ * @throws Exception if copying, moving or acknowledging fails
+ */
+ public void expire(PersistenceManager persistenceManager) throws Exception
+ {
+ // Read once so the null check and the move operate on the same queue
+ Queue expiryQueue = queue.getExpiryQueue();
+
+ if (expiryQueue != null)
+ {
+ // expiry == true, so that the copy carries Message.HDR_ACTUAL_EXPIRY_TIME
+ Message copyMessage = makeCopyForDLQOrExpiry(true, persistenceManager);
+
+ moveInTransaction(expiryQueue, copyMessage, persistenceManager);
+ }
+ else
+ {
+ log.warn("Message has expired, no expiry queue is configured so dropping it");
+
+ acknowledge(persistenceManager);
+ }
+ }
+
// Public --------------------------------------------------------
public String toString()
@@ -150,7 +186,51 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+
+ /**
+ * Sends a copy of this reference's message to another queue and acknowledges this
+ * reference, both as part of a single transaction.
+ *
+ * @param destinationQueue the queue (DLQ or expiry queue) that receives the copy
+ * @param copyMessage the copy to send, as built by makeCopyForDLQOrExpiry
+ * @param persistenceManager passed to the transaction commit
+ * @throws Exception if creating the reference or committing the transaction fails
+ */
+ private void moveInTransaction(Queue destinationQueue, Message copyMessage,
+ PersistenceManager persistenceManager) throws Exception
+ {
+ // Route the copy to the queue the caller asked for; the expiry queue must not be
+ // hard-wired here because this method is also used for the DLQ
+ copyMessage.createReference(destinationQueue);
+
+ TransactionImpl tx = new TransactionImpl();
+
+ tx.addMessage(copyMessage);
+
+ tx.addAcknowledgement(this);
+
+ tx.commit(true, persistenceManager);
+ }
+
+ private Message makeCopyForDLQOrExpiry(boolean expiry, PersistenceManager pm) throws Exception
+ {
+ /*
+ Builds the message that is sent to the DLQ or expiry queue in place of the original.
+ We copy the message and send that to the dlq/expiry queue - this is
+ because otherwise we may end up with a ref with the same message id in the
+ queue more than once which would barf - this might happen if the same message had been
+ expired from multiple subscriptions of a topic for example.
+ The copy is given a newly generated message id and its expiration is reset to 0.
+ When expiry is true the copy is also stamped with the time at which it was expired
+ (Message.HDR_ACTUAL_EXPIRY_TIME).
+ NOTE(review): headers holding the original message destination and the original
+ message id are NOT set here - confirm whether they are meant to be added.
+ */
+ Message copy = message.copy();
+
+ long newMessageId = pm.generateMessageID();
+
+ copy.setMessageID(newMessageId);
+
+ // reset expiry on the copy
+ copy.setExpiration(0);
+
+ if (expiry)
+ {
+ long actualExpiryTime = System.currentTimeMillis();
+
+ copy.putHeader(Message.HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
+ }
+
+ return copy;
+ }
+
// Inner classes -------------------------------------------------
}
\ No newline at end of file
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/build.xml 2008-02-08 18:08:20 UTC (rev 3687)
@@ -520,8 +520,6 @@
<exclude name="**/jms/bridge/**"/>
<exclude name="**/jms/manual/**"/>
<exclude name="**/jms/clustering/**"/>
- <exclude name="**/jms/ExpiryQueueTest.class"/>
- <exclude name="**/jms/DLQTest.class"/>
<exclude name="**/postoffice/**"/>
<exclude name="**/jms/JCAWrapperTest.class"/>
<exclude name="**/jms/server/ServerPeerTest.class"/>
@@ -643,8 +641,6 @@
<exclude name="**/jms/JCAWrapperTest.class"/>
<exclude name="**/jms/ClientExitTest.class"/>
- <exclude name="**/jms/ExpiryQueueTest.class"/>
- <exclude name="**/jms/DLQTest.class"/>
<exclude name="**/jms/SecurityTest.class"/>
<exclude name="**/jms/crash/*Test.class"/>
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -1,835 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-
-import org.jboss.jms.destination.JBossQueue;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.messaging.core.impl.server.MessagingServerImpl;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-/**
- * A DLQTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- * <p/>
- * $Id$
- */
-public class DLQTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public DLQTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testDefaultAndOverrideDLQ() throws Exception
- {
- if (ServerManagement.isRemote())
- {
- return;
- }
-
- final int NUM_MESSAGES = 5;
-
- final int MAX_DELIVERIES = 8;
-
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
-
- Connection conn = null;
-
- try
- {
- String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
- String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue3";
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
- ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
-
- conn = cf.createConnection();
-
- {
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < MAX_DELIVERIES; i++)
- {
- for (int j = 0; j < NUM_MESSAGES; j++)
- {
- TextMessage tm = (TextMessage) cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + j, tm.getText());
- }
-
- sess2.recover();
- }
-
- //Prompt them to go to DLQ
- cons.receive(100);
-
- //At this point all the messages have been delivered exactly MAX_DELIVERIES times
-
- checkEmpty(queue1);
-
- //Now should be in default dlq
-
- MessageConsumer cons3 = sess.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- conn.close();
- }
-
-
- {
- //Now try with overriding the default dlq
-
- conn = cf.createConnection();
-
- ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < MAX_DELIVERIES; i++)
- {
- for (int j = 0; j < NUM_MESSAGES; j++)
- {
- TextMessage tm = (TextMessage) cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + j, tm.getText());
- }
-
- sess2.recover();
- }
-
- cons.receive(100);
-
- //At this point all the messages have been delivered exactly MAX_DELIVERIES times
-
- checkEmpty(queue1);
-
- //Now should be in override dlq
-
- MessageConsumer cons3 = sess.createConsumer(queue3);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
- }
- }
- finally
- {
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
-
- ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
-
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
-
- public void testWithMessageListenerPersistent() throws Exception
- {
- testWithMessageListener(true);
- }
-
- public void testWithMessageListenerNonPersistent() throws Exception
- {
- testWithMessageListener(false);
- }
-
- public void testWithReceiveClientAckPersistent() throws Exception
- {
- this.testWithReceiveClientAck(true);
- }
-
- public void testWithReceiveClientAckNonPersistent() throws Exception
- {
- testWithReceiveClientAck(false);
- }
-
- public void testWithReceiveTransactionalPersistent() throws Exception
- {
- this.testWithReceiveTransactional(true);
- }
-
- public void testWithReceiveTransactionalNonPersistent() throws Exception
- {
- testWithReceiveTransactional(false);
- }
-
- public void testHeadersSet() throws Exception
- {
- Connection conn = null;
-
- try
- {
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
-
- final int MAX_DELIVERIES = 16;
-
- final int NUM_MESSAGES = 5;
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
- int maxRedeliveryAttempts =
- ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
- assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- Map origIds = new HashMap();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
-
- origIds.put(tm.getText(), tm.getJMSMessageID());
- }
-
- Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < MAX_DELIVERIES; i++)
- {
- for (int j = 0; j < NUM_MESSAGES; j++)
- {
- TextMessage tm = (TextMessage) cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + j, tm.getText());
- }
-
- sess2.rollback();
- }
-
- //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
- //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
- checkEmpty(queue2);
-
- // So let's try and consume them - this should cause them to go to the DLQ - since they
- // will then exceed max delivery attempts
- Message m = cons.receive(100);
-
- assertNull(m);
-
- //All the messages should now be in the DLQ
-
- MessageConsumer cons3 = sess.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
-
- // Check the headers
- String origDest =
- tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION);
-
- String origMessageId =
- tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_MESSAGE_ID);
-
- assertEquals(queue1.toString(), origDest);
-
- String origId = (String) origIds.get(tm.getText());
-
- assertEquals(origId, origMessageId);
- }
- }
- finally
- {
- if (conn != null) conn.close();
- }
- }
-
- public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
- {
- int md = getDefaultMaxDeliveryAttempts();
- try
- {
- int maxDeliveryAttempts = md - 5;
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- maxDeliveryAttempts);
- testMaxDeliveryAttempts(queue1, maxDeliveryAttempts, true);
- }
- finally
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- md);
- }
- }
-
- public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
- {
- int md = getDefaultMaxDeliveryAttempts();
- try
- {
- int maxDeliveryAttempts = md - 5;
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
- maxDeliveryAttempts);
-
- testMaxDeliveryAttempts(topic1, maxDeliveryAttempts, false);
- }
- finally
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- md);
- }
- }
-
- public void testUseDefaultMaxDeliveryAttemptsForQueue() throws Exception
- {
- int md = getDefaultMaxDeliveryAttempts();
- try
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- -1);
-
- // Check that defaultMaxDeliveryAttempts takes effect
- testMaxDeliveryAttempts(queue1, getDefaultMaxDeliveryAttempts(), true);
- }
- finally
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- md);
- }
- }
-
- public void testUseDefaultMaxDeliveryAttemptsForTopic() throws Exception
- {
- int md = getDefaultMaxDeliveryAttempts();
- try
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1"),
- -1);
-
- // Check that defaultMaxDeliveryAttempts takes effect
- testMaxDeliveryAttempts(topic1, getDefaultMaxDeliveryAttempts(), false);
- }
- finally
- {
- setMaxDeliveryAttempts(
- new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1"),
- md);
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void testWithMessageListener(boolean persistent) throws Exception
- {
- Connection conn = null;
-
- try
- {
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- final int MAX_DELIVERIES = 16;
-
- final int NUM_MESSAGES = 5;
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
- String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
- int maxRedeliveryAttempts =
- ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
- assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
- }
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- FailingMessageListener listener = new FailingMessageListener(MAX_DELIVERIES * NUM_MESSAGES);
-
- cons.setMessageListener(listener);
-
- conn.start();
-
- listener.waitForMessages();
-
- assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
-
- //Message should all be in the dlq - let's check
-
- MessageConsumer cons2 = sess.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons2.receive(1000);
-
- assertNotNull(tm);
-
- log.info("Got mnessage" + tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- checkEmpty(queue1);
- }
- finally
- {
- if (conn != null) conn.close();
- }
- }
-
-
- protected void testWithReceiveClientAck(boolean persistent) throws Exception
- {
- Connection conn = null;
-
- try
- {
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- final int MAX_DELIVERIES = 16;
-
- final int NUM_MESSAGES = 5;
-
- String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
- int maxRedeliveryAttempts =
- ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
- assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < MAX_DELIVERIES; i++)
- {
- for (int j = 0; j < NUM_MESSAGES; j++)
- {
- TextMessage tm = (TextMessage) cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + j, tm.getText());
- }
-
- sess2.recover();
- }
-
- //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
- //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
- checkEmpty(queue2);
-
- //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
- //delivery attempts
-
- Message m = cons.receive(100);
-
- assertNull(m);
-
- //Now, all the messages should now be in the DLQ
-
- MessageConsumer cons3 = sess.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- //No more should be available
-
- cons.close();
-
- checkEmpty(queue1);
- }
- finally
- {
- destroyQueue("DLQ");
-
- if (conn != null) conn.close();
- }
- }
-
- protected void testWithReceiveTransactional(boolean persistent) throws Exception
- {
- Connection conn = null;
-
- try
- {
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- final int MAX_DELIVERIES = 16;
-
- final int NUM_MESSAGES = 5;
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
-
- String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
-
- int maxRedeliveryAttempts =
- ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
-
- assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < MAX_DELIVERIES; i++)
- {
- for (int j = 0; j < NUM_MESSAGES; j++)
- {
- TextMessage tm = (TextMessage) cons.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + j, tm.getText());
- }
-
- sess2.rollback();
- }
-
- //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
- //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
-
- checkEmpty(queue2);
-
- //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
- //delivery attempts
- Message m = cons.receive(100);
-
- assertNull(m);
-
- //All the messages should now be in the DLQ
-
- MessageConsumer cons3 = sess.createConsumer(queue2);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- //No more should be available
-
- checkEmpty(queue1);
- }
- finally
- {
- destroyQueue("DLQ");
-
- if (conn != null) conn.close();
- }
- }
-
- protected int getDefaultMaxDeliveryAttempts() throws Exception
- {
- return ((Integer) ServerManagement.getAttribute(
- ServerManagement.getServerPeerObjectName(),
- "DefaultMaxDeliveryAttempts"))
- .intValue();
- }
-
- protected void setMaxDeliveryAttempts(ObjectName dest, int maxDeliveryAttempts) throws Exception
- {
- ServerManagement.setAttribute(dest, "MaxDeliveryAttempts",
- Integer.toString(maxDeliveryAttempts));
- }
-
- protected void testMaxDeliveryAttempts(Destination destination, int destMaxDeliveryAttempts, boolean queue) throws Exception
- {
- Connection conn = cf.createConnection();
-
- if (!queue)
- {
- conn.setClientID("wib123");
- }
-
- try
- {
- ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
- "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
-
- // Create the consumer before the producer so that the message we send doesn't
- // get lost if the destination is a Topic.
- Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
- MessageConsumer destinationConsumer;
-
- if (queue)
- {
- destinationConsumer = consumingSession.createConsumer(destination);
- }
- else
- {
- //For topics we only keep a delivery record on the server side for durable subs
- destinationConsumer = consumingSession.createDurableSubscriber((Topic) destination, "testsub1");
- }
-
- {
- Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = producingSession.createProducer(destination);
- TextMessage tm = producingSession.createTextMessage("Message");
- prod.send(tm);
- }
-
- conn.start();
-
- // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
- for (int i = 0; i < destMaxDeliveryAttempts; i++)
- {
- TextMessage tm = (TextMessage) destinationConsumer.receive(1000);
- assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
- assertEquals("Message", tm.getText());
- consumingSession.recover();
- }
-
- // At this point the message should not yet be in the DLQ
- checkEmpty(queue2);
-
- // Now we try to consume the message again from the destination, which causes it
- // to go to the DLQ instead.
- Message m = destinationConsumer.receive(100);
- assertNull(m);
-
- // The message should be in the DLQ now
- MessageConsumer dlqConsumer = consumingSession.createConsumer(queue2);
- m = dlqConsumer.receive(1000);
- assertNotNull(m);
- assertTrue(m instanceof TextMessage);
- assertEquals("Message", ((TextMessage) m).getText());
-
- m.acknowledge();
-
- if (!queue)
- {
- destinationConsumer.close();
-
- consumingSession.unsubscribe("testsub1");
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- class FailingMessageListener implements MessageListener
- {
- volatile int deliveryCount;
-
- int numMessages;
-
- FailingMessageListener(int numMessages)
- {
- this.numMessages = numMessages;
- }
-
- synchronized void waitForMessages() throws Exception
- {
- while (deliveryCount != numMessages)
- {
- this.wait();
- }
- }
-
- public synchronized void onMessage(Message msg)
- {
- deliveryCount++;
-
- this.notify();
-
- throw new RuntimeException("Your mum!");
- }
-
- }
-
-}
Deleted: trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2008-02-08 17:37:50 UTC (rev 3686)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2008-02-08 18:08:20 UTC (rev 3687)
@@ -1,620 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.ObjectName;
-
-import org.jboss.jms.destination.JBossQueue;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.test.messaging.tools.ServerManagement;
-
-/**
- * A ExpiryQueueTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- * <p/>
- * $Id$
- */
-public class ExpiryQueueTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ExpiryQueueTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testExpiryQueueAlreadyDeployed() throws Exception
- {
- if (ServerManagement.isRemote())
- {
- return;
- }
-
- //deployQueue("ExpiryQueue");
-
- /*ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- ObjectName expiryQueueObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
-
- assertNotNull(expiryQueueObjectName);
-
- String name = (String)ServerManagement.getAttribute(expiryQueueObjectName, "Name");
-
- assertNotNull(name);
-
- assertEquals("ExpiryQueue", name);
-
- String jndiName = (String)ServerManagement.getAttribute(expiryQueueObjectName, "JNDIName");
-
- assertNotNull(jndiName);
-
- assertEquals("/queue/ExpiryQueue", jndiName);
-
- org.jboss.messaging.core.contract.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
-
- assertNotNull(expiryQueue);*/
-
- JBossQueue q = (JBossQueue) ic.lookup("/queue/ExpiryQueue");
-
- assertNotNull(q);
-
- assertEquals("ExpiryQueue", q.getName());
-
- }
-
-
- public void testDefaultAndOverrideExpiryQueue() throws Exception
- {
- final int NUM_MESSAGES = 5;
-
- Connection conn = null;
-
- //ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- try
- {
- createQueue("DefaultExpiry");
-
- createQueue("OverrideExpiry");
-
- createQueue("expTestQueue");
-
- String defaultExpiryObjectName = "DefaultExpiry";
-
- String overrideExpiryObjectName = "OverrideExpiry";
-
- String testQueueObjectName = "expTestQueue";
-
- //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
-
- //ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
-
- Queue testQueue = (Queue) ic.lookup("/queue/expTestQueue");
-
- Queue defaultExpiry = (Queue) ic.lookup("/queue/DefaultExpiry");
-
- Queue overrideExpiry = (Queue) ic.lookup("/queue/OverrideExpiry");
-
- conn = cf.createConnection();
-
- {
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(testQueue);
-
- conn.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- //Send messages with time to live of 2000 enough time to get to client consumer - so
- //they won't be expired on the server side
- prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(testQueue);
-
- //The messages should now be sitting in the consumer buffer
-
- //Now give them enough time to expire
-
- Thread.sleep(2500);
-
- //Now try and receive
-
- Message m = cons.receive(1000);
-
- assertNull(m);
-
- //Message should all be in the default expiry queue - let's check
-
- MessageConsumer cons3 = sess.createConsumer(defaultExpiry);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- conn.close();
- }
-
- //now try with overriding the default expiry queue
- {
- ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
-
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(testQueue);
-
- conn.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- //Send messages with time to live of 2000 enough time to get to client consumer - so
- //they won't be expired on the server side
- prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(testQueue);
-
- //The messages should now be sitting in the consumer buffer
-
- //Now give them enough time to expire
-
- Thread.sleep(2500);
-
- //Now try and receive
-
- Message m = cons.receive(1000);
-
- assertNull(m);
-
- //Message should all be in the override expiry queue - let's check
-
- MessageConsumer cons3 = sess.createConsumer(overrideExpiry);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons3.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
- }
- }
- finally
- { //
- //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
-
- destroyQueue("DefaultDLQ");
-
- destroyQueue("OverrideDLQ");
-
- destroyQueue("TestQueue");
-
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testExpireSameMessagesMultiple() throws Exception
- {
- final int NUM_MESSAGES = 5;
-
- Connection conn = null;
-
- try
- {
- createQueue("ExpiryQueue");
-
- String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
-
- ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
-
- ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
-
- Queue defaultExpiry = (Queue) ic.lookup("/queue/ExpiryQueue");
-
- conn = cf.createConnection();
-
- conn.setClientID("wib1");
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(topic1);
-
- conn.start();
-
- //Create 3 durable subscriptions
-
- MessageConsumer sub1 = sess.createDurableSubscriber(topic1, "sub1");
-
- MessageConsumer sub2 = sess.createDurableSubscriber(topic1, "sub2");
-
- MessageConsumer sub3 = sess.createDurableSubscriber(topic1, "sub3");
-
- Map origIds = new HashMap();
-
- long now = System.currentTimeMillis();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- //Send messages with time to live of 3000 enough time to get to client consumer - so
- //they won't be expired on the server side
- prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
-
- origIds.put(tm.getText(), tm.getJMSMessageID());
- }
-
- long approxExpiry = now + 3000;
-
- //Now sleep. This wil give them enough time to expire
-
- Thread.sleep(3500);
-
- //Now try and consume from each - this should force the message to the expiry queue
-
- Message m = sub1.receive(500);
- assertNull(m);
-
- m = sub2.receive(500);
- assertNull(m);
-
- m = sub3.receive(500);
- assertNull(m);
-
- //Now the messages should all be in the expiry queue
-
- MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
-
- while (true)
- {
- TextMessage tm = (TextMessage) cons2.receive(500);
-
- if (tm == null)
- {
- break;
- }
-
- // Check the headers
- String origDest =
- tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_DESTINATION);
-
- String origMessageId =
- tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_ORIG_MESSAGE_ID);
-
- long actualExpiryTime =
- tm.getLongProperty(JBossMessage.JBOSS_MESSAGING_ACTUAL_EXPIRY_TIME);
-
- assertEquals(topic1.toString(), origDest);
-
- String origId = (String) origIds.get(tm.getText());
-
- assertEquals(origId, origMessageId);
-
- assertTrue(actualExpiryTime >= approxExpiry);
- }
-
- cons2.close();
-
- sub1.close();
-
- sub2.close();
-
- sub3.close();
-
- sess.unsubscribe("sub1");
-
- sess.unsubscribe("sub2");
-
- sess.unsubscribe("sub3");
-
- }
- finally
- {
- destroyQueue("ExpiryQueue");
-
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testWithMessageListenerPersistent() throws Exception
- {
- testWithMessageListener(true);
- }
-
- public void testWithMessageListenerNonPersistent() throws Exception
- {
- testWithMessageListener(false);
- }
-
- public void testWithReceivePersistent() throws Exception
- {
- this.testWithReceive(true);
- }
-
- public void testWithReceiveNonPersistent() throws Exception
- {
- testWithReceive(false);
- }
-
- public void testWithMessageListener(boolean persistent) throws Exception
- {
- Connection conn = null;
-
-
- Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
- final int NUM_MESSAGES = 5;
-
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- //Send messages with time to live of 2000 enough time to get to client consumer - so
- //they won't be expired on the server side
- prod.send(tm, deliveryMode, 4, 2000);
- }
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- //The messages should now be sitting in the consumer buffer
-
- //Now give them enough time to expire
-
- Thread.sleep(2500);
-
- //Now set a listener
-
- FailingMessageListener listener = new FailingMessageListener();
-
- cons.setMessageListener(listener);
-
- Thread.sleep(1000);
-
- cons.setMessageListener(null);
-
- //No messages should have been received
- assertEquals(0, listener.deliveryCount);
-
- //Shouldn't be able to receive any more
-
- Message m = cons.receive(1000);
-
- assertNull(m);
-
- //Message should all be in the expiry queue - let's check
-
- MessageConsumer cons2 = sess.createConsumer(expiryQueue);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- }
-
- public void testWithReceive(boolean persistent) throws Exception
- {
- Connection conn = null;
-
- try
- {
-
- createQueue("ExpiryQueue");
-
- Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
- final int NUM_MESSAGES = 5;
-
- conn = cf.createConnection();
-
- conn.start();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("Message:" + i);
-
- //Send messages with time to live of 2000 enough time to get to client consumer - so
- //they won't be expired on the server side
- prod.send(tm, deliveryMode, 4, 2000);
- }
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- //The messages should now be sitting in the consumer buffer
-
- //Now give them enough time to expire
-
- Thread.sleep(2500);
-
- //Now try and receive
-
- Message m = cons.receive(1000);
-
- assertNull(m);
-
- //Message should all be in the expiry queue - let's check
-
- MessageConsumer cons2 = sess.createConsumer(expiryQueue);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage) cons2.receive(1000);
-
- assertNotNull(tm);
-
- assertEquals("Message:" + i, tm.getText());
- }
-
- }
- finally
- {
- destroyQueue("ExpiryQueue");
-
- if (conn != null) conn.close();
- }
- }
-
- public void testExpirationTransfer() throws Exception
- {
- createQueue("ExpiryQueue");
-
- Object originalValue = ServerManagement.getAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue");
-
- ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
-
- Connection conn = null;
-
- try
- {
- ConnectionFactory cf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
-
- conn = cf.createConnection();
-
- Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- conn.start();
-
- MessageProducer prod = session.createProducer(queue1);
- prod.setTimeToLive(100);
-
- Message m = session.createTextMessage("This message will die");
-
- prod.send(m);
-
- // wait for the message to die
- Thread.sleep(2000);
-
- MessageConsumer cons = session.createConsumer(queue1);
-
- assertNull(cons.receive(3000));
-
- Queue queueExpiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
-
- MessageConsumer consumerExpiredQueue = session.createConsumer(queueExpiryQueue);
-
- TextMessage txt = (TextMessage) consumerExpiredQueue.receive(1000);
-
- assertEquals("This message will die", txt.getText());
-
- assertNull(consumerExpiredQueue.receive(1000));
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
-
- destroyQueue("ExpiryQueue");
-
- ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", originalValue.toString());
- }
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- class FailingMessageListener implements MessageListener
- {
- volatile int deliveryCount;
-
- public void onMessage(Message msg)
- {
- deliveryCount++;
-
- throw new RuntimeException("Your mum!");
- }
-
- }
-
-}
More information about the jboss-cvs-commits
mailing list