[jboss-cvs] JBoss Messaging SVN: r5082 - in trunk: src/main/org/jboss/messaging/core/persistence/impl/journal and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 7 12:25:27 EDT 2008
Author: ataylor
Date: 2008-10-07 12:25:27 -0400 (Tue, 07 Oct 2008)
New Revision: 5082
Modified:
trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
reenabled redelivery delay and scheduled tests
Modified: trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/examples/jms/src/org/jboss/jms/example/ScheduledExample.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -63,7 +63,7 @@
log.info("current time " + df.format(cal.getTime()));
cal.roll(Calendar.SECOND, 5);
log.info("message scheduled for " + df.format(cal.getTime()));
- message.setLongProperty("JBM_SCHEDULED_DELIVERY_TIME", cal.getTimeInMillis());
+ message.setLongProperty("JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME", cal.getTimeInMillis());
log.info("sending message to queue");
producer.send(message);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -434,7 +434,7 @@
//for any references that have already been routed, we need to remove them from t he queue and re add them as scheduled
for (MessageReference ref : refs)
{
- ref.getQueue().removeReferenceWithID(record.id);
+ ref.getQueue().removeReferenceWithID(ref.getMessage().getMessageID());
ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
ref.getQueue().addScheduledDelivery(ref);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessageReferenceImpl.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -130,8 +130,9 @@
persistenceManager.updateDeliveryCount(this);
}
- int maxDeliveries = queueSettingsRepository.getMatch(
- queue.getName().toString()).getMaxDeliveryAttempts();
+ QueueSettings queueSettings = queueSettingsRepository.getMatch(
+ queue.getName().toString());
+ int maxDeliveries = queueSettings.getMaxDeliveryAttempts();
if (maxDeliveries > 0 && deliveryCount >= maxDeliveries)
{
@@ -142,6 +143,12 @@
}
else
{
+ long redeliveryDelay = queueSettings.getRedeliveryDelay();
+
+ if(redeliveryDelay > 0)
+ {
+ scheduledDeliveryTime = System.currentTimeMillis() + redeliveryDelay;
+ }
queue.referenceCancelled();
return true;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -185,9 +185,15 @@
MessageReference ref = iter.previous();
ServerMessage msg = ref.getMessage();
+ if(ref.getScheduledDeliveryTime() <= 0)
+ {
+ messageReferences.addFirst(ref, msg.getPriority());
+ }
+ else
+ {
+ addScheduledDelivery(ref);
+ }
- messageReferences.addFirst(ref, msg.getPriority());
-
checkWaiting(msg.getMessageID());
}
@@ -910,6 +916,7 @@
// TODO - need to replicate this so backup node also adds back to
// front of queue
+
addFirst(ref);
}
}
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossMessage.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -77,7 +77,7 @@
// Used when bridging a message
public static final String JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST = "JBM_BRIDGE_MSG_ID_LIST";
- public static final String JBM_SCHEDULED_DELIVERY_TIME = "JBM_SCHEDULED_DELIVERY_TIME";
+ public static final String JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME = "JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME";
public static final byte TYPE = 0;
@@ -385,7 +385,7 @@
public void setScheduledDeliveryTime(long scheduledDeliveryTime)
{
- message.putLongProperty(new SimpleString(JBM_SCHEDULED_DELIVERY_TIME), scheduledDeliveryTime);
+ message.putLongProperty(new SimpleString(JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME), scheduledDeliveryTime);
this.scheduledDeliveryTime = scheduledDeliveryTime;
}
@@ -832,7 +832,7 @@
{
Long l = new Long(value);
checkProperty(name, l);
- if(JBM_SCHEDULED_DELIVERY_TIME.equals(name))
+ if(JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME.equals(name))
{
scheduledDeliveryTime = l;
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/JBMServerTestCase.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -21,22 +21,6 @@
*/
package org.jboss.test.messaging;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import javax.jms.Queue;
-import javax.jms.Topic;
-import javax.management.ObjectName;
-import javax.naming.InitialContext;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.jms.JBossQueue;
@@ -49,6 +33,21 @@
import org.jboss.test.messaging.tools.container.Server;
import org.jboss.tm.TransactionManagerLocator;
+import javax.jms.Queue;
+import javax.jms.Topic;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
/**
* @author <a href="mailto:adrian at jboss.org">Adrian Brock</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -828,11 +827,16 @@
servers.get(0).configureSecurityForDestination(s, b, lockedConf);
}
- protected void setRedeliveryDelayOnDestination(String dest, boolean isQueue, long delay) throws Exception
+ protected void addQueueSettings(String name, long scheduledDeliveryTime)
{
- servers.get(0).setRedeliveryDelayOnDestination(dest, isQueue, delay);
+ servers.get(0).addQueueSettings("queuejms." + name, scheduledDeliveryTime);
}
+ protected void removeQueueSettings(String name)
+ {
+ servers.get(0).removeQueueSettings(name);
+ }
+
protected void kill(int i) throws Exception
{
log.info("Attempting to kill server " + i);
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ScheduledDeliveryTest.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -21,7 +21,16 @@
*/
package org.jboss.test.messaging.jms;
+import org.jboss.messaging.jms.client.JBossMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+
/**
*
* A ScheduledDeliveryTest
@@ -53,517 +62,483 @@
{
}
-// Uncomment when http://jira.jboss.org/jira/browse/JBMESSAGING-1206 is complete
-// public void testScheduledDeliveryTX() throws Exception
-// {
-// scheduledDelivery(true);
-// }
-//
-// public void testScheduledDeliveryNoTX() throws Exception
-// {
-// scheduledDelivery(false);
-// }
-//
-// public void testScheduledDeliveryWithRestart() throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-// Queue queue1 = (Queue) getInitialContext().lookup("/queue/testQueue");
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// //Send one scheduled
-//
-// long now = System.currentTimeMillis();
-//
-// TextMessage tm1 = sess.createTextMessage("testScheduled1");
-// tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 7000);
-// prod.send(tm1);
-//
-// //First send some non scheduled messages
-//
-// TextMessage tm2 = sess.createTextMessage("testScheduled2");
-// prod.send(tm2);
-//
-// TextMessage tm3 = sess.createTextMessage("testScheduled3");
-// prod.send(tm3);
-//
-// TextMessage tm4 = sess.createTextMessage("testScheduled4");
-// prod.send(tm4);
-//
-//
-// //Now send some more scheduled messages
-//
-// TextMessage tm5 = sess.createTextMessage("testScheduled5");
-// tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 5000);
-// prod.send(tm5);
-//
-// TextMessage tm6 = sess.createTextMessage("testScheduled6");
-// tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 4000);
-// prod.send(tm6);
-//
-// TextMessage tm7 = sess.createTextMessage("testScheduled7");
-// tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 3000);
-// prod.send(tm7);
-//
-// TextMessage tm8 = sess.createTextMessage("testScheduled8");
-// tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 6000);
-// prod.send(tm8);
-//
-// //And one scheduled with a -ve number
-//
-// TextMessage tm9 = sess.createTextMessage("testScheduled9");
-// tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
-// prod.send(tm9);
-//
-// //Now stop the server and restart it
-//
-// conn.close();
-//
-// stop();
-//
-// start();
-//
-// // Messaging server restart implies new ConnectionFactory lookup
-// deployAndLookupAdministeredObjects();
-//
-// conn = cf.createConnection();
-//
-// conn.start();
-//
-// sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// forceGC();
-//
-// //First the non scheduled messages should be received
-//
-// TextMessage rm1 = (TextMessage)cons.receive(250);
-// assertNotNull(rm1);
-// assertEquals(tm2.getText(), rm1.getText());
-//
-// TextMessage rm2 = (TextMessage)cons.receive(250);
-// assertNotNull(rm2);
-// assertEquals(tm3.getText(), rm2.getText());
-//
-// TextMessage rm3 = (TextMessage)cons.receive(250);
-// assertNotNull(rm3);
-// assertEquals(tm4.getText(), rm3.getText());
-//
-// //Now the one with a scheduled with a -ve number
-// TextMessage rm5 = (TextMessage)cons.receive(250);
-// assertNotNull(rm5);
-// assertEquals(tm9.getText(), rm5.getText());
-//
-// //Now the scheduled
-// TextMessage rm6 = (TextMessage)cons.receive(3250);
-// assertNotNull(rm6);
-// assertEquals(tm7.getText(), rm6.getText());
-//
-// long now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 3000);
-//
-//
-// TextMessage rm7 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm7);
-// assertEquals(tm6.getText(), rm7.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 4000);
-//
-//
-// TextMessage rm8 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm8);
-// assertEquals(tm5.getText(), rm8.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 5000);
-//
-//
-// TextMessage rm9 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm9);
-// assertEquals(tm8.getText(), rm9.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 6000);
-//
-//
-// TextMessage rm10 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm10);
-// assertEquals(tm1.getText(), rm10.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 7000);
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
-//
-// public void testDelayedRedeliveryDefault() throws Exception
-// {
-//
-// try
-// {
-//
-// setRedeliveryDelayOnDestination("Queue1", true, -1);
-//
-// long delay = 3000;
-//
-// setDefaultRedeliveryDelay(delay);
-// this.delayedRedeliveryDefaultOnClose(delay);
-//
-// this.delayedRedeliveryDefaultOnRollback(delay);
-// }
-// finally
-// {
-// setDefaultRedeliveryDelay(0);
-//
-// removeAllMessages(queue1.getQueueName(), true, 0);
-// }
-// }
-//
-// public void testDelayedRedeliveryOverride() throws Exception
-// {
-//
-// try
-// {
-// long delay = 6000;
-//
-// setRedeliveryDelayOnDestination("Queue1", true, delay);
-//
-// setDefaultRedeliveryDelay(3000);
-//
-// this.delayedRedeliveryDefaultOnClose(delay);
-//
-// this.delayedRedeliveryDefaultOnRollback(delay);
-// }
-// finally
-// {
-// setDefaultRedeliveryDelay(0);
-//
-// setRedeliveryDelayOnDestination("Queue1", true, -1);
-//
-// removeAllMessages(queue1.getQueueName(), true, 0);
-// }
-// }
-//
-//
-// // Package protected ---------------------------------------------
-//
-// // Protected -----------------------------------------------------
-//
-// protected void tearDown() throws Exception
-// {
-// super.tearDown();
-//
-// // Some tests here are changing this attribute.. what would affect tests later
-// // Instead of restart the ServerPeer I'm just restoring the default
-// setDefaultRedeliveryDelay(0);
-// }
-//
-// // Private -------------------------------------------------------
-//
-// private void delayedRedeliveryDefaultOnClose(long delay) throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// final int NUM_MESSAGES = 5;
-//
-// forceGC();
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message" + i);
-//
-// prod.send(tm);
-// }
-//
-// Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-//
-// MessageConsumer cons = sess2.createConsumer(queue1);
-//
-// conn.start();
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(500);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message" + i, tm.getText());
-// }
-//
-// //Now close the session
-// //This should cancel back to the queue with a delayed redelivery
-//
-// long now = System.currentTimeMillis();
-//
-// sess2.close();
-//
-// Session sess3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageConsumer cons2 = sess3.createConsumer(queue1);
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons2.receive(delay + 1000);
-//
-// assertNotNull(tm);
-//
-// long time = System.currentTimeMillis();
-//
-// assertTrue(time - now >= delay);
-//
-// //Hudson can introduce a large degree of indeterminism
-// assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
-// }
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
-//
-// private void delayedRedeliveryDefaultOnRollback(long delay) throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// final int NUM_MESSAGES = 5;
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = sess.createTextMessage("message" + i);
-//
-// prod.send(tm);
-// }
-//
-// Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// MessageConsumer cons = sess2.createConsumer(queue1);
-//
-// conn.start();
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(500);
-//
-// assertNotNull(tm);
-//
-// assertEquals("message" + i, tm.getText());
-// }
-//
-// //Now rollback
-//
-// long now = System.currentTimeMillis();
-//
-// sess2.rollback();
-//
-// //This should redeliver with a delayed redelivery
-//
-// for (int i = 0; i < NUM_MESSAGES; i++)
-// {
-// TextMessage tm = (TextMessage)cons.receive(delay + 1000);
-//
-// assertNotNull(tm);
-//
-// long time = System.currentTimeMillis();
-//
-// assertTrue(time - now >= delay);
-//
-// //Hudson can introduce a large degree of indeterminism
-// assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
-// }
-//
-// sess2.commit();
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
-//
-// private void scheduledDelivery(boolean tx) throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = cf.createConnection();
-//
-// Session sess = conn.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
-//
-// MessageProducer prod = sess.createProducer(queue1);
-//
-// MessageConsumer cons = sess.createConsumer(queue1);
-//
-// conn.start();
-//
-// //Send one scheduled
-//
-// long now = System.currentTimeMillis();
-//
-// TextMessage tm1 = sess.createTextMessage("testScheduled1");
-// tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 7000);
-// prod.send(tm1);
-//
-// //First send some non scheduled messages
-//
-// TextMessage tm2 = sess.createTextMessage("testScheduled2");
-// prod.send(tm2);
-//
-// TextMessage tm3 = sess.createTextMessage("testScheduled3");
-// prod.send(tm3);
-//
-// TextMessage tm4 = sess.createTextMessage("testScheduled4");
-// prod.send(tm4);
-//
-//
-// //Now send some more scheduled messages
-//
-// TextMessage tm5 = sess.createTextMessage("testScheduled5");
-// tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 5000);
-// prod.send(tm5);
-//
-// TextMessage tm6 = sess.createTextMessage("testScheduled6");
-// tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 4000);
-// prod.send(tm6);
-//
-// TextMessage tm7 = sess.createTextMessage("testScheduled7");
-// tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 3000);
-// prod.send(tm7);
-//
-// TextMessage tm8 = sess.createTextMessage("testScheduled8");
-// tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 6000);
-// prod.send(tm8);
-//
-// //And one scheduled with a -ve number
-//
-// TextMessage tm9 = sess.createTextMessage("testScheduled9");
-// tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
-// prod.send(tm9);
-//
-// if (tx)
-// {
-// sess.commit();
-// }
-//
-// //First the non scheduled messages should be received
-// forceGC();
-//
-//
-// TextMessage rm1 = (TextMessage)cons.receive(250);
-// assertNotNull(rm1);
-// assertEquals(tm2.getText(), rm1.getText());
-//
-// TextMessage rm2 = (TextMessage)cons.receive(250);
-// assertNotNull(rm2);
-// assertEquals(tm3.getText(), rm2.getText());
-//
-// TextMessage rm3 = (TextMessage)cons.receive(250);
-// assertNotNull(rm3);
-// assertEquals(tm4.getText(), rm3.getText());
-//
-// //Now the one with a scheduled with a -ve number
-// TextMessage rm5 = (TextMessage)cons.receive(250);
-// assertNotNull(rm5);
-// assertEquals(tm9.getText(), rm5.getText());
-//
-// //Now the scheduled
-// TextMessage rm6 = (TextMessage)cons.receive(3250);
-// assertNotNull(rm6);
-// assertEquals(tm7.getText(), rm6.getText());
-//
-// long now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 3000);
-//
-//
-// TextMessage rm7 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm7);
-// assertEquals(tm6.getText(), rm7.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 4000);
-//
-//
-// TextMessage rm8 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm8);
-// assertEquals(tm5.getText(), rm8.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 5000);
-//
-//
-// TextMessage rm9 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm9);
-// assertEquals(tm8.getText(), rm9.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 6000);
-//
-//
-// TextMessage rm10 = (TextMessage)cons.receive(1250);
-// assertNotNull(rm10);
-// assertEquals(tm1.getText(), rm10.getText());
-//
-// now2 = System.currentTimeMillis();
-//
-// assertTrue(now2 - now >= 7000);
-//
-// if (tx)
-// {
-// sess.commit();
-// }
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
+ //Uncomment when http://jira.jboss.org/jira/browse/JBMESSAGING-1206 is complete
+ public void testScheduledDeliveryTX() throws Exception
+ {
+ scheduledDelivery(true);
+ }
+ public void testScheduledDeliveryNoTX() throws Exception
+ {
+ scheduledDelivery(false);
+ }
+
+ public void testScheduledDeliveryWithRestart() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue1 = (Queue) getInitialContext().lookup("/queue/testQueue");
+ MessageProducer prod = sess.createProducer(queue1);
+
+ //Send one scheduled
+
+ long now = System.currentTimeMillis();
+
+ TextMessage tm1 = sess.createTextMessage("testScheduled1");
+ tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 19000);
+ prod.send(tm1);
+
+ //First send some non scheduled messages
+
+ TextMessage tm2 = sess.createTextMessage("testScheduled2");
+ prod.send(tm2);
+
+ TextMessage tm3 = sess.createTextMessage("testScheduled3");
+ prod.send(tm3);
+
+ TextMessage tm4 = sess.createTextMessage("testScheduled4");
+ prod.send(tm4);
+
+
+ //Now send some more scheduled messages
+
+ TextMessage tm5 = sess.createTextMessage("testScheduled5");
+ tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 17000);
+ prod.send(tm5);
+
+ TextMessage tm6 = sess.createTextMessage("testScheduled6");
+ tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 16000);
+ prod.send(tm6);
+
+ TextMessage tm7 = sess.createTextMessage("testScheduled7");
+ tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 15000);
+ prod.send(tm7);
+
+ TextMessage tm8 = sess.createTextMessage("testScheduled8");
+ tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 18000);
+ prod.send(tm8);
+
+ //And one scheduled with a -ve number
+
+ TextMessage tm9 = sess.createTextMessage("testScheduled9");
+ tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
+ prod.send(tm9);
+
+ //Now stop the server and restart it
+
+ conn.close();
+
+ stop();
+
+ start();
+
+ // Messaging server restart implies new ConnectionFactory lookup
+ deployAndLookupAdministeredObjects();
+
+ conn = cf.createConnection();
+
+ conn.start();
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ forceGC();
+
+ //First the non scheduled messages should be received
+
+ TextMessage rm1 = (TextMessage)cons.receive(250);
+ assertNotNull(rm1);
+ assertEquals(tm2.getText(), rm1.getText());
+
+ TextMessage rm2 = (TextMessage)cons.receive(250);
+ assertNotNull(rm2);
+ assertEquals(tm3.getText(), rm2.getText());
+
+ TextMessage rm3 = (TextMessage)cons.receive(250);
+ assertNotNull(rm3);
+ assertEquals(tm4.getText(), rm3.getText());
+
+ //Now the one with a scheduled with a -ve number
+ TextMessage rm5 = (TextMessage)cons.receive(250);
+ assertNotNull(rm5);
+ assertEquals(tm9.getText(), rm5.getText());
+
+ //Now the scheduled
+ TextMessage rm6 = (TextMessage)cons.receive(3250);
+ assertNotNull(rm6);
+ assertEquals(tm7.getText(), rm6.getText());
+
+ long now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 3000);
+
+
+ TextMessage rm7 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm7);
+ assertEquals(tm6.getText(), rm7.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 4000);
+
+
+ TextMessage rm8 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm8);
+ assertEquals(tm5.getText(), rm8.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 5000);
+
+
+ TextMessage rm9 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm9);
+ assertEquals(tm8.getText(), rm9.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 6000);
+
+
+ TextMessage rm10 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm10);
+ assertEquals(tm1.getText(), rm10.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 7000);
+ }
+ finally
+ {
+ removeAllMessages("testQueue", true);
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testDelayedRedelivery() throws Exception
+ {
+ String qName = "testDelayedRedeliveryDefaultQ";
+ try
+ {
+ long delay = 3000;
+ addQueueSettings(qName, delay);
+ createQueue(qName);
+ queue1 = (Queue) getInitialContext().lookup("/queue/" + qName);
+
+
+ this.delayedRedeliveryDefaultOnClose(delay);
+
+ this.delayedRedeliveryDefaultOnRollback(delay);
+ }
+ finally
+ {
+ removeQueueSettings(qName);
+ this.destroyQueue(qName);
+ }
+ }
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void delayedRedeliveryDefaultOnClose(long delay) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ final int NUM_MESSAGES = 5;
+
+ forceGC();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Now close the session
+ //This should cancel back to the queue with a delayed redelivery
+
+ long now = System.currentTimeMillis();
+
+ sess2.close();
+
+ Session sess3 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons2 = sess3.createConsumer(queue1);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(delay + 1000);
+
+ assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= delay);
+
+ //Hudson can introduce a large degree of indeterminism
+ assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ private void delayedRedeliveryDefaultOnRollback(long delay) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ final int NUM_MESSAGES = 5;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+
+ Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer cons = sess2.createConsumer(queue1);
+
+ conn.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(500);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ //Now rollback
+
+ long now = System.currentTimeMillis();
+
+ sess2.rollback();
+
+ //This should redeliver with a delayed redelivery
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(delay + 1000);
+
+ assertNotNull(tm);
+
+ long time = System.currentTimeMillis();
+
+ assertTrue(time - now >= delay);
+
+ //Hudson can introduce a large degree of indeterminism
+ assertTrue((time - now) + ">" + (delay + 1000), time - now < delay + 1000);
+ }
+
+ sess2.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ private void scheduledDelivery(boolean tx) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sess = conn.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue1);
+
+ MessageConsumer cons = sess.createConsumer(queue1);
+
+ conn.start();
+
+ //Send one scheduled
+
+ long now = System.currentTimeMillis();
+
+ TextMessage tm1 = sess.createTextMessage("testScheduled1");
+ tm1.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 7000);
+ prod.send(tm1);
+
+ //First send some non scheduled messages
+
+ TextMessage tm2 = sess.createTextMessage("testScheduled2");
+ prod.send(tm2);
+
+ TextMessage tm3 = sess.createTextMessage("testScheduled3");
+ prod.send(tm3);
+
+ TextMessage tm4 = sess.createTextMessage("testScheduled4");
+ prod.send(tm4);
+
+
+ //Now send some more scheduled messages
+
+ TextMessage tm5 = sess.createTextMessage("testScheduled5");
+ tm5.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 5000);
+ prod.send(tm5);
+
+ TextMessage tm6 = sess.createTextMessage("testScheduled6");
+ tm6.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 4000);
+ prod.send(tm6);
+
+ TextMessage tm7 = sess.createTextMessage("testScheduled7");
+ tm7.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 3000);
+ prod.send(tm7);
+
+ TextMessage tm8 = sess.createTextMessage("testScheduled8");
+ tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, now + 6000);
+ prod.send(tm8);
+
+ //And one scheduled with a -ve number
+
+ TextMessage tm9 = sess.createTextMessage("testScheduled9");
+ tm8.setLongProperty(JBossMessage.JMS_JBOSS_SCHEDULED_DELIVERY_PROP_NAME, -3);
+ prod.send(tm9);
+
+ if (tx)
+ {
+ sess.commit();
+ }
+
+ //First the non scheduled messages should be received
+ forceGC();
+
+
+ TextMessage rm1 = (TextMessage)cons.receive(250);
+ assertNotNull(rm1);
+ assertEquals(tm2.getText(), rm1.getText());
+
+ TextMessage rm2 = (TextMessage)cons.receive(250);
+ assertNotNull(rm2);
+ assertEquals(tm3.getText(), rm2.getText());
+
+ TextMessage rm3 = (TextMessage)cons.receive(250);
+ assertNotNull(rm3);
+ assertEquals(tm4.getText(), rm3.getText());
+
+ //Now the one with a scheduled with a -ve number
+ TextMessage rm5 = (TextMessage)cons.receive(250);
+ assertNotNull(rm5);
+ assertEquals(tm9.getText(), rm5.getText());
+
+ //Now the scheduled
+ TextMessage rm6 = (TextMessage)cons.receive(3250);
+ assertNotNull(rm6);
+ assertEquals(tm7.getText(), rm6.getText());
+
+ long now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 3000);
+
+
+ TextMessage rm7 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm7);
+ assertEquals(tm6.getText(), rm7.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 4000);
+
+
+ TextMessage rm8 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm8);
+ assertEquals(tm5.getText(), rm8.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 5000);
+
+
+ TextMessage rm9 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm9);
+ assertEquals(tm8.getText(), rm9.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 6000);
+
+
+ TextMessage rm10 = (TextMessage)cons.receive(1250);
+ assertNotNull(rm10);
+ assertEquals(tm1.getText(), rm10.getText());
+
+ now2 = System.currentTimeMillis();
+
+ assertTrue(now2 - now >= 7000);
+
+ if (tx)
+ {
+ sess.commit();
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -53,7 +53,12 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -603,6 +608,20 @@
return (JMSServerManager) bootstrap.getKernel().getRegistry().getEntry("JMSServerManager").getTarget();
}
+ public void addQueueSettings(String name, long redeliveryDelay)
+ {
+ QueueSettings qs = getMessagingServer().getQueueSettingsRepository().getMatch("*");
+ QueueSettings newSets = new QueueSettings();
+ newSets.setRedeliveryDelay(redeliveryDelay);
+ newSets.merge(qs);
+ getMessagingServer().getQueueSettingsRepository().addMatch(name, newSets);
+ }
+
+ public void removeQueueSettings(String name)
+ {
+ getMessagingServer().getQueueSettingsRepository().removeMatch(name);
+ }
+
public InitialContext getInitialContext() throws Exception
{
Properties props = new Properties();
@@ -682,7 +701,7 @@
QueueSettings queueSettings = new QueueSettings();
queueSettings.setRedeliveryDelay(delay);
//FIXME we need to expose queue attributes in another way
-// getMessagingServer().getServerManagement().setQueueAttributes(condition, queueSettings);
+// getMessagingServer().getServerManagement().setQueueAttributes(condition, queueSettings);
}
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -21,6 +21,22 @@
*/
package org.jboss.test.messaging.tools.container;
+import org.jboss.kernel.spi.deployment.KernelDeployment;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.security.Role;
+import org.jboss.messaging.core.server.MessagingServer;
+import org.jboss.messaging.jms.JBossDestination;
+import org.jboss.messaging.jms.server.JMSServerManager;
+import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
+import org.jboss.messaging.jms.server.management.SubscriptionInfo;
+import org.jboss.messaging.jms.server.management.TopicControlMBean;
+import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
+
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.transaction.UserTransaction;
import java.lang.management.ManagementFactory;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;
@@ -33,23 +49,6 @@
import java.util.Map;
import java.util.Set;
-import javax.management.MBeanServerInvocationHandler;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.naming.InitialContext;
-import javax.transaction.UserTransaction;
-
-import org.jboss.kernel.spi.deployment.KernelDeployment;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.security.Role;
-import org.jboss.messaging.core.server.MessagingServer;
-import org.jboss.messaging.jms.JBossDestination;
-import org.jboss.messaging.jms.server.JMSServerManager;
-import org.jboss.messaging.jms.server.management.JMSQueueControlMBean;
-import org.jboss.messaging.jms.server.management.SubscriptionInfo;
-import org.jboss.messaging.jms.server.management.TopicControlMBean;
-import org.jboss.messaging.jms.server.management.impl.JMSManagementServiceImpl;
-
/**
* An RMI wrapper to access the ServiceContainer from a different address space.
*
@@ -491,11 +490,15 @@
server.setSecurityConfig(defConfig);
}
- public void setRedeliveryDelayOnDestination(String dest, boolean queue, long delay) throws Exception
+ public void addQueueSettings(String name, long redeliveryDelay)
{
- server.setRedeliveryDelayOnDestination(dest, queue, delay);
+ server.addQueueSettings(name, redeliveryDelay);
}
+ public void removeQueueSettings(String name)
+ {
+ server.removeQueueSettings(name);
+ }
public InitialContext getInitialContext() throws Exception
{
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/Server.java 2008-10-07 09:47:52 UTC (rev 5081)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/tools/container/Server.java 2008-10-07 16:25:27 UTC (rev 5082)
@@ -21,16 +21,6 @@
*/
package org.jboss.test.messaging.tools.container;
-import java.rmi.Remote;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-import javax.naming.InitialContext;
-import javax.transaction.UserTransaction;
-
import org.jboss.kernel.spi.deployment.KernelDeployment;
import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessagingServer;
@@ -38,6 +28,15 @@
import org.jboss.messaging.jms.server.JMSServerManager;
import org.jboss.messaging.jms.server.management.SubscriptionInfo;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+import javax.naming.InitialContext;
+import javax.transaction.UserTransaction;
+import java.rmi.Remote;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+
/**
* The remote interface exposed by TestServer.
*
@@ -268,8 +267,10 @@
//void setSecurityConfigOnManager(boolean b, String s, Set<Role> lockedConf) throws Exception;
- void setRedeliveryDelayOnDestination(String dest, boolean queue, long delay) throws Exception;
-
//void setDefaultRedeliveryDelay(long delay) throws Exception;
JMSServerManager getJMSServerManager() throws Exception;
+
+ void addQueueSettings(String name, long redeliveryDelay);
+
+ void removeQueueSettings(String name);
}
More information about the jboss-cvs-commits
mailing list