[jboss-cvs] JBoss Messaging SVN: r3180 - in trunk: tests/src/org/jboss/test/messaging/jms/bridge and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Oct 8 07:58:49 EDT 2007
Author: timfox
Date: 2007-10-08 07:58:49 -0400 (Mon, 08 Oct 2007)
New Revision: 3180
Modified:
trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1098
Modified: trunk/src/main/org/jboss/jms/server/bridge/Bridge.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-10-08 02:43:15 UTC (rev 3179)
+++ trunk/src/main/org/jboss/jms/server/bridge/Bridge.java 2007-10-08 11:58:49 UTC (rev 3180)
@@ -1041,7 +1041,7 @@
enlistResources(tx);
}
- producer = sess.createProducer(targetDestination);
+ producer = sess.createProducer(null);
consumer.setMessageListener(new SourceListener());
@@ -1152,10 +1152,6 @@
return false;
}
- /*
- * If one of the JMS operations fail, then we try and lookup the connection factories, create
- * the connections and retry, up to a certain number of times
- */
private void sendBatch()
{
if (trace) { log.trace("Sending batch of " + messages.size() + " messages"); }
@@ -1197,8 +1193,20 @@
if (trace) { log.trace("Sending message " + msg); }
- producer.send(msg);
+ long timeToLive = msg.getJMSExpiration();
+ if (timeToLive != 0)
+ {
+ timeToLive -= System.currentTimeMillis();
+
+ if (timeToLive <= 0)
+ {
+ timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+ }
+ }
+
+ producer.send(targetDestination, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive);
+
if (trace) { log.trace("Sent message " + msg); }
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-10-08 02:43:15 UTC (rev 3179)
+++ trunk/tests/src/org/jboss/test/messaging/jms/bridge/BridgeTest.java 2007-10-08 11:58:49 UTC (rev 3180)
@@ -720,7 +720,17 @@
}
}
- public void testMessageIDInHeader() throws Exception
+ public void testMessageIDInHeaderOn() throws Exception
+ {
+ messageIDInHeader(true);
+ }
+
+ public void testMessageIDInHeaderOff() throws Exception
+ {
+ messageIDInHeader(false);
+ }
+
+ private void messageIDInHeader(boolean on) throws Exception
{
Bridge bridge = null;
@@ -736,7 +746,7 @@
null, null, null, null,
null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
1, -1,
- null, null, true);
+ null, null, on);
bridge.start();
@@ -796,54 +806,199 @@
assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
- String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-
- assertNotNull(header);
-
- assertEquals(ids1.get(i), header);
-
- msgs.add(tm);
+ if (on)
+ {
+ String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+ assertNotNull(header);
+
+ assertEquals(ids1.get(i), header);
+
+ msgs.add(tm);
+ }
}
- //Now we send them again back to the source
+ if (on)
+ {
+ //Now we send them again back to the source
+
+ Iterator iter = msgs.iterator();
+
+ List ids2 = new ArrayList();
+
+ while (iter.hasNext())
+ {
+ Message msg = (Message)iter.next();
+
+ prod.send(msg);
+
+ ids2.add(msg.getJMSMessageID());
+ }
+
+ //And consume them again
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+
+ assertEquals("uhuh", tm.getStringProperty("wib"));
+ assertTrue(tm.getBooleanProperty("cheese"));
+ assertEquals(23, tm.getIntProperty("Sausages"));
+
+ assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
+ assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
+
+ String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+
+ assertNotNull(header);
+
+ assertEquals(ids1.get(i) + "," + ids2.get(i), header);
+ }
+ }
- Iterator iter = msgs.iterator();
+ }
+ finally
+ {
+ if (bridge != null)
+ {
+ bridge.stop();
+ }
- List ids2 = new ArrayList();
-
- while (iter.hasNext())
+ if (connSource != null)
{
- Message msg = (Message)iter.next();
-
- prod.send(msg);
-
- ids2.add(msg.getJMSMessageID());
+ connSource.close();
}
-
- //And consume them again
- for (int i = 0; i < NUM_MESSAGES; i++)
+ if (connTarget != null)
{
- TextMessage tm = (TextMessage)cons.receive(5000);
+ connTarget.close();
+ }
+ }
+ }
+
+
+ public void testPropertiesPreservedPOn() throws Exception
+ {
+ propertiesPreserved(true, true);
+ }
+
+ public void testPropertiesPreservedNPoff() throws Exception
+ {
+ propertiesPreserved(false, true);
+ }
+
+ public void testPropertiesPreservedNPOn() throws Exception
+ {
+ propertiesPreserved(false, true);
+ }
+
+ public void testPropertiesPreservedPoff() throws Exception
+ {
+ propertiesPreserved(true, true);
+ }
+
+ private void propertiesPreserved(boolean persistent, boolean messageIDInHeader) throws Exception
+ {
+ Bridge bridge = null;
+
+ Connection connSource = null;
+
+ Connection connTarget = null;
- assertNotNull(tm);
-
- assertEquals("message" + i, tm.getText());
-
- assertEquals("uhuh", tm.getStringProperty("wib"));
- assertTrue(tm.getBooleanProperty("cheese"));
- assertEquals(23, tm.getIntProperty("Sausages"));
-
- assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
- assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
-
- String header = tm.getStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
-
- assertNotNull(header);
-
- assertEquals(ids1.get(i) + "," + ids2.get(i), header);
- }
+ try
+ {
+ final int NUM_MESSAGES = 10;
+ bridge = new Bridge(cff0, cff1, sourceQueue, destQueue,
+ null, null, null, null,
+ null, 5000, 10, Bridge.QOS_AT_MOST_ONCE,
+ 1, -1,
+ null, null, messageIDInHeader);
+
+ bridge.start();
+
+ connSource = cf0.createConnection();
+
+ connTarget = cf1.createConnection();
+
+ log.trace("Sending " + NUM_MESSAGES + " messages");
+
+ Session sessSource = connSource.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Session sessTarget = connTarget.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessTarget.createConsumer(destQueue);
+
+ connTarget.start();
+
+ MessageProducer prod = sessSource.createProducer(sourceQueue);
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+
+
+ TextMessage tm = sessSource.createTextMessage("blahmessage");
+
+ prod.setPriority(7);
+
+ prod.setTimeToLive(1 * 60 * 60 * 1000);
+
+ prod.send(tm);
+
+ long expiration = tm.getJMSExpiration();
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+
+ Message m = cons.receive(5000);
+
+ assertNull(m);
+
+
+ //Now do one with expiration = 0
+
+
+ tm = sessSource.createTextMessage("blahmessage2");
+
+ prod.setPriority(7);
+
+ prod.setTimeToLive(0);
+
+ prod.send(tm);
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ tm = (TextMessage)cons.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("blahmessage2", tm.getText());
+
+ assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+
+ assertEquals(7, tm.getJMSPriority());
+
+ assertEquals(0, tm.getJMSExpiration());
+
+ m = cons.receive(5000);
+
+ assertNull(m);
+
}
finally
{
More information about the jboss-cvs-commits
mailing list