[Jboss-cvs] JBoss Messaging SVN: r1244 - in trunk: src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/base
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 31 14:11:15 EDT 2006
Author: timfox
Date: 2006-08-31 14:11:07 -0400 (Thu, 31 Aug 2006)
New Revision: 1244
Modified:
trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java
Log:
Second interim clustering commit
Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -138,8 +138,6 @@
//Important note! We must ALWAYS set done true irrespective of whether we are in a tx or not.
//Previously we were only setting done to true if there was no transaction.
- //This caused a memory leak when using with JBoss EJB3, since JBoss EJB3 always
- //seems to ack in a tx.
//This meant that if the acknowledgement (in the tx) came in before the call to handle()
//had returned the delivery would end up in the delivery set in the channel and never
//get removed - causing a leak
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -56,9 +56,10 @@
private static final Logger log = Logger.getLogger(SimpleReceiver.class);
public static final String ACKING = "ACKING";
- public static final String NACKING = "NACKING";
+ public static final String ACCEPTING = "ACCEPTING";
public static final String BROKEN = "BROKEN";
public static final String REJECTING = "REJECTING";
+ public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
@@ -127,6 +128,12 @@
log.info("Receiver [" + name + "] is rejecting a null message");
return null;
}
+
+ if (SELECTOR_REJECTING.equals(state))
+ {
+ log.info(this + " is rejecting message since doesn't match selector");
+ return new SimpleDelivery(null, null, true, false);
+ }
if (REJECTING.equals(state))
{
@@ -379,9 +386,10 @@
private static void checkValid(String state)
{
if (!ACKING.equals(state) &&
- !NACKING.equals(state) &&
+ !ACCEPTING.equals(state) &&
!BROKEN.equals(state) &&
- !REJECTING.equals(state))
+ !REJECTING.equals(state) &&
+ !SELECTOR_REJECTING.equals(state))
{
throw new IllegalArgumentException("Unknown receiver state: " + state);
}
@@ -409,7 +417,7 @@
public void afterCommit()
{
-// clear the delivery
+ // clear the delivery
touple[1] = null;
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -2257,7 +2257,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2299,7 +2299,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
r.setImmediateAsynchronousAcknowledgment(true);
assertTrue(messageQueue.add(r));
@@ -2339,7 +2339,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2387,7 +2387,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2446,7 +2446,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2489,7 +2489,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2538,7 +2538,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2614,7 +2614,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2649,7 +2649,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2709,7 +2709,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2774,7 +2774,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2814,7 +2814,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2873,7 +2873,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2928,7 +2928,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2976,7 +2976,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3041,7 +3041,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3096,7 +3096,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3154,7 +3154,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3212,7 +3212,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3273,7 +3273,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3320,7 +3320,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3374,7 +3374,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3417,7 +3417,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3466,7 +3466,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3507,7 +3507,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -3553,7 +3553,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -4832,7 +4832,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -4874,7 +4874,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
r.setImmediateAsynchronousAcknowledgment(true);
assertTrue(messageQueue.add(r));
@@ -4914,7 +4914,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -4962,7 +4962,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -5021,7 +5021,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5064,7 +5064,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5113,7 +5113,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5180,7 +5180,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5222,7 +5222,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
r.setImmediateAsynchronousAcknowledgment(true);
assertTrue(messageQueue.add(r));
@@ -5263,7 +5263,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5312,7 +5312,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5372,7 +5372,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5415,7 +5415,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5464,7 +5464,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5532,7 +5532,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -5581,7 +5581,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5637,7 +5637,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -5686,7 +5686,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5737,7 +5737,7 @@
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -5799,7 +5799,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -5842,7 +5842,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5891,7 +5891,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -5932,7 +5932,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -5977,7 +5977,7 @@
}
// add an NACKING receiver to the channel
- SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+ SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
assertTrue(messageQueue.add(r));
@@ -6132,7 +6132,7 @@
assertEquals(1, stored.size());
SimpleReceiver receiver =
- new SimpleReceiver("NACKINGReceiver", SimpleReceiver.NACKING, messageQueue);
+ new SimpleReceiver("NACKINGReceiver", SimpleReceiver.ACCEPTING, messageQueue);
assertTrue(messageQueue.add(receiver));
assertEquals(1, messageQueue.browse().size());
@@ -6268,7 +6268,7 @@
assertEquals(1, stored.size());
SimpleReceiver receiver =
- new SimpleReceiver("NACKINGReceiver", SimpleReceiver.NACKING, messageQueue);
+ new SimpleReceiver("NACKINGReceiver", SimpleReceiver.ACCEPTING, messageQueue);
assertTrue(messageQueue.add(receiver));
assertEquals(1, messageQueue.browse().size());
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -23,6 +23,7 @@
import java.util.List;
+
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.MessageQueue;
@@ -30,13 +31,15 @@
import org.jboss.messaging.core.plugin.contract.Exchange;
import org.jboss.messaging.core.plugin.exchange.Binding;
import org.jboss.messaging.core.plugin.exchange.ClusteredTopicExchange;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
import org.jgroups.Channel;
import org.jgroups.JChannel;
/**
*
* A ClusteredTopicExchangeTest
-
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision: 1.1 $</tt>
*
@@ -71,6 +74,529 @@
super.tearDown();
}
+ //For the following tests create a simple receiver and acknowledge messages properly rather
+ //than relying on removeAllReferences
+
+ //testBindUnbind - bind and unbind on different nodes as members join and leave, make sure the correct state
+ //is replicated - make sure durable bindings remain durable
+
+ //non transactional route
+ //create several subscriptions same durable some non durable on different nodes
+ //route messages with no tx - make sure it arrives ok - make sure messages arrive in durable
+ //subs even if node is shutdown and restarted
+
+ //as above but transactional
+
+ //test that activate etc is local only
+
+ //test that can't bind twice with same name across cluster
+
+ //test that can't do exchange operations on an exchange that is stopped
+
+ //start and stop exchanges concurrently to test for race conditions
+
+ //Need to add tests for queue/topic with selector
+
+ //Need to route with peristent and non persistent messages
+
+ public void testClusteredBindUnbind() throws Throwable
+ {
+ Exchange exchange1 = null;
+
+ Exchange exchange2 = null;
+
+ Exchange exchange3 = null;
+
+ try
+ {
+ //Start one exchange
+
+ exchange1 = createExchange("node1", "testgroup");
+
+ //Add a couple of bindings
+
+ Binding binding1 = exchange1.bindQueue("sub1", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+ Binding binding2 = exchange1.bindQueue("sub2", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+ //Start another exchange - make sure it picks up the bindings from the first node
+
+ exchange2 = createExchange("node2", "testgroup");
+
+ List bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding1, (Binding)bindings.get(0));
+ assertEquivalent(binding2, (Binding)bindings.get(1));
+
+ //Add another binding on node 2
+
+ Binding binding3 = exchange2.bindQueue("sub3", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+ //Make sure both nodes pick it up
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding1, (Binding)bindings.get(0));
+ assertEquivalent(binding2, (Binding)bindings.get(1));
+ assertEquivalent(binding3, (Binding)bindings.get(2));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding1, (Binding)bindings.get(0));
+ assertEquivalent(binding2, (Binding)bindings.get(1));
+ assertEquivalent(binding3, (Binding)bindings.get(2));
+
+ //Add another binding on node 1
+
+ Binding binding4 = exchange2.bindQueue("sub4", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+ // Make sure both nodes pick it up
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding1, (Binding)bindings.get(0));
+ assertEquivalent(binding2, (Binding)bindings.get(1));
+ assertEquivalent(binding3, (Binding)bindings.get(2));
+ assertEquivalent(binding4, (Binding)bindings.get(2));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding1, (Binding)bindings.get(0));
+ assertEquivalent(binding2, (Binding)bindings.get(1));
+ assertEquivalent(binding3, (Binding)bindings.get(2));
+ assertEquivalent(binding4, (Binding)bindings.get(3));
+
+ //Unbind binding 1 and binding 2
+ exchange1.unbindQueue("sub1");
+ exchange1.unbindQueue("sub2");
+
+ //Make sure bindings are not longer available on either node
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+
+ //Add a third exchange
+
+ exchange1 = createExchange("node3", "testgroup");
+
+ //Maks sure it picks up the bindings
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+
+ //Add another binding on node 3
+
+ Binding binding5 = exchange3.bindQueue("sub5", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+ // Make sure all nodes pick it up
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+
+ //Add a durable binding on node 1
+
+ Binding binding6 = exchange1.bindQueue("sub6", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+
+ // Make sure all nodes pick it up
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+ assertEquivalent(binding6, (Binding)bindings.get(3));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+ assertEquivalent(binding6, (Binding)bindings.get(3));
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(4, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+ assertEquivalent(binding6, (Binding)bindings.get(3));
+
+ //Stop exchange 1
+ exchange1.stop();
+
+ //All it's non durable bindings should be removed from the other nodes
+ //Durable bindings should remain
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(3, bindings.size());
+
+ assertEquivalent(binding3, (Binding)bindings.get(0));
+ assertEquivalent(binding4, (Binding)bindings.get(1));
+ assertEquivalent(binding5, (Binding)bindings.get(2));
+
+ //Stop exchange 2
+ exchange2.stop();
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding4, (Binding)bindings.get(0));
+ assertEquivalent(binding5, (Binding)bindings.get(1));
+
+ //Restart exchange 1 and exchange 2
+ exchange1.start();
+
+ exchange2.start();
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding4, (Binding)bindings.get(0));
+ assertEquivalent(binding5, (Binding)bindings.get(1));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding4, (Binding)bindings.get(0));
+ assertEquivalent(binding5, (Binding)bindings.get(1));
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(2, bindings.size());
+
+ assertEquivalent(binding4, (Binding)bindings.get(0));
+ assertEquivalent(binding5, (Binding)bindings.get(1));
+
+ //Stop all exchanges
+
+ exchange1.stop();
+ exchange2.stop();
+ exchange3.stop();
+
+ //Start them all
+ exchange1.start();
+ exchange2.start();
+ exchange3.start();
+
+ //Only the durable queue should survive
+
+ bindings = exchange1.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(1, bindings.size());
+
+ assertEquivalent(binding5, (Binding)bindings.get(0));
+
+ bindings = exchange2.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(1, bindings.size());
+
+ assertEquivalent(binding5, (Binding)bindings.get(0));
+
+ bindings = exchange3.listBindingsForWildcard("topic1");
+ assertNotNull(bindings);
+ assertEquals(1, bindings.size());
+
+ assertEquivalent(binding5, (Binding)bindings.get(0));
+ }
+ finally
+ {
+ if (exchange1 != null)
+ {
+ exchange1.stop();
+ }
+
+ if (exchange2 != null)
+ {
+ exchange2.stop();
+ }
+
+ if (exchange3 != null)
+ {
+ exchange2.stop();
+ }
+ }
+
+ }
+
+ public void testClusteredRoute() throws Throwable
+ {
+ Exchange exchange1 = null;
+
+ Exchange exchange2 = null;
+
+ Exchange exchange3 = null;
+
+ try
+ {
+ //Start three exchanges
+
+ exchange1.start();
+ exchange2.start();
+ exchange3.start();
+
+ //Two topics with a mixture of durable and non durable subscriptions
+
+ Binding[] bindings = new Binding[16];
+
+ bindings[0] = exchange1.bindQueue("sub1", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[1] = exchange1.bindQueue("sub2", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+ bindings[2] = exchange2.bindQueue("sub3", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[3] = exchange2.bindQueue("sub4", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[4] = exchange2.bindQueue("sub5", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+
+ bindings[5] = exchange1.bindQueue("sub6", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[6] = exchange1.bindQueue("sub7", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+ bindings[7] = exchange1.bindQueue("sub8", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+
+ bindings[8] = exchange1.bindQueue("sub1", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[9] = exchange1.bindQueue("sub2", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+
+ bindings[10] = exchange2.bindQueue("sub3", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[11] = exchange2.bindQueue("sub4", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[12] = exchange2.bindQueue("sub5", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+
+ bindings[13] = exchange1.bindQueue("sub6", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+ bindings[14] = exchange1.bindQueue("sub7", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+ bindings[15] = exchange1.bindQueue("sub8", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+
+ MessageQueue[] queues = new MessageQueue[16];
+ SimpleReceiver[] receivers = new SimpleReceiver[16];
+
+ for (int i = 0; i < 16; i++)
+ {
+ queues[i] = bindings[i].getQueue();
+ receivers[i] = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ }
+
+ Message msg1 = MessageFactory.createCoreMessage(1);
+ MessageReference ref1 = ms.reference(msg1);
+
+ boolean routed = exchange1.route(ref1, "topic1", null);
+ assertTrue(routed);
+
+ List msgs = receivers[0].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ Message msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[0].acknowledge(msgRec, null);
+ msgs = queues[0].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[1].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[1].acknowledge(msgRec, null);
+ msgs = queues[1].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[2].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[2].acknowledge(msgRec, null);
+ msgs = queues[2].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[3].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[3].acknowledge(msgRec, null);
+ msgs = queues[3].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[4].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[4].acknowledge(msgRec, null);
+ msgs = queues[4].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[5].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[5].acknowledge(msgRec, null);
+ msgs = queues[5].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[6].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[6].acknowledge(msgRec, null);
+ msgs = queues[6].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[7].getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ msgRec = (Message)msgs.get(0);
+ assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+ receivers[7].acknowledge(msgRec, null);
+ msgs = queues[7].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[8].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[8].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[9].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[9].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[10].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[10].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[11].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[11].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[12].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[12].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[13].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[13].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[14].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[14].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receivers[15].getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ msgs = queues[15].browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+
+ }
+ finally
+ {
+ if (exchange1 != null)
+ {
+ exchange1.stop();
+ }
+
+ if (exchange2 != null)
+ {
+ exchange2.stop();
+ }
+
+ if (exchange3 != null)
+ {
+ exchange2.stop();
+ }
+ }
+ }
+
+
+
+
public void testClusteredTopic() throws Exception
{
Exchange exchange1 = null;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.plugin.exchange.Binding;
import org.jboss.messaging.core.plugin.exchange.DirectExchange;
import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.ExchangeTestBase;
/**
@@ -71,7 +72,7 @@
super.tearDown();
}
- public void testRoute() throws Throwable
+ public final void testRoute() throws Throwable
{
Exchange exchange = createExchange();
@@ -82,6 +83,10 @@
MessageQueue queue1 = binding1.getQueue();
MessageQueue queue2 = binding2.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver2);
assertTrue(binding1.isActive());
assertTrue(binding2.isActive());
@@ -92,36 +97,45 @@
boolean routed = exchange.route(ref1, "queue1", null);
assertTrue(routed);
- List msgs = queue1.browse();
+ List msgs = receiver1.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
Message msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg1.getMessageID());
- queue1.removeAllReferences();
+ assertTrue(msg1 == msgRec);
+ receiver1.acknowledge(msgRec, null);
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue2.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+ receiver1.clear();
+ receiver2.clear();
+
Message msg2 = MessageFactory.createCoreMessage(2);
MessageReference ref2 = ms.reference(msg2);
routed = exchange.route(ref2, "queue2", null);
assertTrue(routed);
- msgs = queue2.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
- queue2.removeAllReferences();
+ assertTrue(msg2 == msgRec);
+ receiver2.acknowledge(msgRec, null);
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue1.browse();
+ msgs = receiver1.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
}
- public void testRouteInactive() throws Throwable
+ public final void testRouteInactive() throws Throwable
{
Exchange exchange = createExchange();
@@ -132,10 +146,13 @@
MessageQueue queue1 = binding1.getQueue();
MessageQueue queue2 = binding2.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver2);
- //Now make bindings inactive
+ //Now make one bindings inactive
binding1.deactivate();
- binding2.deactivate();
assertFalse(binding1.isActive());
assertFalse(binding2.isActive());
@@ -146,23 +163,37 @@
boolean routed = exchange.route(ref1, "queue1", null);
assertFalse(routed);
+ List msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
Message msg2 = MessageFactory.createCoreMessage(2);
MessageReference ref2 = ms.reference(msg2);
- routed = exchange.route(ref1, "queue2", null);
- assertFalse(routed);
+ routed = exchange.route(ref2, "queue2", null);
+ assertTrue(routed);
- List msgs = queue1.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
-
+ assertEquals(1, msgs.size());
+ Message msgRec = (Message)msgs.get(0);
+ assertTrue(msg2 == msgRec);
+ receiver2.acknowledge(msgRec, null);
msgs = queue2.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
}
- public void testRouteNoBinding() throws Throwable
+ public final void testRouteNoBinding() throws Throwable
{
Exchange exchange = createExchange();
@@ -170,6 +201,8 @@
exchange.bindQueue("queue1", "queue1", null, false, true, ms, pm, 1000, 20, 20);
MessageQueue queue1 = binding1.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
assertTrue(binding1.isActive());
@@ -183,10 +216,10 @@
List msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
}
- public void testRouteTransactional() throws Throwable
+ public final void testRouteTransactional() throws Throwable
{
Exchange exchange = createExchange();
@@ -194,6 +227,8 @@
exchange.bindQueue("queue1", "queue1", null, false, true, ms, pm, 1000, 20, 20);
MessageQueue queue1 = binding1.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
assertTrue(binding1.isActive());
@@ -209,23 +244,32 @@
assertTrue(routed);
routed = exchange.route(ref2, "queue1", tx);
assertTrue(routed);
-
+
+
List msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
tx.commit();
- msgs = queue1.browse();
+ msgs = receiver1.getMessages();
assertNotNull(msgs);
assertEquals(2, msgs.size());
Message msgRec1 = (Message)msgs.get(0);
- Message msgRec2 = (Message)msgs.get(0);
- assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
- assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
+ Message msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg1);
+ assertTrue(msgRec2 == msg2);
- queue1.removeAllReferences();
+ //Acknowledge non transactionally
+ receiver1.acknowledge(msgRec1, null);
+ receiver1.acknowledge(msgRec2, null);
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+ receiver1.clear();
+
Message msg3 = MessageFactory.createCoreMessage(3);
MessageReference ref3 = ms.reference(msg3);
@@ -241,13 +285,104 @@
msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
tx.rollback();
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver1.clear();
+
+
+ Message msg5 = MessageFactory.createCoreMessage(5);
+ MessageReference ref5 = ms.reference(msg5);
+
+ Message msg6 = MessageFactory.createCoreMessage(6);
+ MessageReference ref6 = ms.reference(msg6);
+
+ routed = exchange.route(ref5, "queue1", null);
+ assertTrue(routed);
+ routed = exchange.route(ref6, "queue1", null);
+ assertTrue(routed);
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ //Acknowledge transactionally
+
+ tx = tr.createTransaction();
+
+ receiver1.acknowledge(msgRec1, tx);
+ receiver1.acknowledge(msgRec2, tx);
+
msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ tx.commit();
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+
+ receiver1.clear();
+
+ Message msg7 = MessageFactory.createCoreMessage(7);
+ MessageReference ref7 = ms.reference(msg7);
+
+ Message msg8 = MessageFactory.createCoreMessage(8);
+ MessageReference ref8 = ms.reference(msg8);
+
+ routed = exchange.route(ref7, "queue1", null);
+ assertTrue(routed);
+ routed = exchange.route(ref8, "queue1", null);
+ assertTrue(routed);
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ //Acknowledge transactionally
+
+ tx = tr.createTransaction();
+
+ receiver1.acknowledge(msgRec1, tx);
+ receiver1.acknowledge(msgRec2, tx);
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ tx.rollback();
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -29,9 +29,9 @@
import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.core.plugin.contract.Exchange;
import org.jboss.messaging.core.plugin.exchange.Binding;
-import org.jboss.messaging.core.plugin.exchange.ClusteredTopicExchange;
import org.jboss.messaging.core.plugin.exchange.TopicExchange;
import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.core.plugin.base.ExchangeTestBase;
/**
@@ -72,16 +72,16 @@
super.tearDown();
}
- public void testRoute() throws Throwable
+ public final void testRoute() throws Throwable
{
Exchange exchange = createExchange();
Binding binding1 =
- exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding2 =
- exchange.bindQueue("sub2", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding3 =
- exchange.bindQueue("sub3", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub3", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding4 =
exchange.bindQueue("sub4", "topic2", null, false, true, ms, pm, 1000, 20, 20);
Binding binding5 =
@@ -95,6 +95,18 @@
MessageQueue queue4 = binding4.getQueue();
MessageQueue queue5 = binding5.getQueue();
MessageQueue queue6 = binding6.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+ SimpleReceiver receiver4 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+ SimpleReceiver receiver5 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+ SimpleReceiver receiver6 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue6.add(receiver6);
assertTrue(binding1.isActive());
assertTrue(binding2.isActive());
@@ -109,39 +121,51 @@
boolean routed = exchange.route(ref1, "topic1", null);
assertTrue(routed);
- List msgs = queue1.browse();
+ List msgs = receiver1.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
Message msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+ assertTrue(msg1 == msgRec);
+ receiver1.acknowledge(msgRec, null);
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue2.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+ assertTrue(msg1 == msgRec);
+ receiver2.acknowledge(msgRec, null);
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue3.browse();
+ msgs = receiver3.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+ assertTrue(msg1 == msgRec);
+ receiver3.acknowledge(msgRec, null);
+ msgs = queue3.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue4.browse();
+ msgs = receiver4.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue5.browse();
+ msgs = receiver5.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue6.browse();
+ msgs = receiver6.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- queue1.removeAllReferences();
- queue2.removeAllReferences();
- queue3.removeAllReferences();
+ receiver1.clear();
+ receiver2.clear();
+ receiver3.clear();
Message msg2 = MessageFactory.createCoreMessage(2);
@@ -150,49 +174,59 @@
routed = exchange.route(ref2, "topic2", null);
assertTrue(routed);
- msgs = queue4.browse();
+ msgs = receiver4.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+ assertTrue(msg2 == msgRec);
+ receiver4.acknowledge(msgRec, null);
+ msgs = queue4.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue5.browse();
+ msgs = receiver5.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+ assertTrue(msg2 == msgRec);
+ receiver5.acknowledge(msgRec, null);
+ msgs = queue5.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue6.browse();
+ msgs = receiver6.getMessages();
assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+ assertTrue(msg2 == msgRec);
+ receiver6.acknowledge(msgRec, null);
+ msgs = queue6.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
- msgs = queue1.browse();
+ msgs = receiver1.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue2.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue3.browse();
+ msgs = receiver3.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
-
-
+ assertTrue(msgs.isEmpty());
}
- public void testRouteInactive() throws Throwable
+ public final void testRouteInactive() throws Throwable
{
Exchange exchange = createExchange();
Binding binding1 =
- exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding2 =
- exchange.bindQueue("sub2", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding3 =
- exchange.bindQueue("sub3", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub3", "topic1", null, false, false, ms, pm, 1000, 20, 20);
Binding binding4 =
exchange.bindQueue("sub4", "topic2", null, false, true, ms, pm, 1000, 20, 20);
Binding binding5 =
@@ -206,17 +240,30 @@
MessageQueue queue4 = binding4.getQueue();
MessageQueue queue5 = binding5.getQueue();
MessageQueue queue6 = binding6.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
+ SimpleReceiver receiver3 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue3.add(receiver3);
+ SimpleReceiver receiver4 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue4.add(receiver4);
+ SimpleReceiver receiver5 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue5.add(receiver5);
+ SimpleReceiver receiver6 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue6.add(receiver6);
binding1.deactivate();
binding2.deactivate();
- binding3.deactivate();
+ binding5.deactivate();
+ binding6.deactivate();
assertFalse(binding1.isActive());
assertFalse(binding2.isActive());
- assertFalse(binding3.isActive());
- assertTrue(binding4.isActive());
- assertTrue(binding5.isActive());
- assertTrue(binding6.isActive());
+ assertFalse(binding5.isActive());
+ assertFalse(binding6.isActive());
+ assertTrue(binding3.isActive());
+ assertTrue(binding4.isActive());
Message msg1 = MessageFactory.createCoreMessage(1);
MessageReference ref1 = ms.reference(msg1);
@@ -224,76 +271,88 @@
boolean routed = exchange.route(ref1, "topic1", null);
assertTrue(routed);
- List msgs = queue1.browse();
+ List msgs = receiver1.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue2.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+ msgs = receiver3.getMessages();
+ assertNotNull(msgs);
+ assertEquals(1, msgs.size());
+ Message msgRec = (Message)msgs.get(0);
+ assertTrue(msg1 == msgRec);
+ receiver3.acknowledge(msgRec, null);
msgs = queue3.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
-
- msgs = queue4.browse();
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver4.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue5.browse();
+ msgs = receiver5.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue6.browse();
+ msgs = receiver6.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+ receiver3.clear();
+
+
+
Message msg2 = MessageFactory.createCoreMessage(2);
MessageReference ref2 = ms.reference(msg2);
routed = exchange.route(ref2, "topic2", null);
assertTrue(routed);
- msgs = queue4.browse();
+ msgs = receiver1.getMessages();
assertNotNull(msgs);
- assertEquals(1, msgs.size());
- Message msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+ assertTrue(msgs.isEmpty());
- msgs = queue5.browse();
+ msgs = receiver2.getMessages();
assertNotNull(msgs);
- assertEquals(1, msgs.size());
- msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+ assertTrue(msgs.isEmpty());
- msgs = queue6.browse();
+ msgs = receiver3.getMessages();
assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ msgs = receiver4.getMessages();
+ assertNotNull(msgs);
assertEquals(1, msgs.size());
msgRec = (Message)msgs.get(0);
- assertEquals(msgRec.getMessageID(), msg2.getMessageID());
-
- msgs = queue1.browse();
+ assertTrue(msg1 == msgRec);
+ receiver4.acknowledge(msgRec, null);
+ msgs = queue4.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue2.browse();
+ msgs = receiver5.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
- msgs = queue3.browse();
+ msgs = receiver6.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
}
- public void testRouteNoBinding() throws Throwable
+ public final void testRouteNoBinding() throws Throwable
{
Exchange exchange = createExchange();
Binding binding1 =
- exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);
MessageQueue queue1 = binding1.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
assertTrue(binding1.isActive());
@@ -305,21 +364,29 @@
//A topic exchange always returns true even if there is no binding
assertTrue(routed);
- List msgs = queue1.browse();
+ List msgs = receiver1.getMessages();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
}
- public void testRouteTransactional() throws Throwable
+ public final void testRouteTransactional() throws Throwable
{
Exchange exchange = createExchange();
Binding binding1 =
- exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+ exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);
+ Binding binding2 =
+ exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);
MessageQueue queue1 = binding1.getQueue();
+ SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue1.add(receiver1);
+ MessageQueue queue2 = binding2.getQueue();
+ SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+ queue2.add(receiver2);
assertTrue(binding1.isActive());
+ assertTrue(binding2.isActive());
Message msg1 = MessageFactory.createCoreMessage(1);
MessageReference ref1 = ms.reference(msg1);
@@ -336,20 +403,49 @@
List msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
tx.commit();
- msgs = queue1.browse();
+ msgs = receiver1.getMessages();
assertNotNull(msgs);
assertEquals(2, msgs.size());
Message msgRec1 = (Message)msgs.get(0);
- Message msgRec2 = (Message)msgs.get(0);
- assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
- assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
+ Message msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg1);
+ assertTrue(msgRec2 == msg2);
- queue1.removeAllReferences();
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg1);
+ assertTrue(msgRec2 == msg2);
+ //Acknowledge non transactionally
+ receiver1.acknowledge(msgRec1, null);
+ receiver1.acknowledge(msgRec2, null);
+
+ receiver2.acknowledge(msgRec1, null);
+ receiver2.acknowledge(msgRec2, null);
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver1.clear();
+
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver2.clear();
+
Message msg3 = MessageFactory.createCoreMessage(3);
MessageReference ref3 = ms.reference(msg3);
@@ -365,13 +461,164 @@
msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertTrue(msgs.isEmpty());
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
tx.rollback();
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver1.clear();
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver2.clear();
+
+
+ Message msg5 = MessageFactory.createCoreMessage(5);
+ MessageReference ref5 = ms.reference(msg5);
+
+ Message msg6 = MessageFactory.createCoreMessage(6);
+ MessageReference ref6 = ms.reference(msg6);
+
+ routed = exchange.route(ref5, "topic1", null);
+ assertTrue(routed);
+ routed = exchange.route(ref6, "topic1", null);
+ assertTrue(routed);
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ //Acknowledge transactionally
+
+ tx = tr.createTransaction();
+
+ receiver1.acknowledge(msgRec1, tx);
+ receiver1.acknowledge(msgRec2, tx);
+
+ receiver2.acknowledge(msgRec1, tx);
+ receiver2.acknowledge(msgRec2, tx);
+
msgs = queue1.browse();
assertNotNull(msgs);
- assertEquals(0, msgs.size());
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg5);
+ assertTrue(msgRec2 == msg6);
+
+ tx.commit();
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver1.clear();
+
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertTrue(msgs.isEmpty());
+
+ receiver2.clear();
+
+ Message msg7 = MessageFactory.createCoreMessage(7);
+ MessageReference ref7 = ms.reference(msg7);
+
+ Message msg8 = MessageFactory.createCoreMessage(8);
+ MessageReference ref8 = ms.reference(msg8);
+
+ routed = exchange.route(ref7, "topic1", null);
+ assertTrue(routed);
+ routed = exchange.route(ref8, "topic1", null);
+ assertTrue(routed);
+
+ msgs = receiver1.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ msgs = receiver2.getMessages();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ //Acknowledge transactionally
+
+ tx = tr.createTransaction();
+
+ receiver1.acknowledge(msgRec1, tx);
+ receiver1.acknowledge(msgRec2, tx);
+
+ receiver2.acknowledge(msgRec1, tx);
+ receiver2.acknowledge(msgRec2, tx);
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ tx.rollback();
+
+ msgs = queue1.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
+
+ msgs = queue2.browse();
+ assertNotNull(msgs);
+ assertEquals(2, msgs.size());
+ msgRec1 = (Message)msgs.get(0);
+ msgRec2 = (Message)msgs.get(1);
+ assertTrue(msgRec1 == msg7);
+ assertTrue(msgRec2 == msg8);
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java 2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java 2006-08-31 18:11:07 UTC (rev 1244)
@@ -27,11 +27,7 @@
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.local.MessageQueue;
-import org.jboss.messaging.core.memory.MemoryManager;
-import org.jboss.messaging.core.memory.SimpleMemoryManager;
-import org.jboss.messaging.core.message.MessageFactory;
import org.jboss.messaging.core.plugin.IdManager;
import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
import org.jboss.messaging.core.plugin.SimpleMessageStore;
@@ -39,8 +35,10 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.exchange.Binding;
+import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleReceiver;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.jmx.ServiceContainer;
@@ -122,7 +120,7 @@
// Public --------------------------------------------------------
- public void testBind() throws Throwable
+ public final void testBind() throws Throwable
{
Exchange exchange1 = createExchange();
@@ -154,11 +152,7 @@
Binding binding3 = exchange1.getBindingForName("durableQueue");
assertNotNull(binding3);
assertTrue(binding1 == binding3);
- assertEquals("durableQueue", binding3.getQueueName());
- assertEquals("condition1", binding3.getCondition());
- assertEquals(filter1.getFilterString(), binding3.getSelector());
- assertEquals(true, binding3.isNoLocal());
- assertEquals(true, binding3.isDurable());
+ assertEquivalent(binding1, binding3);
assertNotNull(binding3.getQueue());
assertEquals(true, binding3.getQueue().isRecoverable());
@@ -166,11 +160,7 @@
Binding binding4 = exchange1.getBindingForName("nonDurableQueue");
assertNotNull(binding4);
assertTrue(binding2 == binding4);
- assertEquals("nonDurableQueue", binding4.getQueueName());
- assertEquals("condition2", binding4.getCondition());
- assertEquals(filter2.getFilterString(), binding4.getSelector());
- assertEquals(true, binding4.isNoLocal());
- assertEquals(false, binding4.isDurable());
+ assertEquivalent(binding2, binding4);
assertNotNull(binding4.getQueue());
assertEquals(false, binding4.getQueue().isRecoverable());
@@ -180,11 +170,7 @@
//Only one binding should be there
Binding binding5 = exchange2.getBindingForName("durableQueue");
assertNotNull(binding5);
- assertEquals("durableQueue", binding5.getQueueName());
- assertEquals("condition1", binding5.getCondition());
- assertEquals(filter1.getFilterString(), binding5.getSelector());
- assertEquals(true, binding5.isNoLocal());
- assertEquals(true, binding5.isDurable());
+ assertEquivalent(binding1, binding5);
//Should be unloaded
assertNull(binding5.getQueue());
@@ -197,11 +183,7 @@
//Unbind the binding
Binding binding7 = exchange2.unbindQueue("durableQueue");
assertNotNull(binding7);
- assertEquals("durableQueue", binding7.getQueueName());
- assertEquals("condition1", binding7.getCondition());
- assertEquals(filter1.getFilterString(), binding7.getSelector());
- assertEquals(true, binding7.isNoLocal());
- assertEquals(true, binding7.isDurable());
+ assertEquivalent(binding1, binding7);
//Make sure no longer there
Binding binding8 = exchange2.getBindingForName("durableQueue");
@@ -216,7 +198,7 @@
}
- public void testListBindings() throws Throwable
+ public final void testListBindings() throws Throwable
{
Exchange exchange = createExchange();
@@ -272,7 +254,7 @@
}
- public void testUnloadReload() throws Exception
+ public final void testUnloadReload() throws Exception
{
Exchange exchange = createExchange();
@@ -402,12 +384,52 @@
protected void assertEquivalent(Binding binding1, Binding binding2)
{
+ assertEquals(binding1.getNodeId(), binding2.getNodeId());
assertEquals(binding1.getQueueName(), binding2.getQueueName());
- assertEquals(binding1.getCondition(), binding2.getCondition());
assertEquals(binding1.getSelector(), binding2.getSelector());
+ assertEquals(binding1.getChannelId(), binding2.getChannelId());
+ assertEquals(binding1.isDurable(), binding2.isDurable());
assertEquals(binding1.isNoLocal(), binding2.isNoLocal());
- assertEquals(binding1.isDurable(), binding2.isDurable());
}
+
+// protected void checkHasMessages(Message[] msgArray, MessageQueue queue, SimpleReceiver receiver)
+// {
+// List msgs = receiver.getMessages();
+//
+// assertNotNull(msgs);
+// assertEquals(msgArray.length, msgs.size());
+// for (int i = 0; i < msgArray.length; i++)
+// {
+// assertEquals(msgArray[i].getMessageID(), ((Message)msgs.get(i)).getMessageID());
+// }
+//
+// msgs = queue.browse();
+// assertNotNull(msgs);
+// assertEquals(msgArray.length, msgs.size());
+// for (int i = 0; i < msgArray.length; i++)
+// {
+// assertEquals(msgArray[i].getMessageID(), ((Message)msgs.get(i)).getMessageID());
+// }
+// }
+//
+// protected void ackMessages(Message[] msgArray, SimpleReceiver receiver, Transaction tx) throws Throwable
+// {
+// for (int i = 0; i < msgArray.length; i++)
+// {
+// receiver.acknowledge(msgArray[i], tx);
+// }
+// }
+//
+// protected void checkHasntMessages(MessageQueue queue, SimpleReceiver receiver)
+// {
+// List msgs = receiver.getMessages();
+// assertNotNull(msgs);
+// assertTrue(msgs.isEmpty());
+//
+// msgs = queue.browse();
+// assertNotNull(msgs);
+// assertTrue(msgs.isEmpty());
+// }
// Private -------------------------------------------------------
More information about the jboss-cvs-commits
mailing list