[jboss-cvs] JBoss Messaging SVN: r6010 - in trunk/tests: src/org/jboss/messaging/tests/integration/scheduling and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 5 10:25:22 EST 2009
Author: jmesnil
Date: 2009-03-05 10:25:22 -0500 (Thu, 05 Mar 2009)
New Revision: 6010
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/DelayedMessageTest.java
Removed:
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
moved JMS ScheduledDelivery tests to integration's DelayedMessageTest & ScheduledMessageTest
Deleted: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2009-03-05 15:24:40 UTC (rev 6009)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2009-03-05 15:25:22 UTC (rev 6010)
@@ -1,541 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.messaging.core.message.impl.MessageImpl;
-
-
-/**
- *
- * A ScheduledDeliveryTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- *
- * $Id$
- *
- */
-public class ScheduledDeliveryTest extends JMSTestCase
-{
- // Constants -----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public ScheduledDeliveryTest(String name)
- {
- super(name);
- }
-
- // Public --------------------------------------------------------
-
- public void testScheduledDeliveryTX() throws Exception
- {
- scheduledDelivery(true);
- }
-
- public void testScheduledDeliveryNoTX() throws Exception
- {
- scheduledDelivery(false);
- }
-
- public void testScheduledDeliveryWithRestart() throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue1 = (Queue) getInitialContext().lookup("/queue/testQueue");
- MessageProducer prod = sess.createProducer(queue1);
-
- //Send one scheduled
-
- long now = System.currentTimeMillis();
-
- TextMessage tm1 = sess.createTextMessage("testScheduled1");
- tm1.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 29000);
- prod.send(tm1);
-
- //First send some non scheduled messages
-
- TextMessage tm2 = sess.createTextMessage("testScheduled2");
- prod.send(tm2);
-
- TextMessage tm3 = sess.createTextMessage("testScheduled3");
- prod.send(tm3);
-
- TextMessage tm4 = sess.createTextMessage("testScheduled4");
- prod.send(tm4);
-
-
- //Now send some more scheduled messages
-
- //These numbers have to be large with Hudson, since restart can take some time
-
- TextMessage tm5 = sess.createTextMessage("testScheduled5");
- tm5.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 27000);
- prod.send(tm5);
-
- TextMessage tm6 = sess.createTextMessage("testScheduled6");
- tm6.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 26000);
- prod.send(tm6);
-
- TextMessage tm7 = sess.createTextMessage("testScheduled7");
- tm7.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 25000);
- prod.send(tm7);
-
- TextMessage tm8 = sess.createTextMessage("testScheduled8");
- tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 28000);
- prod.send(tm8);
-
- //And one scheduled with a -ve number
-
- TextMessage tm9 = sess.createTextMessage("testScheduled9");
- tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), -3);
- prod.send(tm9);
-
- //Now stop the server and restart it
-
- conn.close();
-
- stop();
-
- start();
-
- // Messaging server restart implies new ConnectionFactory lookup
- deployAndLookupAdministeredObjects();
-
- conn = cf.createConnection();
-
- conn.start();
-
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- forceGC();
-
- //First the non scheduled messages should be received
-
- TextMessage rm1 = (TextMessage)cons.receive(250);
- assertNotNull(rm1);
- assertEquals(tm2.getText(), rm1.getText());
-
- TextMessage rm2 = (TextMessage)cons.receive(250);
- assertNotNull(rm2);
- assertEquals(tm3.getText(), rm2.getText());
-
- TextMessage rm3 = (TextMessage)cons.receive(250);
- assertNotNull(rm3);
- assertEquals(tm4.getText(), rm3.getText());
-
- //Now the one with a scheduled with a -ve number
- TextMessage rm5 = (TextMessage)cons.receive(250);
- assertNotNull(rm5);
- assertEquals(tm9.getText(), rm5.getText());
-
- //Now the scheduled
- TextMessage rm6 = (TextMessage)cons.receive(25500);
- assertNotNull(rm6);
- assertEquals(tm7.getText(), rm6.getText());
-
- long now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 3000);
-
-
- TextMessage rm7 = (TextMessage)cons.receive(26500);
- assertNotNull(rm7);
- assertEquals(tm6.getText(), rm7.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 4000);
-
-
- TextMessage rm8 = (TextMessage)cons.receive(27500);
- assertNotNull(rm8);
- assertEquals(tm5.getText(), rm8.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 5000);
-
-
- TextMessage rm9 = (TextMessage)cons.receive(28500);
- assertNotNull(rm9);
- assertEquals(tm8.getText(), rm9.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 6000);
-
-
- TextMessage rm10 = (TextMessage)cons.receive(29500);
- assertNotNull(rm10);
- assertEquals(tm1.getText(), rm10.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 7000);
- }
- finally
- {
- removeAllMessages("testQueue", true);
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- public void testDelayedRedelivery() throws Exception
- {
- String qName = "testDelayedRedeliveryDefaultQ";
- try
- {
- long delay = 3000;
- addAddressSettings(qName, delay);
- createQueue(qName);
- queue1 = (Queue) getInitialContext().lookup("/queue/" + qName);
-
-
- this.delayedRedeliveryDefaultOnClose(delay);
-
- this.delayedRedeliveryDefaultOnRollback(delay);
- }
- finally
- {
- removeAddressSettings(qName);
- this.destroyQueue(qName);
- }
- }
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void delayedRedeliveryDefaultOnClose(long delay) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- final int NUM_MESSAGES = 5;
-
- forceGC();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(500);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- //Now close the session
- //This should cancel back to the queue with a delayed redelivery
-
- long now = System.currentTimeMillis();
-
- sess2.close();
-
- Session sess3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageConsumer cons2 = sess3.createConsumer(queue1);
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons2.receive(delay + 1000);
-
- assertNotNull(tm);
-
- long time = System.currentTimeMillis();
-
- assertTrue(time - now >= delay);
-
- //Hudson can introduce a large degree of indeterminism
- assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- private void delayedRedeliveryDefaultOnRollback(long delay) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- final int NUM_MESSAGES = 5;
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = sess.createTextMessage("message" + i);
-
- prod.send(tm);
- }
-
- Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
- MessageConsumer cons = sess2.createConsumer(queue1);
-
- conn.start();
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(500);
-
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
- }
-
- //Now rollback
-
- long now = System.currentTimeMillis();
-
- sess2.rollback();
-
- //This should redeliver with a delayed redelivery
-
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- TextMessage tm = (TextMessage)cons.receive(delay + 1000);
-
- assertNotNull(tm);
-
- long time = System.currentTimeMillis();
-
- assertTrue(time - now >= delay);
-
- //Hudson can introduce a large degree of indeterminism
- assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
- }
-
- sess2.commit();
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- private void scheduledDelivery(boolean tx) throws Exception
- {
- Connection conn = null;
-
- try
- {
- conn = cf.createConnection();
-
- Session sess = conn.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-
- MessageProducer prod = sess.createProducer(queue1);
-
- MessageConsumer cons = sess.createConsumer(queue1);
-
- conn.start();
-
- //Send one scheduled
-
- long now = System.currentTimeMillis();
-
- TextMessage tm1 = sess.createTextMessage("testScheduled1");
- tm1.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 7000);
- prod.send(tm1);
-
- //First send some non scheduled messages
-
- TextMessage tm2 = sess.createTextMessage("testScheduled2");
- prod.send(tm2);
-
- TextMessage tm3 = sess.createTextMessage("testScheduled3");
- prod.send(tm3);
-
- TextMessage tm4 = sess.createTextMessage("testScheduled4");
- prod.send(tm4);
-
-
- //Now send some more scheduled messages
-
- TextMessage tm5 = sess.createTextMessage("testScheduled5");
- tm5.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 5000);
- prod.send(tm5);
-
- TextMessage tm6 = sess.createTextMessage("testScheduled6");
- tm6.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 4000);
- prod.send(tm6);
-
- TextMessage tm7 = sess.createTextMessage("testScheduled7");
- tm7.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 3000);
- prod.send(tm7);
-
- TextMessage tm8 = sess.createTextMessage("testScheduled8");
- tm8.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), now + 6000);
- prod.send(tm8);
-
- //And one scheduled with a -ve number
-
- TextMessage tm9 = sess.createTextMessage("testScheduled9");
- tm9.setLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME.toString(), -3);
- prod.send(tm9);
-
- if (tx)
- {
- sess.commit();
- }
-
- //First the non scheduled messages should be received
- forceGC();
-
-
- TextMessage rm1 = (TextMessage)cons.receive(250);
- assertNotNull(rm1);
- assertEquals(tm2.getText(), rm1.getText());
-
- TextMessage rm2 = (TextMessage)cons.receive(250);
- assertNotNull(rm2);
- assertEquals(tm3.getText(), rm2.getText());
-
- TextMessage rm3 = (TextMessage)cons.receive(250);
- assertNotNull(rm3);
- assertEquals(tm4.getText(), rm3.getText());
-
- //Now the one with a scheduled with a -ve number
- TextMessage rm5 = (TextMessage)cons.receive(250);
- assertNotNull(rm5);
- assertEquals(tm9.getText(), rm5.getText());
-
- //Now the scheduled
- TextMessage rm6 = (TextMessage)cons.receive(3250);
- assertNotNull(rm6);
- assertEquals(tm7.getText(), rm6.getText());
-
- long now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 3000);
-
-
- TextMessage rm7 = (TextMessage)cons.receive(1250);
- assertNotNull(rm7);
- assertEquals(tm6.getText(), rm7.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 4000);
-
-
- TextMessage rm8 = (TextMessage)cons.receive(1250);
- assertNotNull(rm8);
- assertEquals(tm5.getText(), rm8.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 5000);
-
-
- TextMessage rm9 = (TextMessage)cons.receive(1250);
- assertNotNull(rm9);
- assertEquals(tm8.getText(), rm9.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 6000);
-
-
- TextMessage rm10 = (TextMessage)cons.receive(1250);
- assertNotNull(rm10);
- assertEquals(tm1.getText(), rm10.getText());
-
- now2 = System.currentTimeMillis();
-
- assertTrue(now2 - now >= 7000);
-
- if (tx)
- {
- sess.commit();
- }
- }
- finally
- {
- if (conn != null)
- {
- conn.close();
- }
- }
- }
-
- // Inner classes -------------------------------------------------
-
-}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/DelayedMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/DelayedMessageTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/DelayedMessageTest.java 2009-03-05 15:25:22 UTC (rev 6010)
@@ -0,0 +1,281 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.scheduling;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class DelayedMessageTest extends ServiceTestBase
+{
+ private static final Logger log = Logger.getLogger(DelayedMessageTest.class);
+
+ private Configuration configuration;
+
+ private MessagingService messagingService;
+
+ private static final long DELAY = 3000;
+
+ private final String qName = "DelayedMessageTestQueue";
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ clearData();
+ configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.setPagingMaxGlobalSizeBytes(-1);
+ messagingService = createService(true, configuration);
+ messagingService.start();
+
+ AddressSettings qs = messagingService.getServer().getAddressSettingsRepository().getMatch("*");
+ AddressSettings newSets = new AddressSettings();
+ newSets.setRedeliveryDelay(DELAY);
+ newSets.merge(qs);
+ messagingService.getServer().getAddressSettingsRepository().addMatch(qName, newSets);
+
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (messagingService != null)
+ {
+ try
+ {
+ messagingService.getServer().getAddressSettingsRepository().removeMatch(qName);
+
+ messagingService.stop();
+ messagingService = null;
+ }
+ catch (Exception e)
+ {
+ // ignore
+ }
+ }
+ super.tearDown();
+ }
+
+ public void testDelayedRedeliveryDefaultOnClose() throws Exception
+ {
+ ClientSessionFactory sessionFactory = createInVMFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+
+ session.createQueue(qName, qName, null, true, false);
+ session.close();
+
+ ClientSession session1 = sessionFactory.createSession(false, true, true);
+ ClientProducer producer = session1.createProducer(qName);
+
+ final int NUM_MESSAGES = 5;
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = this.createDurableMessage(session1, "message" + i);
+ producer.send(tm);
+ }
+
+ session1.close();
+
+ ClientSession session2 = sessionFactory.createSession(false, false, false);
+
+ ClientConsumer consumer2 = session2.createConsumer(qName);
+ session2.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getBody().readString());
+ }
+
+ //Now close the session
+ //This should cancel back to the queue with a delayed redelivery
+
+ long now = System.currentTimeMillis();
+
+ session2.close();
+
+ ClientSession session3 = sessionFactory.createSession(false, false, false);
+
+ ClientConsumer consumer3 = session3.createConsumer(qName);
+ session3.start();
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer3.receive(DELAY + 1000);
+
+ assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= DELAY);
+
+ //Hudson can introduce a large degree of indeterminism
+ assertTrue((time - now) + ">" + (DELAY + 1000), time - now < DELAY + 1000);
+ }
+
+ session3.commit();
+ session3.close();
+
+ sessionFactory.close();
+ }
+
+ public void testDelayedRedeliveryDefaultOnRollback() throws Exception
+ {
+ ClientSessionFactory sessionFactory = createInVMFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+
+
+ session.createQueue(qName, qName, null, true, false);
+ session.close();
+
+ ClientSession session1 = sessionFactory.createSession(false, true, true);
+ ClientProducer producer = session1.createProducer(qName);
+
+ final int NUM_MESSAGES = 5;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = this.createDurableMessage(session1, "message" + i);
+ producer.send(tm);
+ }
+ session1.close();
+
+ ClientSession session2 = sessionFactory.createSession(false, false, false);
+ ClientConsumer consumer2 = session2.createConsumer(qName);
+
+ session2.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(500);
+ assertNotNull(tm);
+ assertEquals("message" + i, tm.getBody().readString());
+ }
+
+ //Now rollback
+ long now = System.currentTimeMillis();
+
+ session2.rollback();
+
+ //This should redeliver with a delayed redelivery
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(DELAY + 1000);
+ assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= DELAY);
+
+ //Hudson can introduce a large degree of indeterminism
+ assertTrue((time - now) + ">" + (DELAY + 1000), time - now < DELAY + 1000);
+ }
+
+ session2.commit();
+ session2.close();
+
+ sessionFactory.close();
+ }
+
+ // Private -------------------------------------------------------
+
+
+ private void delayedRedeliveryDefaultOnRollback(ClientSessionFactory sessionFactory, String queueName, long delay) throws Exception
+ {
+ ClientSession session = sessionFactory.createSession(false, true, true);
+ ClientProducer producer = session.createProducer(queueName);
+
+ final int NUM_MESSAGES = 5;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = createDurableMessage(session, "message" + i);
+ producer.send(tm);
+ }
+ session.close();
+
+ ClientSession session2 = sessionFactory.createSession(false, false, false);
+ ClientConsumer consumer2 = session2.createConsumer(queueName);
+
+ session2.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(500);
+ assertNotNull(tm);
+ assertEquals("message" + i, tm.getBody().readString());
+ }
+
+ //Now rollback
+ long now = System.currentTimeMillis();
+
+ session2.rollback();
+
+ //This should redeliver with a delayed redelivery
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ ClientMessage tm = consumer2.receive(delay + 1000);
+ assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= delay);
+
+ //Hudson can introduce a large degree of indeterminism
+ assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
+ }
+
+ session2.commit();
+ session2.close();
+ }
+
+ private ClientMessage createDurableMessage(final ClientSession session, final String body)
+ {
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ true,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.getBody().writeString(body);
+ return message;
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2009-03-05 15:24:40 UTC (rev 6009)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/scheduling/ScheduledMessageTest.java 2009-03-05 15:25:22 UTC (rev 6010)
@@ -145,7 +145,7 @@
ClientSession session = sessionFactory.createSession(false, true, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = createMessage(session, "m1");
+ ClientMessage message = createDurableMessage(session, "m1");
long time = System.currentTimeMillis();
time += 10000;
message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
@@ -182,7 +182,7 @@
session.createQueue(atestq, atestq, null, true, true);
session.createQueue(atestq, atestq2, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = createMessage(session, "m1");
+ ClientMessage message = createDurableMessage(session, "m1");
producer.send(message);
producer.close();
@@ -231,7 +231,7 @@
session.createQueue(atestq, atestq, null, true, true);
session.createQueue(atestq, atestq2, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = createMessage(session, "m1");
+ ClientMessage message = createDurableMessage(session, "m1");
producer.send(message);
producer.close();
@@ -338,11 +338,11 @@
ClientSession session = sessionFactory.createSession(false, true, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage m1 = createMessage(session, "m1");
- ClientMessage m2 = createMessage(session, "m2");
- ClientMessage m3 = createMessage(session, "m3");
- ClientMessage m4 = createMessage(session, "m4");
- ClientMessage m5 = createMessage(session, "m5");
+ ClientMessage m1 = createDurableMessage(session, "m1");
+ ClientMessage m2 = createDurableMessage(session, "m2");
+ ClientMessage m3 = createDurableMessage(session, "m3");
+ ClientMessage m4 = createDurableMessage(session, "m4");
+ ClientMessage m5 = createDurableMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
@@ -418,11 +418,11 @@
ClientSession session = sessionFactory.createSession(false, true, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage m1 = createMessage(session, "m1");
- ClientMessage m2 = createMessage(session, "m2");
- ClientMessage m3 = createMessage(session, "m3");
- ClientMessage m4 = createMessage(session, "m4");
- ClientMessage m5 = createMessage(session, "m5");
+ ClientMessage m1 = createDurableMessage(session, "m1");
+ ClientMessage m2 = createDurableMessage(session, "m2");
+ ClientMessage m3 = createDurableMessage(session, "m3");
+ ClientMessage m4 = createDurableMessage(session, "m4");
+ ClientMessage m5 = createDurableMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
@@ -499,11 +499,11 @@
ClientSession session = sessionFactory.createSession(false, true, false);
session.createQueue(atestq, atestq, null, true, true);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage m1 = createMessage(session, "m1");
- ClientMessage m2 = createMessage(session, "m2");
- ClientMessage m3 = createMessage(session, "m3");
- ClientMessage m4 = createMessage(session, "m4");
- ClientMessage m5 = createMessage(session, "m5");
+ ClientMessage m1 = createDurableMessage(session, "m1");
+ ClientMessage m2 = createDurableMessage(session, "m2");
+ ClientMessage m3 = createDurableMessage(session, "m3");
+ ClientMessage m4 = createDurableMessage(session, "m4");
+ ClientMessage m5 = createDurableMessage(session, "m5");
long time = System.currentTimeMillis();
time += 10000;
m1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, time);
@@ -574,13 +574,7 @@
session.createQueue(atestq, atestq, null, true, false);
session.start(xid, XAResource.TMNOFLAGS);
ClientProducer producer = session.createProducer(atestq);
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.getBody().writeString("testINVMCoreClient");
- message.setDurable(true);
+ ClientMessage message = createDurableMessage(session, "testINVMCoreClient");
Calendar cal = Calendar.getInstance();
cal.roll(Calendar.SECOND, 10);
message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, cal.getTimeInMillis());
@@ -620,16 +614,173 @@
assertNull(consumer.receive(1000));
session.close();
}
+
+ public void testScheduledDeliveryTX() throws Exception
+ {
+ scheduledDelivery(true);
+ }
- private ClientMessage createMessage(final ClientSession session, final String body)
+ public void testScheduledDeliveryNoTX() throws Exception
{
+ scheduledDelivery(false);
+ }
+
+ // Private -------------------------------------------------------
+
+ private void scheduledDelivery(boolean tx) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
+
+ ClientSessionFactory sessionFactory = createInVMFactory();
+ ClientSession session = sessionFactory.createSession(tx, false, false);
+ session.createQueue(atestq, atestq, null, true, false);
+ ClientProducer producer = session.createProducer(atestq);
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+ if (tx)
+ {
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ //Send one scheduled
+ long now = System.currentTimeMillis();
+
+ ClientMessage tm1 = createDurableMessage(session, "testScheduled1");
+ tm1.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, now + 7000);
+ producer.send(tm1);
+
+ //First send some non scheduled messages
+
+ ClientMessage tm2 = createDurableMessage(session, "testScheduled2");
+ producer.send(tm2);
+
+ ClientMessage tm3 = createDurableMessage(session, "testScheduled3");
+ producer.send(tm3);
+
+ ClientMessage tm4 = createDurableMessage(session, "testScheduled4");
+ producer.send(tm4);
+
+
+ //Now send some more scheduled messages
+
+ ClientMessage tm5 = createDurableMessage(session, "testScheduled5");
+ tm5.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, now + 5000);
+ producer.send(tm5);
+
+ ClientMessage tm6 = createDurableMessage(session, "testScheduled6");
+ tm6.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, now + 4000);
+ producer.send(tm6);
+
+ ClientMessage tm7 = createDurableMessage(session, "testScheduled7");
+ tm7.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, now + 3000);
+ producer.send(tm7);
+
+ ClientMessage tm8 = createDurableMessage(session, "testScheduled8");
+ tm8.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, now + 6000);
+ producer.send(tm8);
+
+ //And one scheduled with a -ve number
+
+ ClientMessage tm9 = createDurableMessage(session, "testScheduled9");
+ tm9.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, -3);
+ producer.send(tm9);
+
+ if (tx)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.commit(xid, true);
+ } else
+ {
+ session.commit();
+ }
+
+ //First the non scheduled messages should be received
+ forceGC();
+
+ if (tx)
+ {
+ session.start(xid, XAResource.TMNOFLAGS);
+ }
+
+ ClientMessage rm1 = consumer.receive(250);
+ assertNotNull(rm1);
+ assertEquals("testScheduled2", rm1.getBody().readString());
+
+ ClientMessage rm2 = consumer.receive(250);
+ assertNotNull(rm2);
+ assertEquals("testScheduled3", rm2.getBody().readString());
+
+ ClientMessage rm3 = consumer.receive(250);
+ assertNotNull(rm3);
+ assertEquals("testScheduled4", rm3.getBody().readString());
+
+ //Now the one with a scheduled with a -ve number
+ ClientMessage rm5 = consumer.receive(250);
+ assertNotNull(rm5);
+ assertEquals("testScheduled9", rm5.getBody().readString());
+
+ //Now the scheduled
+ ClientMessage rm6 = consumer.receive(3250);
+ assertNotNull(rm6);
+ assertEquals("testScheduled7", rm6.getBody().readString());
+
+ long now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 3000);
+
+ ClientMessage rm7 = consumer.receive(1250);
+ assertNotNull(rm7);
+ assertEquals("testScheduled6", rm7.getBody().readString());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 4000);
+
+ ClientMessage rm8 = consumer.receive(1250);
+ assertNotNull(rm8);
+ assertEquals("testScheduled5", rm8.getBody().readString());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 5000);
+
+ ClientMessage rm9 = consumer.receive(1250);
+ assertNotNull(rm9);
+ assertEquals("testScheduled8", rm9.getBody().readString());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 6000);
+
+ ClientMessage rm10 = consumer.receive(1250);
+ assertNotNull(rm10);
+ assertEquals("testScheduled1", rm10.getBody().readString());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 7000);
+
+ if (tx)
+ {
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ session.commit(xid, true);
+ }
+
+ session.close();
+ sessionFactory.close();
+ }
+
+ private ClientMessage createDurableMessage(final ClientSession session, final String body)
+ {
ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
+ true,
0,
System.currentTimeMillis(),
(byte)1);
message.getBody().writeString(body);
- message.setDurable(true);
return message;
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-03-05 15:24:40 UTC (rev 6009)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-03-05 15:25:22 UTC (rev 6010)
@@ -23,6 +23,7 @@
package org.jboss.messaging.tests.util;
import java.io.File;
+import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.Map;
@@ -71,6 +72,22 @@
// Static --------------------------------------------------------
private final Logger log = Logger.getLogger(this.getClass());
+ public static void forceGC()
+ {
+ WeakReference dumbReference = new WeakReference(new Object());
+ // A loopt that will wait GC, using the minimal time as possible
+ while (dumbReference.get() != null)
+ {
+ System.gc();
+ try
+ {
+ Thread.sleep(500);
+ } catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
More information about the jboss-cvs-commits
mailing list