[jboss-cvs] JBoss Messaging SVN: r5308 - in trunk: src/main/org/jboss/messaging/core/server/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Nov 7 06:39:44 EST 2008
Author: timfox
Date: 2008-11-07 06:39:43 -0500 (Fri, 07 Nov 2008)
New Revision: 5308
Added:
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/jms/
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/
trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
Log:
More tests and tweaks
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -86,7 +86,7 @@
this.rateLimiter = rateLimiter;
this.blockOnNonPersistentSend = blockOnNonPersistentSend;
-
+
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroupId = autoGroupId;
@@ -230,7 +230,7 @@
}
boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-
+
SessionSendMessage message;
// check to see if this message need to be scheduled.
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -424,7 +424,7 @@
// If the producer is not auto-commit sends then messages are never
// sent blocking - there is no point
// since commit, prepare or rollback will flush any messages sent.
-
+
producer = new ClientProducerImpl(this,
idGenerator.generateID(),
address,
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -243,6 +243,7 @@
Transaction tx = new TransactionImpl(persistenceManager, postOffice);
ServerMessage copyMessage = makeCopy(expiry, persistenceManager);
+
copyMessage.setDestination(otherBinding.getAddress());
tx.addMessage(copyMessage);
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -510,7 +510,7 @@
final boolean autoCommitAcks,
final boolean xa,
final int sendWindowSize) throws Exception
- {
+ {
checkActivate(connection);
return doCreateSession(name,
@@ -577,8 +577,6 @@
// Private
// --------------------------------------------------------------------------------------
- private final Object createSessionLock = new Object();
-
private CreateSessionResponseMessage doCreateSession(final String name,
final long channelID,
final String username,
@@ -646,7 +644,7 @@
channel.setHandler(handler);
connection.addFailureListener(session);
-
+
return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
Added: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java (rev 0)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/DLQTest.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,834 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A DLQTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3674 $</tt>
+ * <p/>
+ * $Id: DLQTest.java 3674 2008-02-07 18:21:26Z timfox $
+ */
+public class DLQTest extends JMSTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DLQTest(String name)
+ {
+ super(name); // forward the test name to JMSTestCase
+ }
+
+ // Public --------------------------------------------------------
+
+ public void testDefaultAndOverrideDLQ() throws Exception
+ {
+ if (ServerManagement.isRemote())
+ {
+ return;
+ }
+ // Checks dead-lettering first to the server-wide DefaultDLQ (Queue2), then to a per-queue DLQ override (Queue3)
+ final int NUM_MESSAGES = 5;
+
+ final int MAX_DELIVERIES = 8;
+
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ String testQueueObjectName = "jboss.messaging.destination:service=Queue,name=Queue1";
+
+ Connection conn = null;
+
+ try
+ {
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+ String overrideDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue3";
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+ // Per-queue DLQ set to the empty string - presumably "no override", so the default DLQ applies (TODO confirm)
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+ conn = cf.createConnection();
+
+ {
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage) cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+ // Nothing was acknowledged, so recover() restarts delivery: each outer pass is one more delivery attempt
+ sess2.recover();
+ }
+
+ //Prompt them to go to DLQ
+ cons.receive(100);
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times
+
+ checkEmpty(queue1);
+
+ //Now should be in default dlq
+
+ MessageConsumer cons3 = sess.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ conn.close();
+ }
+
+
+ {
+ //Now try with overriding the default dlq
+
+ conn = cf.createConnection();
+
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", overrideDLQObjectName);
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage) cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+
+ sess2.recover();
+ }
+ //Prompt them to go to DLQ, as in the first half of the test
+ cons.receive(100);
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times
+
+ checkEmpty(queue1);
+
+ //Now should be in override dlq
+
+ MessageConsumer cons3 = sess.createConsumer(queue3);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+ }
+ }
+ finally
+ {
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=DLQ");
+ // Set the per-queue DLQ attribute of the test queue back to the empty string
+ ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "DLQ", "");
+
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+ public void testWithMessageListenerPersistent() throws Exception
+ {
+ testWithMessageListener(true); // true = send with DeliveryMode.PERSISTENT
+ }
+
+ public void testWithMessageListenerNonPersistent() throws Exception
+ {
+ testWithMessageListener(false); // false = send with DeliveryMode.NON_PERSISTENT
+ }
+
+   public void testWithReceiveClientAckPersistent() throws Exception
+   {
+      testWithReceiveClientAck(true); // true = send with DeliveryMode.PERSISTENT
+   }
+
+ public void testWithReceiveClientAckNonPersistent() throws Exception
+ {
+ testWithReceiveClientAck(false); // false = send with DeliveryMode.NON_PERSISTENT
+ }
+
+   public void testWithReceiveTransactionalPersistent() throws Exception
+   {
+      testWithReceiveTransactional(true); // true = send with DeliveryMode.PERSISTENT
+   }
+
+ public void testWithReceiveTransactionalNonPersistent() throws Exception
+ {
+ testWithReceiveTransactional(false); // false = send with DeliveryMode.NON_PERSISTENT
+ }
+
+ public void testHeadersSet() throws Exception
+ {
+ Connection conn = null;
+ // Checks that messages moved to the DLQ (Queue2) carry the origin-queue and original-message-id properties
+ try
+ {
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ int maxRedeliveryAttempts =
+ ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ Map origIds = new HashMap();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ // Remember the JMSMessageID the message has after send(), keyed by its text
+ origIds.put(tm.getText(), tm.getJMSMessageID());
+ }
+
+ Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage) cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+ // Roll back the receives so the same messages are delivered again on the next pass
+ sess2.rollback();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+ //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+ checkEmpty(queue2);
+
+ // So let's try and consume them - this should cause them to go to the DLQ - since they
+ // will then exceed max delivery attempts
+ Message m = cons.receive(100);
+
+ assertNull(m);
+
+ //All the messages should now be in the DLQ
+
+ MessageConsumer cons3 = sess.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+
+ // Check the headers
+ String origDest =
+ tm.getStringProperty(MessageImpl.HDR_ORIGIN_QUEUE.toString());
+
+ String origMessageId =
+ tm.getStringProperty(MessageImpl.HDR_ORIG_MESSAGE_ID.toString());
+
+ assertEquals(queue1.toString(), origDest);
+
+ String origId = (String) origIds.get(tm.getText());
+
+ assertEquals(origId, origMessageId);
+ }
+ }
+ finally
+ {
+ if (conn != null) conn.close();
+ }
+ }
+
+   // Overrides MaxDeliveryAttempts on Queue1 (server default minus 5) and checks the override is applied.
+   public void testOverrideDefaultMaxDeliveryAttemptsForQueue() throws Exception
+   {
+      final int defaultAttempts = getDefaultMaxDeliveryAttempts();
+      final ObjectName queueName = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1");
+      final int overriddenAttempts = defaultAttempts - 5;
+
+      try
+      {
+         setMaxDeliveryAttempts(queueName, overriddenAttempts);
+         testMaxDeliveryAttempts(queue1, overriddenAttempts, true);
+      }
+      finally
+      {
+         // Put the value read at the start of the test back on Queue1.
+         setMaxDeliveryAttempts(queueName, defaultAttempts);
+      }
+   }
+
+   // Overrides MaxDeliveryAttempts on Topic1 (server default minus 5) and checks the override is applied.
+   public void testOverrideDefaultMaxDeliveryAttemptsForTopic() throws Exception
+   {
+      ObjectName topicName = new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1");
+      int md = getDefaultMaxDeliveryAttempts();
+      try
+      {
+         int maxDeliveryAttempts = md - 5;
+         setMaxDeliveryAttempts(topicName, maxDeliveryAttempts);
+
+         testMaxDeliveryAttempts(topic1, maxDeliveryAttempts, false);
+      }
+      finally
+      {
+         // Restore the destination this test changed (Topic1). This used to reset Queue1,
+         // which left the reduced limit on Topic1 in place for whatever ran afterwards.
+         setMaxDeliveryAttempts(topicName, md);
+      }
+   }
+
+   // Sets Queue1's MaxDeliveryAttempts to -1 and checks the server-wide default is then used.
+   public void testUseDefaultMaxDeliveryAttemptsForQueue() throws Exception
+   {
+      final int defaultAttempts = getDefaultMaxDeliveryAttempts();
+      final ObjectName queueName = new ObjectName("jboss.messaging.destination:service=Queue,name=Queue1");
+
+      try
+      {
+         // -1 is presumably the "no per-destination value" marker - TODO confirm
+         setMaxDeliveryAttempts(queueName, -1);
+
+         // Check that defaultMaxDeliveryAttempts takes effect
+         testMaxDeliveryAttempts(queue1, getDefaultMaxDeliveryAttempts(), true);
+      }
+      finally
+      {
+         setMaxDeliveryAttempts(queueName, defaultAttempts);
+      }
+   }
+
+   // Sets Topic1's MaxDeliveryAttempts to -1 and checks the server-wide default is then used.
+   public void testUseDefaultMaxDeliveryAttemptsForTopic() throws Exception
+   {
+      ObjectName topicName = new ObjectName("jboss.messaging.destination:service=Topic,name=Topic1");
+      int md = getDefaultMaxDeliveryAttempts();
+      try
+      {
+         setMaxDeliveryAttempts(topicName, -1);
+
+         // Check that defaultMaxDeliveryAttempts takes effect
+         testMaxDeliveryAttempts(topic1, getDefaultMaxDeliveryAttempts(), false);
+      }
+      finally
+      {
+         // Restore the destination this test changed (Topic1). This used to reset Queue1,
+         // which left the -1 setting on Topic1 in place for whatever ran afterwards.
+         setMaxDeliveryAttempts(topicName, md);
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void testWithMessageListener(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+ // Checks that messages whose MessageListener always throws end up in the DLQ (Queue2); persistent picks the delivery mode
+ try
+ {
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+ int maxRedeliveryAttempts =
+ ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+ // The listener throws on every delivery; wait until each message has been attempted MAX_DELIVERIES times
+ FailingMessageListener listener = new FailingMessageListener(MAX_DELIVERIES * NUM_MESSAGES);
+
+ cons.setMessageListener(listener);
+
+ conn.start();
+
+ listener.waitForMessages();
+
+ assertEquals(MAX_DELIVERIES * NUM_MESSAGES, listener.deliveryCount);
+
+ //Messages should all be in the DLQ (queue2) - let's check
+
+ MessageConsumer cons2 = sess.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ log.info("Got mnessage" + tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ checkEmpty(queue1);
+ }
+ finally
+ {
+ if (conn != null) conn.close();
+ }
+ }
+
+
+ protected void testWithReceiveClientAck(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+ // Checks dead-lettering to Queue2 when a CLIENT_ACKNOWLEDGE consumer keeps calling recover(); persistent picks the delivery mode
+ try
+ {
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ int maxRedeliveryAttempts =
+ ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage) cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+ // Nothing was acknowledged, so recover() restarts delivery: each outer pass is one more delivery attempt
+ sess2.recover();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+ //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+ checkEmpty(queue2);
+
+ //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+ //delivery attempts
+
+ Message m = cons.receive(100);
+
+ assertNull(m);
+
+ //Now, all the messages should now be in the DLQ
+
+ MessageConsumer cons3 = sess.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ //No more should be available
+
+ cons.close();
+
+ checkEmpty(queue1);
+ }
+ finally
+ {
+ destroyQueue("DLQ");
+ // NOTE(review): this method uses Queue2 as the DLQ and does not create a queue named DLQ - confirm the destroyQueue call is intended
+ if (conn != null) conn.close();
+ }
+ }
+
+ protected void testWithReceiveTransactional(boolean persistent) throws Exception
+ {
+ Connection conn = null;
+ // Checks dead-lettering to Queue2 when a transacted consumer keeps rolling back; persistent picks the delivery mode
+ try
+ {
+ ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+ final int MAX_DELIVERIES = 16;
+
+ final int NUM_MESSAGES = 5;
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts", String.valueOf(MAX_DELIVERIES));
+
+ String defaultDLQObjectName = "jboss.messaging.destination:service=Queue,name=Queue2";
+
+ ServerManagement.setAttribute(serverPeerObjectName, "DefaultDLQ", defaultDLQObjectName);
+
+ int maxRedeliveryAttempts =
+ ((Integer) ServerManagement.getAttribute(serverPeerObjectName, "DefaultMaxDeliveryAttempts")).intValue();
+
+ assertEquals(MAX_DELIVERIES, maxRedeliveryAttempts);
+
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("Message:" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < MAX_DELIVERIES; i++)
+ {
+ for (int j = 0; j < NUM_MESSAGES; j++)
+ {
+ TextMessage tm = (TextMessage) cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + j, tm.getText());
+ }
+ // Roll back the receives so the same messages are delivered again on the next pass
+ sess2.rollback();
+ }
+
+ //At this point all the messages have been delivered exactly MAX_DELIVERIES times - this is ok
+ //they haven't exceeded max delivery attempts so shouldn't be in the DLQ - let's check
+
+ checkEmpty(queue2);
+
+ //So let's try and consume them - this should cause them to go to the DLQ - since they will then exceed max
+ //delivery attempts
+ Message m = cons.receive(100);
+
+ assertNull(m);
+
+ //All the messages should now be in the DLQ
+
+ MessageConsumer cons3 = sess.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage) cons3.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message:" + i, tm.getText());
+ }
+
+ //No more should be available
+
+ checkEmpty(queue1);
+ }
+ finally
+ {
+ destroyQueue("DLQ");
+ // NOTE(review): this method uses Queue2 as the DLQ and does not create a queue named DLQ - confirm the destroyQueue call is intended
+ if (conn != null) conn.close();
+ }
+ }
+
+   // Reads the DefaultMaxDeliveryAttempts attribute of the server peer and returns it as an int.
+   protected int getDefaultMaxDeliveryAttempts() throws Exception
+   {
+      ObjectName serverPeer = ServerManagement.getServerPeerObjectName();
+      Object attempts = ServerManagement.getAttribute(serverPeer, "DefaultMaxDeliveryAttempts");
+      return ((Integer) attempts).intValue();
+   }
+
+   // Sets the MaxDeliveryAttempts attribute of the given destination; the value is passed in string form.
+   protected void setMaxDeliveryAttempts(ObjectName dest, int maxDeliveryAttempts) throws Exception
+   {
+      ServerManagement.setAttribute(dest, "MaxDeliveryAttempts", String.valueOf(maxDeliveryAttempts));
+   }
+
+   // Sends one message, has it redelivered destMaxDeliveryAttempts times, then expects it in the DLQ (queue2).
+   protected void testMaxDeliveryAttempts(Destination destination, int destMaxDeliveryAttempts, boolean queue) throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         if (!queue)
+         {
+            // Only topics get a client ID (a durable subscriber is created for them below). This is
+            // done inside the try so the connection is still closed if setClientID throws.
+            conn.setClientID("wib123");
+         }
+
+         ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(),
+                                       "DefaultDLQ", "jboss.messaging.destination:service=Queue,name=Queue2");
+
+         // Create the consumer before the producer so that the message we send doesn't
+         // get lost if the destination is a Topic.
+         Session consumingSession = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer destinationConsumer;
+
+         if (queue)
+         {
+            destinationConsumer = consumingSession.createConsumer(destination);
+         }
+         else
+         {
+            //For topics we only keep a delivery record on the server side for durable subs
+            destinationConsumer = consumingSession.createDurableSubscriber((Topic) destination, "testsub1");
+         }
+
+         {
+            Session producingSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer prod = producingSession.createProducer(destination);
+            TextMessage tm = producingSession.createTextMessage("Message");
+            prod.send(tm);
+         }
+
+         conn.start();
+
+         // Make delivery attempts up to the maximum. The message should not end up in the DLQ.
+         for (int i = 0; i < destMaxDeliveryAttempts; i++)
+         {
+            TextMessage tm = (TextMessage) destinationConsumer.receive(1000);
+            assertNotNull("No message received on delivery attempt number " + (i + 1), tm);
+            assertEquals("Message", tm.getText());
+            consumingSession.recover();
+         }
+
+         // At this point the message should not yet be in the DLQ
+         checkEmpty(queue2);
+
+         // Now we try to consume the message again from the destination, which causes it
+         // to go to the DLQ instead.
+         Message m = destinationConsumer.receive(100);
+         assertNull(m);
+
+         // The message should be in the DLQ now
+         MessageConsumer dlqConsumer = consumingSession.createConsumer(queue2);
+         m = dlqConsumer.receive(1000);
+         assertNotNull(m);
+         assertTrue(m instanceof TextMessage);
+         assertEquals("Message", ((TextMessage) m).getText());
+
+         m.acknowledge();
+
+         if (!queue)
+         {
+            destinationConsumer.close();
+
+            consumingSession.unsubscribe("testsub1");
+         }
+      }
+      finally
+      {
+         if (conn != null) conn.close();
+      }
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+   // Listener that counts every delivery and then throws, so that no delivery ever succeeds.
+   class FailingMessageListener implements MessageListener
+   {
+      volatile int deliveryCount; // updated under the monitor; volatile because the test reads it unsynchronized
+
+      final int numMessages; // number of deliveries waitForMessages() waits for
+      FailingMessageListener(int numMessages)
+      {
+         this.numMessages = numMessages;
+      }
+
+      // Blocks until at least numMessages deliveries have been counted. Uses '<' rather than '!=' so
+      // that an overshoot returns (and fails the caller's count assertion) instead of waiting forever.
+      synchronized void waitForMessages() throws Exception
+      {
+         while (deliveryCount < numMessages)
+         {
+            this.wait();
+         }
+      }
+
+      // Counts the delivery, wakes every waiter, then fails the delivery by throwing.
+      public synchronized void onMessage(Message msg)
+      {
+         deliveryCount++;
+         this.notifyAll();
+         throw new RuntimeException("Your mum!");
+      }
+   }
+
+}
Added: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java (rev 0)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ExpiryQueueTest.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,621 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.client.JBossMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * A ExpiryQueueTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 3614 $</tt>
+ * <p/>
+ * $Id: ExpiryQueueTest.java 3614 2008-01-22 16:41:40Z timfox $
+ */
+public class ExpiryQueueTest extends JMSTestCase
+{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+  * Creates the test case; the name is passed straight to the superclass constructor.
+  */
+ public ExpiryQueueTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------
+
+   /**
+    * Looks up "/queue/ExpiryQueue" in JNDI and checks that the bound queue is
+    * named "ExpiryQueue". The test does nothing when the server is remote.
+    */
+   public void testExpiryQueueAlreadyDeployed() throws Exception
+   {
+      if (!ServerManagement.isRemote())
+      {
+         //deployQueue("ExpiryQueue");
+
+         // The management-attribute checks below are disabled.
+         /*ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         ObjectName expiryQueueObjectName = (ObjectName)ServerManagement.getAttribute(serverPeerObjectName, "DefaultExpiryQueue");
+
+         assertNotNull(expiryQueueObjectName);
+
+         String name = (String)ServerManagement.getAttribute(expiryQueueObjectName, "Name");
+
+         assertNotNull(name);
+
+         assertEquals("ExpiryQueue", name);
+
+         String jndiName = (String)ServerManagement.getAttribute(expiryQueueObjectName, "JNDIName");
+
+         assertNotNull(jndiName);
+
+         assertEquals("/queue/ExpiryQueue", jndiName);
+
+         org.jboss.messaging.core.contract.Queue expiryQueue = ServerManagement.getServer().getServerPeer().getDefaultExpiryQueueInstance();
+
+         assertNotNull(expiryQueue);*/
+
+         Object bound = ic.lookup("/queue/ExpiryQueue");
+
+         JBossQueue expiryQueue = (JBossQueue) bound;
+
+         assertNotNull(expiryQueue);
+
+         assertEquals("ExpiryQueue", expiryQueue.getName());
+      }
+   }
+
+
+   /**
+    * Checks where expired messages end up: first they are expected on the
+    * "DefaultExpiry" queue, then, after the test queue's "ExpiryQueue"
+    * attribute is set to "OverrideExpiry", on that queue instead.
+    *
+    * The three queues created here are destroyed again in the finally block.
+    */
+   public void testDefaultAndOverrideExpiryQueue() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      try
+      {
+         createQueue("DefaultExpiry");
+
+         createQueue("OverrideExpiry");
+
+         createQueue("expTestQueue");
+
+         String overrideExpiryObjectName = "OverrideExpiry";
+
+         String testQueueObjectName = "expTestQueue";
+
+         // NOTE(review): the calls that made "DefaultExpiry" the server-wide default expiry
+         // queue and cleared the test queue's own ExpiryQueue attribute are disabled:
+         //ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", "DefaultExpiry");
+         //ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", "");
+
+         Queue testQueue = (Queue) ic.lookup("/queue/expTestQueue");
+
+         Queue defaultExpiry = (Queue) ic.lookup("/queue/DefaultExpiry");
+
+         Queue overrideExpiry = (Queue) ic.lookup("/queue/OverrideExpiry");
+
+         // Phase 1: expired messages should land in the default expiry queue
+         assertMessagesExpireTo(testQueue, defaultExpiry, NUM_MESSAGES);
+
+         // Phase 2: now try with overriding the default expiry queue
+         // NOTE(review): "expTestQueue" has no "domain:key=value" form, so this ObjectName
+         // constructor is expected to throw MalformedObjectNameException - confirm the intended name.
+         ServerManagement.setAttribute(new ObjectName(testQueueObjectName), "ExpiryQueue", overrideExpiryObjectName);
+
+         assertMessagesExpireTo(testQueue, overrideExpiry, NUM_MESSAGES);
+      }
+      finally
+      {
+         // Destroy the queues this test actually created
+         destroyQueue("DefaultExpiry");
+
+         destroyQueue("OverrideExpiry");
+
+         destroyQueue("expTestQueue");
+      }
+   }
+
+   /**
+    * Sends numMessages persistent text messages with a time to live of 2000 ms
+    * to testQueue, opens a consumer on it, waits 2500 ms so the messages expire,
+    * and then asserts that nothing can be received from testQueue and that the
+    * messages "Message:0" .. "Message:(numMessages - 1)" arrive in that order on
+    * expiryQueue.
+    *
+    * A dedicated connection is used and is always closed, even when an assertion fails.
+    */
+   private void assertMessagesExpireTo(Queue testQueue, Queue expiryQueue, int numMessages) throws Exception
+   {
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(testQueue);
+
+         conn.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 2000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, DeliveryMode.PERSISTENT, 4, 2000);
+         }
+
+         Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         MessageConsumer cons = sess2.createConsumer(testQueue);
+
+         //The messages should now be sitting in the consumer buffer
+
+         //Now give them enough time to expire
+
+         Thread.sleep(2500);
+
+         //Now try and receive
+
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Messages should all be in the expiry queue - let's check
+
+         MessageConsumer expiryConsumer = sess.createConsumer(expiryQueue);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = (TextMessage) expiryConsumer.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         conn.close();
+      }
+   }
+
+   /**
+    * Sends messages with a 3000 ms time to live to a topic with three durable
+    * subscriptions, lets them expire, and checks the headers of every message
+    * subsequently found in the expiry queue: origin destination, original
+    * message id and actual expiry time.
+    */
+   public void testExpireSameMessagesMultiple() throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Connection conn = null;
+
+      try
+      {
+         createQueue("ExpiryQueue");
+
+         String defaultExpiryObjectName = "jboss.messaging.destination:service=Queue,name=ExpiryQueue";
+
+         ObjectName serverPeerObjectName = ServerManagement.getServerPeerObjectName();
+
+         ServerManagement.setAttribute(serverPeerObjectName, "DefaultExpiryQueue", defaultExpiryObjectName);
+
+         Queue defaultExpiry = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         conn = cf.createConnection();
+
+         conn.setClientID("wib1");
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(topic1);
+
+         conn.start();
+
+         //Create 3 durable subscriptions
+
+         MessageConsumer sub1 = sess.createDurableSubscriber(topic1, "sub1");
+
+         MessageConsumer sub2 = sess.createDurableSubscriber(topic1, "sub2");
+
+         MessageConsumer sub3 = sess.createDurableSubscriber(topic1, "sub3");
+
+         // Message text -> JMS message id assigned on send
+         Map<String, String> origIds = new HashMap<String, String>();
+
+         // Taken before the sends, so now + 3000 is a lower bound for every expiry time
+         long now = System.currentTimeMillis();
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 3000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, DeliveryMode.PERSISTENT, 4, 3000);
+
+            origIds.put(tm.getText(), tm.getJMSMessageID());
+         }
+
+         long approxExpiry = now + 3000;
+
+         //Now sleep. This will give them enough time to expire
+
+         Thread.sleep(3500);
+
+         //Now try and consume from each - this should force the message to the expiry queue
+
+         Message m = sub1.receive(500);
+         assertNull(m);
+
+         m = sub2.receive(500);
+         assertNull(m);
+
+         m = sub3.receive(500);
+         assertNull(m);
+
+         //Now the messages should all be in the expiry queue
+
+         MessageConsumer cons2 = sess.createConsumer(defaultExpiry);
+
+         int expiredCount = 0;
+
+         while (true)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(500);
+
+            if (tm == null)
+            {
+               break;
+            }
+
+            expiredCount++;
+
+            // Check the headers
+            String origDest =
+               tm.getStringProperty(MessageImpl.HDR_ORIGIN_QUEUE.toString());
+
+            String origMessageId =
+               tm.getStringProperty(MessageImpl.HDR_ORIG_MESSAGE_ID.toString());
+
+            long actualExpiryTime =
+               tm.getLongProperty(MessageImpl.HDR_ACTUAL_EXPIRY_TIME.toString());
+
+            assertEquals(topic1.toString(), origDest);
+
+            String origId = origIds.get(tm.getText());
+
+            assertEquals(origId, origMessageId);
+
+            assertTrue(actualExpiryTime >= approxExpiry);
+         }
+
+         // Without this the header checks above would pass vacuously on an empty expiry queue
+         assertTrue("No expired messages found in the expiry queue", expiredCount > 0);
+
+         cons2.close();
+
+         sub1.close();
+
+         sub2.close();
+
+         sub3.close();
+
+         sess.unsubscribe("sub1");
+
+         sess.unsubscribe("sub2");
+
+         sess.unsubscribe("sub3");
+
+      }
+      finally
+      {
+         try
+         {
+            destroyQueue("ExpiryQueue");
+         }
+         finally
+         {
+            // Close the connection even if destroying the queue fails
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+      }
+   }
+
+   /**
+    * Runs the message listener scenario with persistent delivery.
+    */
+   public void testWithMessageListenerPersistent() throws Exception
+   {
+      final boolean persistent = true;
+
+      testWithMessageListener(persistent);
+   }
+
+   /**
+    * Runs the message listener scenario with non-persistent delivery.
+    */
+   public void testWithMessageListenerNonPersistent() throws Exception
+   {
+      final boolean persistent = false;
+
+      testWithMessageListener(persistent);
+   }
+
+   /**
+    * Runs the synchronous receive scenario with persistent delivery.
+    */
+   public void testWithReceivePersistent() throws Exception
+   {
+      final boolean persistent = true;
+
+      testWithReceive(persistent);
+   }
+
+   /**
+    * Runs the synchronous receive scenario with non-persistent delivery.
+    */
+   public void testWithReceiveNonPersistent() throws Exception
+   {
+      final boolean persistent = false;
+
+      testWithReceive(persistent);
+   }
+
+   /**
+    * Sends messages with a 2000 ms time to live to queue1, lets them expire
+    * while a consumer is open, then installs a listener and checks that it is
+    * never invoked, that nothing more can be received, and that the messages
+    * arrive in order on the queue bound at "/queue/ExpiryQueue".
+    *
+    * @param persistent true to send with DeliveryMode.PERSISTENT, false for NON_PERSISTENT
+    */
+   public void testWithMessageListener(boolean persistent) throws Exception
+   {
+      final int NUM_MESSAGES = 5;
+
+      Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+      Connection conn = cf.createConnection();
+
+      try
+      {
+         conn.start();
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer prod = sess.createProducer(queue1);
+
+         int deliveryMode = persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("Message:" + i);
+
+            //Send messages with time to live of 2000 enough time to get to client consumer - so
+            //they won't be expired on the server side
+            prod.send(tm, deliveryMode, 4, 2000);
+         }
+
+         MessageConsumer cons = sess.createConsumer(queue1);
+
+         //The messages should now be sitting in the consumer buffer
+
+         //Now give them enough time to expire
+
+         Thread.sleep(2500);
+
+         //Now set a listener
+
+         FailingMessageListener listener = new FailingMessageListener();
+
+         cons.setMessageListener(listener);
+
+         Thread.sleep(1000);
+
+         cons.setMessageListener(null);
+
+         //No messages should have been received
+         assertEquals(0, listener.deliveryCount);
+
+         //Shouldn't be able to receive any more
+
+         Message m = cons.receive(1000);
+
+         assertNull(m);
+
+         //Message should all be in the expiry queue - let's check
+
+         MessageConsumer cons2 = sess.createConsumer(expiryQueue);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage) cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message:" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         // The connection was previously never closed, on success or failure
+         conn.close();
+      }
+   }
+
+   /**
+    * Sends messages with a 2000 ms time to live to queue1, lets them expire
+    * while a consumer is open, and checks that a synchronous receive returns
+    * nothing and that the messages arrive in order on "ExpiryQueue".
+    *
+    * @param persistent true to send with DeliveryMode.PERSISTENT, false for NON_PERSISTENT
+    */
+   public void testWithReceive(boolean persistent) throws Exception
+   {
+      final int messageCount = 5;
+
+      Connection conn = null;
+
+      try
+      {
+         createQueue("ExpiryQueue");
+
+         Queue expiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         conn = cf.createConnection();
+
+         conn.start();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(queue1);
+
+         int deliveryMode;
+
+         if (persistent)
+         {
+            deliveryMode = DeliveryMode.PERSISTENT;
+         }
+         else
+         {
+            deliveryMode = DeliveryMode.NON_PERSISTENT;
+         }
+
+         // Time to live of 2000 is long enough for the messages to reach the client
+         // consumer, so they are not expired on the server side
+         int sent = 0;
+
+         while (sent < messageCount)
+         {
+            TextMessage outgoing = session.createTextMessage("Message:" + sent);
+
+            producer.send(outgoing, deliveryMode, 4, 2000);
+
+            sent++;
+         }
+
+         MessageConsumer consumer = session.createConsumer(queue1);
+
+         // The messages should now sit in the consumer buffer; wait until they have expired
+         Thread.sleep(2500);
+
+         // Nothing should be receivable any more
+         Message received = consumer.receive(1000);
+
+         assertNull(received);
+
+         // All messages should be in the expiry queue, in send order
+         MessageConsumer expiryConsumer = session.createConsumer(expiryQueue);
+
+         for (int index = 0; index < messageCount; index++)
+         {
+            TextMessage expired = (TextMessage) expiryConsumer.receive(1000);
+
+            assertNotNull(expired);
+
+            assertEquals("Message:" + index, expired.getText());
+         }
+      }
+      finally
+      {
+         destroyQueue("ExpiryQueue");
+
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   /**
+    * Sends one message with a 100 ms time to live to queue1, waits for it to
+    * expire, and checks that it can no longer be received from queue1 but is
+    * available exactly once on "ExpiryQueue".
+    *
+    * The "DefaultExpiryQueue" server attribute is changed for the duration of
+    * the test and restored to its previous value afterwards.
+    */
+   public void testExpirationTransfer() throws Exception
+   {
+      createQueue("ExpiryQueue");
+
+      Object originalValue = ServerManagement.getAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue");
+
+      ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", "jboss.messaging.destination:service=Queue,name=ExpiryQueue");
+
+      Connection conn = null;
+
+      try
+      {
+         // Named differently from the inherited cf so the field is not shadowed
+         ConnectionFactory lookedUpCf = (ConnectionFactory) ic.lookup("/ConnectionFactory");
+
+         conn = lookedUpCf.createConnection();
+
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn.start();
+
+         MessageProducer prod = session.createProducer(queue1);
+         prod.setTimeToLive(100);
+
+         Message m = session.createTextMessage("This message will die");
+
+         prod.send(m);
+
+         // wait for the message to die
+         Thread.sleep(2000);
+
+         MessageConsumer cons = session.createConsumer(queue1);
+
+         assertNull(cons.receive(3000));
+
+         Queue queueExpiryQueue = (Queue) ic.lookup("/queue/ExpiryQueue");
+
+         MessageConsumer consumerExpiredQueue = session.createConsumer(queueExpiryQueue);
+
+         TextMessage txt = (TextMessage) consumerExpiredQueue.receive(1000);
+
+         // Fail with a clear assertion rather than a NullPointerException when nothing arrived
+         assertNotNull("Expired message was not transferred to the expiry queue", txt);
+
+         assertEquals("This message will die", txt.getText());
+
+         assertNull(consumerExpiredQueue.receive(1000));
+      }
+      finally
+      {
+         // Each cleanup step runs even if the previous one throws
+         try
+         {
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         finally
+         {
+            try
+            {
+               destroyQueue("ExpiryQueue");
+            }
+            finally
+            {
+               ServerManagement.setAttribute(ServerManagement.getServerPeerObjectName(), "DefaultExpiryQueue", originalValue.toString());
+            }
+         }
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+   /**
+    * Message listener that counts each delivery and then throws a
+    * RuntimeException from onMessage.
+    */
+   class FailingMessageListener implements MessageListener
+   {
+      // Number of times onMessage has been called; read by the test after the listener is removed.
+      volatile int deliveryCount;
+
+      public void onMessage(Message msg)
+      {
+         deliveryCount += 1;
+
+         throw new RuntimeException("Your mum!");
+      }
+
+   }
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverNoSessionsFailoverTest.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -33,13 +33,10 @@
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java 2008-11-07 11:14:05 UTC (rev 5307)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -40,7 +40,6 @@
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
-import org.jboss.messaging.util.SimpleString;
/**
*
@@ -62,8 +61,6 @@
// Attributes ----------------------------------------------------
- private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
private MessagingService liveService;
private MessagingService backupService;
Added: trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/cluster/JMSFailoverTest.java 2008-11-07 11:39:43 UTC (rev 5308)
@@ -0,0 +1,330 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ *
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.jms.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.JBossQueue;
+import org.jboss.messaging.jms.client.JBossConnectionFactory;
+import org.jboss.messaging.jms.client.JBossSession;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ *
+ * A JMSFailoverTest
+ *
+ * A simple test to test failover when using the JMS API.
+ * Most of the failover tests are done on the Core API.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 7 Nov 2008 11:13:39
+ *
+ *
+ */
+public class JMSFailoverTest extends TestCase
+{
+ private static final Logger log = Logger.getLogger(JMSFailoverTest.class);
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Server acting as the live node; created and started in setUp(), stopped in tearDown().
+ private MessagingService liveService;
+
+ // Server configured with setBackup(true); created and started in setUp(), stopped in tearDown().
+ private MessagingService backupService;
+
+ // Transport parameters shared by the backup's acceptor and the connectors that target it;
+ // setUp() stores the backup's server id here.
+ private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+   /**
+    * Uses a connection factory that knows both the live and the backup
+    * connector, sends messages, fails the underlying core connection and then
+    * checks that every message can still be received in order through the same
+    * JMS connection. Finally checks that the exception listener was notified
+    * with the injected failure as its cause.
+    */
+   public void testAutomaticFailover() throws Exception
+   {
+      JBossConnectionFactory jbcf = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                               new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                          backupParams),
+                                                               ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                                               ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                               null,
+                                                               1000,
+                                                               ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                                               ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                                               ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                                               ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                                               ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                                               ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                                               ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                                               ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                                               ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      MyExceptionListener listener = new MyExceptionListener();
+
+      MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
+
+      Connection conn = jbcf.createConnection();
+
+      try
+      {
+         conn.setExceptionListener(listener);
+
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         ClientSession coreSession = ((JBossSession)sess).getCoreSession();
+
+         RemotingConnection coreConn = ((ClientSessionImpl)coreSession).getConnection();
+
+         SimpleString jmsQueueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+         coreSession.createQueue(jmsQueueName, jmsQueueName, null, false, false);
+
+         Queue queue = sess.createQueue("myqueue");
+
+         final int numMessages = 1000;
+
+         MessageProducer producer = sess.createProducer(queue);
+
+         MessageConsumer consumer = sess.createConsumer(queue);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sess.createTextMessage("message" + i);
+
+            producer.send(tm);
+         }
+
+         conn.start();
+
+         // Simulate a connection failure on the core connection
+         coreConn.fail(me);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = (TextMessage)consumer.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         TextMessage tm = (TextMessage)consumer.receive(1000);
+
+         assertNull(tm);
+      }
+      finally
+      {
+         // Always close, otherwise a failed assertion leaves a connection behind
+         // and the connection-count checks in tearDown() fail as well
+         conn.close();
+      }
+
+      assertNotNull(listener.e);
+
+      JMSException je = listener.e;
+
+      assertEquals(me, je.getCause());
+   }
+
+   /**
+    * Uses one connection factory for the live server only and a second one for
+    * the backup only. Messages are sent through the live connection, which is
+    * then failed; the test checks the exception listener was notified and then
+    * connects to the backup by hand and receives every message in order.
+    */
+   public void testManualFailover() throws Exception
+   {
+      JBossConnectionFactory jbcfLive = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                   null,
+                                                                   ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                                                   ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                                   null,
+                                                                   1000,
+                                                                   ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                                                   ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                                                   ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                                                   ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                                                   ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                                                   ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                                                   true,
+                                                                   ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                                                   ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      JBossConnectionFactory jbcfBackup = new JBossConnectionFactory(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                backupParams),
+                                                                     null,
+                                                                     ClientSessionFactoryImpl.DEFAULT_PING_PERIOD,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                                                     null,
+                                                                     1000,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE,
+                                                                     ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                                                     ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE,
+                                                                     ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                                                     ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                                                     ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                                                     ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                                                     ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP_ID,
+                                                                     ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
+
+      final int numMessages = 1000;
+
+      // Assigned while the live connection is open, reused for the backup consumer
+      Queue queue;
+
+      Connection connLive = jbcfLive.createConnection();
+
+      try
+      {
+         MyExceptionListener listener = new MyExceptionListener();
+
+         connLive.setExceptionListener(listener);
+
+         Session sessLive = connLive.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         ClientSession coreSessionLive = ((JBossSession)sessLive).getCoreSession();
+
+         RemotingConnection coreConnLive = ((ClientSessionImpl)coreSessionLive).getConnection();
+
+         SimpleString jmsQueueName = new SimpleString(JBossQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+         coreSessionLive.createQueue(jmsQueueName, jmsQueueName, null, false, false);
+
+         queue = sessLive.createQueue("myqueue");
+
+         MessageProducer producerLive = sessLive.createProducer(queue);
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = sessLive.createTextMessage("message" + i);
+
+            producerLive.send(tm);
+         }
+
+         // Note the live factory is created with "true" in the argument position that the
+         // backup factory fills with DEFAULT_BLOCK_ON_PERSISTENT_SEND, i.e. sends of persistent
+         // messages block, to make sure all messages get to the server before failover
+
+         MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED);
+
+         coreConnLive.fail(me);
+
+         assertNotNull(listener.e);
+
+         JMSException je = listener.e;
+
+         assertEquals(me, je.getCause());
+      }
+      finally
+      {
+         // Always close, so a failed assertion does not leave a live connection behind
+         connLive.close();
+      }
+
+      // Now recreate on backup
+
+      Connection connBackup = jbcfBackup.createConnection();
+
+      try
+      {
+         log.info("creating session on backup");
+         Session sessBackup = connBackup.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         log.info("created on backup");
+
+         MessageConsumer consumerBackup = sessBackup.createConsumer(queue);
+
+         connBackup.start();
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            TextMessage tm = (TextMessage)consumerBackup.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         TextMessage tm = (TextMessage)consumerBackup.receive(1000);
+
+         assertNull(tm);
+      }
+      finally
+      {
+         connBackup.close();
+      }
+   }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+ backupParams));
+ backupConf.setBackup(true);
+ backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+ liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+ liveService.start();
+ }
+
+   /**
+    * Checks that neither server has connections left, stops both servers and
+    * checks that the in-VM registry is empty.
+    *
+    * The servers are stopped even when a connection-count check fails, so a
+    * failing test does not leave running servers behind for the next one. The
+    * order of checks and stops on the success path is unchanged.
+    */
+   @Override
+   protected void tearDown() throws Exception
+   {
+      try
+      {
+         try
+         {
+            assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+         }
+         finally
+         {
+            backupService.stop();
+         }
+
+         assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+      }
+      finally
+      {
+         liveService.stop();
+      }
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+   /**
+    * Exception listener that remembers the last JMSException it was given.
+    */
+   private static class MyExceptionListener implements ExceptionListener
+   {
+      // Last exception passed to onException; stays null until the first call.
+      volatile JMSException e;
+
+      public void onException(final JMSException exception)
+      {
+         e = exception;
+      }
+   }
+
+}
More information about the jboss-cvs-commits
mailing list