[Jboss-cvs] JBoss Messaging SVN: r1244 - in trunk: src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/base

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 31 14:11:15 EDT 2006


Author: timfox
Date: 2006-08-31 14:11:07 -0400 (Thu, 31 Aug 2006)
New Revision: 1244

Modified:
   trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
   trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
   trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java
Log:
Second interim clustering commit



Modified: trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/src/main/org/jboss/messaging/core/SimpleDelivery.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -138,8 +138,6 @@
 
       //Important note! We must ALWAYS set done true irrespective of whether we are in a tx or not.
       //Previously we were only setting done to true if there was no transaction.
-      //This caused a memory leak when using with JBoss EJB3, since JBoss EJB3 always
-      //seems to ack in a tx.
       //This meant that if the acknowledgement (in the tx) came in before the call to handle()
       //had returned the delivery would end up in the delivery set in the channel and never
       //get removed - causing a leak

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -56,9 +56,10 @@
    private static final Logger log = Logger.getLogger(SimpleReceiver.class);
 
    public static final String ACKING = "ACKING";
-   public static final String NACKING = "NACKING";
+   public static final String ACCEPTING = "ACCEPTING";
    public static final String BROKEN = "BROKEN";
    public static final String REJECTING = "REJECTING";
+   public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
 
    private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
 
@@ -127,6 +128,12 @@
             log.info("Receiver [" + name + "] is rejecting a null message");
             return null;
          }
+         
+         if (SELECTOR_REJECTING.equals(state))
+         {
+            log.info(this + " is rejecting message since doesn't match selector");
+            return new SimpleDelivery(null, null, true, false);
+         }
 
          if (REJECTING.equals(state))
          {
@@ -379,9 +386,10 @@
    private static void checkValid(String state)
    {
       if (!ACKING.equals(state) &&
-          !NACKING.equals(state) &&
+          !ACCEPTING.equals(state) &&
           !BROKEN.equals(state) &&
-          !REJECTING.equals(state))
+          !REJECTING.equals(state) &&
+          !SELECTOR_REJECTING.equals(state))
       {
          throw new IllegalArgumentException("Unknown receiver state: " + state);
       }
@@ -409,7 +417,7 @@
       
       public void afterCommit()
       {
-//       clear the delivery
+         // clear the delivery
          touple[1] = null;
       }
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/base/MessageQueueTestBase.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -2257,7 +2257,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2299,7 +2299,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       r.setImmediateAsynchronousAcknowledgment(true);
       assertTrue(messageQueue.add(r));
 
@@ -2339,7 +2339,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2387,7 +2387,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2446,7 +2446,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2489,7 +2489,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2538,7 +2538,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2614,7 +2614,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2649,7 +2649,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2709,7 +2709,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -2774,7 +2774,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2814,7 +2814,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2873,7 +2873,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -2928,7 +2928,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -2976,7 +2976,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3041,7 +3041,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3096,7 +3096,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3154,7 +3154,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3212,7 +3212,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3273,7 +3273,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3320,7 +3320,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3374,7 +3374,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3417,7 +3417,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -3466,7 +3466,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3507,7 +3507,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -3553,7 +3553,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -4832,7 +4832,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -4874,7 +4874,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       r.setImmediateAsynchronousAcknowledgment(true);
       assertTrue(messageQueue.add(r));
 
@@ -4914,7 +4914,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -4962,7 +4962,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -5021,7 +5021,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5064,7 +5064,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5113,7 +5113,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5180,7 +5180,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5222,7 +5222,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       r.setImmediateAsynchronousAcknowledgment(true);
       assertTrue(messageQueue.add(r));
 
@@ -5263,7 +5263,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5312,7 +5312,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, true, "payload");
@@ -5372,7 +5372,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5415,7 +5415,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5464,7 +5464,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5532,7 +5532,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       Message m = MessageFactory.createCoreMessage(0, false, "payload");
@@ -5581,7 +5581,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5637,7 +5637,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -5686,7 +5686,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5737,7 +5737,7 @@
 
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -5799,7 +5799,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -5842,7 +5842,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
       SimpleDeliveryObserver observer = new SimpleDeliveryObserver();
@@ -5891,7 +5891,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -5932,7 +5932,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -5977,7 +5977,7 @@
       }
 
       // add an NACKING receiver to the channel
-      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.NACKING);
+      SimpleReceiver r = new SimpleReceiver("NackingReceiver", SimpleReceiver.ACCEPTING);
       assertTrue(messageQueue.add(r));
 
 
@@ -6132,7 +6132,7 @@
       assertEquals(1, stored.size());
 
       SimpleReceiver receiver =
-            new SimpleReceiver("NACKINGReceiver", SimpleReceiver.NACKING, messageQueue);
+            new SimpleReceiver("NACKINGReceiver", SimpleReceiver.ACCEPTING, messageQueue);
       assertTrue(messageQueue.add(receiver));
 
       assertEquals(1, messageQueue.browse().size());
@@ -6268,7 +6268,7 @@
       assertEquals(1, stored.size());
 
       SimpleReceiver receiver =
-            new SimpleReceiver("NACKINGReceiver", SimpleReceiver.NACKING, messageQueue);
+            new SimpleReceiver("NACKINGReceiver", SimpleReceiver.ACCEPTING, messageQueue);
       assertTrue(messageQueue.add(receiver));
 
       assertEquals(1, messageQueue.browse().size());

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/ClusteredTopicExchangeTest.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -23,6 +23,7 @@
 
 import java.util.List;
 
+
 import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.MessageQueue;
@@ -30,13 +31,15 @@
 import org.jboss.messaging.core.plugin.contract.Exchange;
 import org.jboss.messaging.core.plugin.exchange.Binding;
 import org.jboss.messaging.core.plugin.exchange.ClusteredTopicExchange;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jgroups.Channel;
 import org.jgroups.JChannel;
 
 /**
  * 
  * A ClusteredTopicExchangeTest
-
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
@@ -71,6 +74,529 @@
       super.tearDown();
    }
    
+   //For the following tests create a simple receiver and acknowledge messages properly rather
+   //than relying on removeAllReferences
+   
+   //testBindUnbind - bind and unbind on different nodes as members join and leave, make sure the correct state
+   //is replicated - make sure durable bindings remain durable
+   
+   //non transactional route
+   //create several subscriptions, some durable, some non durable, on different nodes
+   //route messages with no tx - make sure it arrives ok - make sure messages arrive in durable
+   //subs even if node is shutdown and restarted
+   
+   //as above but transactional
+   
+   //test that activate etc is local only
+   
+   //test that can't bind twice with same name across cluster
+   
+   //test that can't do exchange operations on an exchange that is stopped
+   
+   //start and stop exchanges concurrently to test for race conditions
+   
+   //Need to add tests for queue/topic with selector
+   
+   //Need to route with persistent and non persistent messages
+   
+   public void testClusteredBindUnbind() throws Throwable
+   {
+      Exchange exchange1 = null;
+      
+      Exchange exchange2 = null;
+      
+      Exchange exchange3 = null;
+      
+      try
+      {         
+         //Start one exchange
+         
+         exchange1 = createExchange("node1", "testgroup");
+         
+         //Add a couple of bindings
+         
+         Binding binding1 = exchange1.bindQueue("sub1", "topic1", null, false, false, ms, pm, 20000, 100, 100);         
+         Binding binding2 = exchange1.bindQueue("sub2", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+    
+         //Start another exchange - make sure it picks up the bindings from the first node
+         
+         exchange2 = createExchange("node2", "testgroup");
+         
+         List bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding1, (Binding)bindings.get(0));
+         assertEquivalent(binding2, (Binding)bindings.get(1));         
+         
+         //Add another binding on node 2
+         
+         Binding binding3 = exchange2.bindQueue("sub3", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         
+         //Make sure both nodes pick it up
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding1, (Binding)bindings.get(0));
+         assertEquivalent(binding2, (Binding)bindings.get(1));
+         assertEquivalent(binding3, (Binding)bindings.get(2));
+
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding1, (Binding)bindings.get(0));
+         assertEquivalent(binding2, (Binding)bindings.get(1));
+         assertEquivalent(binding3, (Binding)bindings.get(2));
+
+         //Add another binding on node 1
+         
+         Binding binding4 = exchange2.bindQueue("sub4", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         
+         // Make sure both nodes pick it up
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding1, (Binding)bindings.get(0));
+         assertEquivalent(binding2, (Binding)bindings.get(1));
+         assertEquivalent(binding3, (Binding)bindings.get(2));
+         assertEquivalent(binding4, (Binding)bindings.get(2));
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding1, (Binding)bindings.get(0));
+         assertEquivalent(binding2, (Binding)bindings.get(1));
+         assertEquivalent(binding3, (Binding)bindings.get(2));
+         assertEquivalent(binding4, (Binding)bindings.get(3));
+         
+         //Unbind binding 1 and binding 2
+         exchange1.unbindQueue("sub1");
+         exchange1.unbindQueue("sub2");
+         
+         //Make sure bindings are no longer available on either node
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+   
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         
+         //Add a third exchange
+                  
+         exchange1 = createExchange("node3", "testgroup");
+         
+         //Make sure it picks up the bindings
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         
+         //Add another binding on node 3
+                  
+         Binding binding5 = exchange3.bindQueue("sub5", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         
+         // Make sure all nodes pick it up
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         
+         //Add a durable binding on node 1
+         
+         Binding binding6 = exchange1.bindQueue("sub6", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+         
+         // Make sure all nodes pick it up
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         assertEquivalent(binding6, (Binding)bindings.get(3));
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         assertEquivalent(binding6, (Binding)bindings.get(3));
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(4, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         assertEquivalent(binding6, (Binding)bindings.get(3));
+         
+         //Stop exchange 1
+         exchange1.stop();
+         
+         //All its non durable bindings should be removed from the other nodes
+         //Durable bindings should remain
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(3, bindings.size());
+         
+         assertEquivalent(binding3, (Binding)bindings.get(0));
+         assertEquivalent(binding4, (Binding)bindings.get(1));
+         assertEquivalent(binding5, (Binding)bindings.get(2));
+         
+         //Stop exchange 2
+         exchange2.stop();
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding4, (Binding)bindings.get(0));
+         assertEquivalent(binding5, (Binding)bindings.get(1));
+         
+         //Restart exchange 1 and exchange 2
+         exchange1.start();
+         
+         exchange2.start();
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding4, (Binding)bindings.get(0));
+         assertEquivalent(binding5, (Binding)bindings.get(1));
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding4, (Binding)bindings.get(0));
+         assertEquivalent(binding5, (Binding)bindings.get(1));
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(2, bindings.size());
+         
+         assertEquivalent(binding4, (Binding)bindings.get(0));
+         assertEquivalent(binding5, (Binding)bindings.get(1));
+         
+         //Stop all exchanges
+         
+         exchange1.stop();
+         exchange2.stop();
+         exchange3.stop();
+         
+         //Start them all
+         exchange1.start();
+         exchange2.start();
+         exchange3.start();
+         
+         //Only the durable queue should survive
+         
+         bindings = exchange1.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(1, bindings.size());
+         
+         assertEquivalent(binding5, (Binding)bindings.get(0));
+         
+         bindings = exchange2.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(1, bindings.size());
+         
+         assertEquivalent(binding5, (Binding)bindings.get(0));
+         
+         bindings = exchange3.listBindingsForWildcard("topic1");
+         assertNotNull(bindings);
+         assertEquals(1, bindings.size());
+         
+         assertEquivalent(binding5, (Binding)bindings.get(0));
+      }
+      finally
+      {
+         if (exchange1 != null)
+         {
+            exchange1.stop();
+         }
+         
+         if (exchange2 != null)
+         {
+            exchange2.stop();
+         }
+         
+         if (exchange3 != null)
+         {
+            exchange2.stop();
+         }
+      }
+      
+   }
+   
+   /**
+    * Routes one message to "topic1" via exchange1 and checks that all eight topic1 subscriptions
+    * receive it and are empty once acknowledged, while the eight topic2 subscriptions get nothing.
+    */
+   public void testClusteredRoute() throws Throwable
+   {
+      Exchange exchange1 = null;
+      Exchange exchange2 = null;
+      Exchange exchange3 = null;
+
+      try
+      {
+         //Start three exchanges
+         //NOTE(review): exchange1-3 are still null here, so start() throws a NullPointerException;
+         //they must be instantiated before this test can pass
+         exchange1.start();
+         exchange2.start();
+         exchange3.start();
+
+         //Two topics with a mixture of durable and non durable subscriptions
+         Binding[] bindings = new Binding[16];
+         bindings[0] = exchange1.bindQueue("sub1", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[1] = exchange1.bindQueue("sub2", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+
+         bindings[2] = exchange2.bindQueue("sub3", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[3] = exchange2.bindQueue("sub4", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[4] = exchange2.bindQueue("sub5", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+         //NOTE(review): sub6-sub8 are bound on exchange1 again and nothing is bound on exchange3 - confirm
+         bindings[5] = exchange1.bindQueue("sub6", "topic1", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[6] = exchange1.bindQueue("sub7", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+         bindings[7] = exchange1.bindQueue("sub8", "topic1", null, false, true, ms, pm, 20000, 100, 100);
+
+         bindings[8] = exchange1.bindQueue("sub1", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[9] = exchange1.bindQueue("sub2", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+
+         bindings[10] = exchange2.bindQueue("sub3", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[11] = exchange2.bindQueue("sub4", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[12] = exchange2.bindQueue("sub5", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+
+         bindings[13] = exchange1.bindQueue("sub6", "topic2", null, false, false, ms, pm, 20000, 100, 100);
+         bindings[14] = exchange1.bindQueue("sub7", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+         bindings[15] = exchange1.bindQueue("sub8", "topic2", null, false, true, ms, pm, 20000, 100, 100);
+
+         MessageQueue[] queues = new MessageQueue[16];
+         SimpleReceiver[] receivers = new SimpleReceiver[16];
+
+         for (int i = 0; i < 16; i++)
+         {
+            queues[i] = bindings[i].getQueue();
+            receivers[i] = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+            queues[i].add(receivers[i]); //attach the receiver, otherwise it never sees a delivery
+         }
+
+         Message msg1 = MessageFactory.createCoreMessage(1);
+         MessageReference ref1 = ms.reference(msg1);
+
+         boolean routed = exchange1.route(ref1, "topic1", null);
+         assertTrue(routed);
+         //Each topic1 subscription (indexes 0-7) must hold exactly the routed message
+         List msgs = receivers[0].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         Message msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[0].acknowledge(msgRec, null);
+         msgs = queues[0].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[1].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[1].acknowledge(msgRec, null);
+         msgs = queues[1].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[2].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[2].acknowledge(msgRec, null);
+         msgs = queues[2].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[3].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[3].acknowledge(msgRec, null);
+         msgs = queues[3].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[4].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[4].acknowledge(msgRec, null);
+         msgs = queues[4].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[5].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[5].acknowledge(msgRec, null);
+         msgs = queues[5].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[6].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[6].acknowledge(msgRec, null);
+         msgs = queues[6].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[7].getMessages();
+         assertNotNull(msgs);
+         assertEquals(1, msgs.size());
+         msgRec = (Message)msgs.get(0);
+         assertEquals(msg1.getMessageID(), msgRec.getMessageID());
+         receivers[7].acknowledge(msgRec, null);
+         msgs = queues[7].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         //The topic2 subscriptions (indexes 8-15) must not have received anything
+         msgs = receivers[8].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[8].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[9].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[9].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[10].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[10].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[11].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[11].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[12].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[12].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[13].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[13].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[14].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[14].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+
+         msgs = receivers[15].getMessages();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+         msgs = queues[15].browse();
+         assertNotNull(msgs);
+         assertTrue(msgs.isEmpty());
+      }
+      finally
+      {
+         if (exchange1 != null)
+         {
+            exchange1.stop();
+         }
+
+         if (exchange2 != null)
+         {
+            exchange2.stop();
+         }
+
+         if (exchange3 != null)
+         {
+            exchange3.stop();
+         }
+      }
+   }
+   
+   
+
+   
    public void testClusteredTopic() throws Exception
    {
       Exchange exchange1 = null;

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/DirectExchangeTest.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.plugin.exchange.Binding;
 import org.jboss.messaging.core.plugin.exchange.DirectExchange;
 import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jboss.test.messaging.core.plugin.base.ExchangeTestBase;
 
 /**
@@ -71,7 +72,7 @@
       super.tearDown();
    }
    
-   public void testRoute() throws Throwable
+   public final void testRoute() throws Throwable
    {
       Exchange exchange = createExchange();
       
@@ -82,6 +83,10 @@
       
       MessageQueue queue1 = binding1.getQueue();      
       MessageQueue queue2 = binding2.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
+      SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue2.add(receiver2); //receiver2 listens on queue2; it is later expected to get the "queue2" message
       
       assertTrue(binding1.isActive());      
       assertTrue(binding2.isActive());
@@ -92,36 +97,45 @@
       boolean routed = exchange.route(ref1, "queue1", null);      
       assertTrue(routed);
       
-      List msgs = queue1.browse();
+      List msgs = receiver1.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       Message msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg1.getMessageID());
-      queue1.removeAllReferences();
+      assertTrue(msg1 == msgRec);
+      receiver1.acknowledge(msgRec, null);
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());      
       
-      msgs = queue2.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
+      receiver1.clear();
+      receiver2.clear();
+      
       Message msg2 = MessageFactory.createCoreMessage(2);      
       MessageReference ref2 = ms.reference(msg2);
       
       routed = exchange.route(ref2, "queue2", null);      
       assertTrue(routed);
       
-      msgs = queue2.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
-      queue2.removeAllReferences();
+      assertTrue(msg2 == msgRec);
+      receiver2.acknowledge(msgRec, null);
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue1.browse();
+      msgs = receiver1.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
    }
    
-   public void testRouteInactive() throws Throwable
+   public final void testRouteInactive() throws Throwable
    {
       Exchange exchange = createExchange();
       
@@ -132,10 +146,13 @@
       
       MessageQueue queue1 = binding1.getQueue();
       MessageQueue queue2 = binding2.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
+      SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue2.add(receiver2); //receiver2 listens on queue2; it is later expected to get the "queue2" message
       
-      //Now make bindings inactive
+      //Now make one binding inactive
       binding1.deactivate();
-      binding2.deactivate();
       
       assertFalse(binding1.isActive());
-      assertFalse(binding2.isActive());
+      assertTrue(binding2.isActive()); //binding2 is no longer deactivated, so it must still be active
@@ -146,23 +163,37 @@
       boolean routed = exchange.route(ref1, "queue1", null);      
       assertFalse(routed);
       
+      List msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
+      msgs = receiver2.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+                  
       Message msg2 = MessageFactory.createCoreMessage(2);      
       MessageReference ref2 = ms.reference(msg2);
       
-      routed = exchange.route(ref1, "queue2", null);      
-      assertFalse(routed);
+      routed = exchange.route(ref2, "queue2", null);      
+      assertTrue(routed);
       
-      List msgs = queue1.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());     
-      
+      assertEquals(1, msgs.size());
+      Message msgRec = (Message)msgs.get(0);
+      assertTrue(msg2 == msgRec);
+      receiver2.acknowledge(msgRec, null);
       msgs = queue2.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());     
+      assertTrue(msgs.isEmpty());    
+      
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());     
    
    }
    
-   public void testRouteNoBinding() throws Throwable
+   public final void testRouteNoBinding() throws Throwable
    {
       Exchange exchange = createExchange();
       
@@ -170,6 +201,8 @@
          exchange.bindQueue("queue1", "queue1", null, false, true, ms, pm, 1000, 20, 20);      
 
       MessageQueue queue1 = binding1.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
 
       assertTrue(binding1.isActive());
 
@@ -183,10 +216,10 @@
             
       List msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());             
+      assertTrue(msgs.isEmpty());             
    }
    
-   public void testRouteTransactional() throws Throwable
+   public final void testRouteTransactional() throws Throwable
    {
       Exchange exchange = createExchange();
       
@@ -194,6 +227,8 @@
          exchange.bindQueue("queue1", "queue1", null, false, true, ms, pm, 1000, 20, 20);      
 
       MessageQueue queue1 = binding1.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
 
       assertTrue(binding1.isActive());
 
@@ -209,23 +244,32 @@
       assertTrue(routed);
       routed = exchange.route(ref2, "queue1", tx);            
       assertTrue(routed);
-            
+      
+      
       List msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size()); 
+      assertTrue(msgs.isEmpty());
       
       tx.commit();
       
-      msgs = queue1.browse();
+      msgs = receiver1.getMessages();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
       Message msgRec1 = (Message)msgs.get(0);
-      Message msgRec2 = (Message)msgs.get(0);
-      assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-      assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
+      Message msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg1);
+      assertTrue(msgRec2 == msg2);
       
-      queue1.removeAllReferences();
+      //Acknowledge non transactionally
+      receiver1.acknowledge(msgRec1, null);
+      receiver1.acknowledge(msgRec2, null);
+
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
       
+      receiver1.clear();
+           
       Message msg3 = MessageFactory.createCoreMessage(3);      
       MessageReference ref3 = ms.reference(msg3);
       
@@ -241,13 +285,104 @@
             
       msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size()); 
+      assertTrue(msgs.isEmpty()); 
       
       tx.rollback();
       
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
+      receiver1.clear();
+      
+      
+      Message msg5 = MessageFactory.createCoreMessage(5);      
+      MessageReference ref5 = ms.reference(msg5);
+      
+      Message msg6 = MessageFactory.createCoreMessage(6);      
+      MessageReference ref6 = ms.reference(msg6);
+            
+      routed = exchange.route(ref5, "queue1", null);            
+      assertTrue(routed);
+      routed = exchange.route(ref6, "queue1", null);            
+      assertTrue(routed);
+      
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      //Acknowledge transactionally
+      
+      tx = tr.createTransaction();
+      
+      receiver1.acknowledge(msgRec1, tx);
+      receiver1.acknowledge(msgRec2, tx);
+      
       msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      tx.commit();
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
+      
+      receiver1.clear();
+      
+      Message msg7 = MessageFactory.createCoreMessage(7);      
+      MessageReference ref7 = ms.reference(msg7);
+      
+      Message msg8 = MessageFactory.createCoreMessage(8);      
+      MessageReference ref8 = ms.reference(msg8);
+            
+      routed = exchange.route(ref7, "queue1", null);            
+      assertTrue(routed);
+      routed = exchange.route(ref8, "queue1", null);            
+      assertTrue(routed);
+      
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      //Acknowledge transactionally
+      
+      tx = tr.createTransaction();
+      
+      receiver1.acknowledge(msgRec1, tx);
+      receiver1.acknowledge(msgRec2, tx);
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      tx.rollback();
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+
    }
    
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/TopicExchangeTest.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -29,9 +29,9 @@
 import org.jboss.messaging.core.message.MessageFactory;
 import org.jboss.messaging.core.plugin.contract.Exchange;
 import org.jboss.messaging.core.plugin.exchange.Binding;
-import org.jboss.messaging.core.plugin.exchange.ClusteredTopicExchange;
 import org.jboss.messaging.core.plugin.exchange.TopicExchange;
 import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jboss.test.messaging.core.plugin.base.ExchangeTestBase;
 
 /**
@@ -72,16 +72,16 @@
       super.tearDown();
    }
    
-   public void testRoute() throws Throwable
+   public final void testRoute() throws Throwable
    {
       Exchange exchange = createExchange();
       
       Binding binding1 =
-         exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);      
       Binding binding2 =
-         exchange.bindQueue("sub2", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+         exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);
       Binding binding3 =
-         exchange.bindQueue("sub3", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub3", "topic1", null, false, false, ms, pm, 1000, 20, 20);      
       Binding binding4 =
          exchange.bindQueue("sub4", "topic2", null, false, true, ms, pm, 1000, 20, 20);
       Binding binding5 =
@@ -95,6 +95,18 @@
       MessageQueue queue4 = binding4.getQueue();
       MessageQueue queue5 = binding5.getQueue();      
       MessageQueue queue6 = binding6.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
+      SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue2.add(receiver2);
+      SimpleReceiver receiver3 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue3.add(receiver3);
+      SimpleReceiver receiver4 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue4.add(receiver4);
+      SimpleReceiver receiver5 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue5.add(receiver5);
+      SimpleReceiver receiver6 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue6.add(receiver6);
       
       assertTrue(binding1.isActive());      
       assertTrue(binding2.isActive());
@@ -109,39 +121,51 @@
       boolean routed = exchange.route(ref1, "topic1", null);      
       assertTrue(routed);
       
-      List msgs = queue1.browse();
+      List msgs = receiver1.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       Message msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+      assertTrue(msg1 == msgRec);
+      receiver1.acknowledge(msgRec, null);
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue2.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+      assertTrue(msg1 == msgRec);
+      receiver2.acknowledge(msgRec, null);
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue3.browse();
+      msgs = receiver3.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg1.getMessageID());
+      assertTrue(msg1 == msgRec);
+      receiver3.acknowledge(msgRec, null);
+      msgs = queue3.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue4.browse();
+      msgs = receiver4.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue5.browse();
+      msgs = receiver5.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue6.browse();
+      msgs = receiver6.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      queue1.removeAllReferences();
-      queue2.removeAllReferences();
-      queue3.removeAllReferences();
+      receiver1.clear();
+      receiver2.clear();
+      receiver3.clear();
       
       
       Message msg2 = MessageFactory.createCoreMessage(2);      
@@ -150,49 +174,59 @@
       routed = exchange.route(ref2, "topic2", null);      
       assertTrue(routed);
       
-      msgs = queue4.browse();
+      msgs = receiver4.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+      assertTrue(msg2 == msgRec);
+      receiver4.acknowledge(msgRec, null);
+      msgs = queue4.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue5.browse();
+      msgs = receiver5.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+      assertTrue(msg2 == msgRec);
+      receiver5.acknowledge(msgRec, null);
+      msgs = queue5.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue6.browse();
+      msgs = receiver6.getMessages();
       assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+      assertTrue(msg2 == msgRec);
+      receiver6.acknowledge(msgRec, null);
+      msgs = queue6.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());    
       
-      msgs = queue1.browse();
+      msgs = receiver1.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue2.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue3.browse();
+      msgs = receiver3.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
-      
-      
+      assertTrue(msgs.isEmpty());
    }
    
-   public void testRouteInactive() throws Throwable
+   public final void testRouteInactive() throws Throwable
    {
       Exchange exchange = createExchange();
       
       Binding binding1 =
-         exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);      
       Binding binding2 =
-         exchange.bindQueue("sub2", "topic1", null, false, true, ms, pm, 1000, 20, 20);
+         exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);
       Binding binding3 =
-         exchange.bindQueue("sub3", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub3", "topic1", null, false, false, ms, pm, 1000, 20, 20);      
       Binding binding4 =
          exchange.bindQueue("sub4", "topic2", null, false, true, ms, pm, 1000, 20, 20);
       Binding binding5 =
@@ -206,17 +240,30 @@
       MessageQueue queue4 = binding4.getQueue();
       MessageQueue queue5 = binding5.getQueue();      
       MessageQueue queue6 = binding6.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
+      SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue2.add(receiver2);
+      SimpleReceiver receiver3 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue3.add(receiver3);
+      SimpleReceiver receiver4 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue4.add(receiver4);
+      SimpleReceiver receiver5 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue5.add(receiver5);
+      SimpleReceiver receiver6 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue6.add(receiver6);
       
       binding1.deactivate();
       binding2.deactivate();
-      binding3.deactivate();
+      binding5.deactivate();
+      binding6.deactivate();
       
       assertFalse(binding1.isActive());      
       assertFalse(binding2.isActive());
-      assertFalse(binding3.isActive());      
-      assertTrue(binding4.isActive());
-      assertTrue(binding5.isActive());      
-      assertTrue(binding6.isActive());
+      assertFalse(binding5.isActive());
+      assertFalse(binding6.isActive()); 
+      assertTrue(binding3.isActive());
+      assertTrue(binding4.isActive());      
       
       Message msg1 = MessageFactory.createCoreMessage(1);      
       MessageReference ref1 = ms.reference(msg1);
@@ -224,76 +271,88 @@
       boolean routed = exchange.route(ref1, "topic1", null);      
       assertTrue(routed);
       
-      List msgs = queue1.browse();
+      List msgs = receiver1.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue2.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
+      msgs = receiver3.getMessages();
+      assertNotNull(msgs);
+      assertEquals(1, msgs.size());
+      Message msgRec = (Message)msgs.get(0);
+      assertTrue(msg1 == msgRec);
+      receiver3.acknowledge(msgRec, null);
       msgs = queue3.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
-
-      msgs = queue4.browse();
+      assertTrue(msgs.isEmpty());  
+      
+      msgs = receiver4.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue5.browse();
+      msgs = receiver5.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue6.browse();
+      msgs = receiver6.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
+      receiver3.clear();
+      
+      
+      
       Message msg2 = MessageFactory.createCoreMessage(2);      
       MessageReference ref2 = ms.reference(msg2);
       
       routed = exchange.route(ref2, "topic2", null);      
       assertTrue(routed);
       
-      msgs = queue4.browse();
+      msgs = receiver1.getMessages();
       assertNotNull(msgs);
-      assertEquals(1, msgs.size());
-      Message msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue5.browse();
+      msgs = receiver2.getMessages();
       assertNotNull(msgs);
-      assertEquals(1, msgs.size());
-      msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue6.browse();
+      msgs = receiver3.getMessages();
       assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());      
+      
+      msgs = receiver4.getMessages();
+      assertNotNull(msgs);
       assertEquals(1, msgs.size());
       msgRec = (Message)msgs.get(0);
-      assertEquals(msgRec.getMessageID(), msg2.getMessageID());
-      
-      msgs = queue1.browse();
+      assertTrue(msg2 == msgRec); //msg2 is the message that was routed to topic2
+      receiver4.acknowledge(msgRec, null);
+      msgs = queue4.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());  
       
-      msgs = queue2.browse();
+      msgs = receiver5.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
       
-      msgs = queue3.browse();
+      msgs = receiver6.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertTrue(msgs.isEmpty());
    
    }
 
-   public void testRouteNoBinding() throws Throwable
+   public final void testRouteNoBinding() throws Throwable
    {
       Exchange exchange = createExchange();
       
       Binding binding1 =
-         exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);      
 
       MessageQueue queue1 = binding1.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
 
       assertTrue(binding1.isActive());
 
@@ -305,21 +364,29 @@
       //A topic exchange always returns true even if there is no binding
       assertTrue(routed);
             
-      List msgs = queue1.browse();
+      List msgs = receiver1.getMessages();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());             
+      assertTrue(msgs.isEmpty());             
    }
    
-   public void testRouteTransactional() throws Throwable
+   public final void testRouteTransactional() throws Throwable
    {
       Exchange exchange = createExchange();
       
       Binding binding1 =
-         exchange.bindQueue("sub1", "topic1", null, false, true, ms, pm, 1000, 20, 20);      
+         exchange.bindQueue("sub1", "topic1", null, false, false, ms, pm, 1000, 20, 20);     
+      Binding binding2 =
+         exchange.bindQueue("sub2", "topic1", null, false, false, ms, pm, 1000, 20, 20);  
 
       MessageQueue queue1 = binding1.getQueue();
+      SimpleReceiver receiver1 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue1.add(receiver1);
+      MessageQueue queue2 = binding2.getQueue();
+      SimpleReceiver receiver2 = new SimpleReceiver(SimpleReceiver.ACCEPTING);
+      queue2.add(receiver2);
 
       assertTrue(binding1.isActive());
+      assertTrue(binding2.isActive());
 
       Message msg1 = MessageFactory.createCoreMessage(1);      
       MessageReference ref1 = ms.reference(msg1);
@@ -336,20 +403,49 @@
             
       List msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size()); 
+      assertTrue(msgs.isEmpty());
       
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
       tx.commit();
       
-      msgs = queue1.browse();
+      msgs = receiver1.getMessages();
       assertNotNull(msgs);
       assertEquals(2, msgs.size());
       Message msgRec1 = (Message)msgs.get(0);
-      Message msgRec2 = (Message)msgs.get(0);
-      assertEquals(msg1.getMessageID(), msgRec1.getMessageID());
-      assertEquals(msg2.getMessageID(), msgRec2.getMessageID());
+      Message msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg1);
+      assertTrue(msgRec2 == msg2);
       
-      queue1.removeAllReferences();
+      msgs = receiver2.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg1);
+      assertTrue(msgRec2 == msg2);
       
+      //Acknowledge non transactionally
+      receiver1.acknowledge(msgRec1, null);
+      receiver1.acknowledge(msgRec2, null);
+      
+      receiver2.acknowledge(msgRec1, null);
+      receiver2.acknowledge(msgRec2, null);
+
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      
+      receiver1.clear();
+      
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      
+      receiver2.clear();
+           
       Message msg3 = MessageFactory.createCoreMessage(3);      
       MessageReference ref3 = ms.reference(msg3);
       
@@ -365,13 +461,164 @@
             
       msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size()); 
+      assertTrue(msgs.isEmpty()); 
       
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      
       tx.rollback();
       
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
+      receiver1.clear();
+      
+      msgs = receiver2.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+      
+      receiver2.clear();
+      
+      
+      Message msg5 = MessageFactory.createCoreMessage(5);      
+      MessageReference ref5 = ms.reference(msg5);
+      
+      Message msg6 = MessageFactory.createCoreMessage(6);      
+      MessageReference ref6 = ms.reference(msg6);
+            
+      routed = exchange.route(ref5, "topic1", null);            
+      assertTrue(routed);
+      routed = exchange.route(ref6, "topic1", null);            
+      assertTrue(routed);
+      
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      msgs = receiver2.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      //Acknowledge transactionally and commit - the messages must only leave the queues on commit
+      
+      tx = tr.createTransaction();
+      
+      receiver1.acknowledge(msgRec1, tx);
+      receiver1.acknowledge(msgRec2, tx);
+      
+      receiver2.acknowledge(msgRec1, tx);
+      receiver2.acknowledge(msgRec2, tx);
+      
       msgs = queue1.browse();
       assertNotNull(msgs);
-      assertEquals(0, msgs.size());
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg5);
+      assertTrue(msgRec2 == msg6);
+      
+      tx.commit();
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+            
+      receiver1.clear();
+      
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+            
+      receiver2.clear();
+      
+      Message msg7 = MessageFactory.createCoreMessage(7);      
+      MessageReference ref7 = ms.reference(msg7);
+      
+      Message msg8 = MessageFactory.createCoreMessage(8);      
+      MessageReference ref8 = ms.reference(msg8);
+            
+      routed = exchange.route(ref7, "topic1", null);            
+      assertTrue(routed);
+      routed = exchange.route(ref8, "topic1", null);            
+      assertTrue(routed);
+      
+      msgs = receiver1.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      msgs = receiver2.getMessages();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      //Acknowledge transactionally and roll back - the messages must remain in the queues
+      
+      tx = tr.createTransaction();
+      
+      receiver1.acknowledge(msgRec1, tx);
+      receiver1.acknowledge(msgRec2, tx);
+      
+      receiver2.acknowledge(msgRec1, tx);
+      receiver2.acknowledge(msgRec2, tx);
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      tx.rollback();
+      
+      msgs = queue1.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
+      
+      msgs = queue2.browse();
+      assertNotNull(msgs);
+      assertEquals(2, msgs.size());
+      msgRec1 = (Message)msgs.get(0);
+      msgRec2 = (Message)msgs.get(1);
+      assertTrue(msgRec1 == msg7);
+      assertTrue(msgRec2 == msg8);
    }
    
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java	2006-08-31 00:21:12 UTC (rev 1243)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ExchangeTestBase.java	2006-08-31 18:11:07 UTC (rev 1244)
@@ -27,11 +27,7 @@
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.local.MessageQueue;
-import org.jboss.messaging.core.memory.MemoryManager;
-import org.jboss.messaging.core.memory.SimpleMemoryManager;
-import org.jboss.messaging.core.message.MessageFactory;
 import org.jboss.messaging.core.plugin.IdManager;
 import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
 import org.jboss.messaging.core.plugin.SimpleMessageStore;
@@ -39,8 +35,10 @@
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.plugin.contract.PersistenceManager;
 import org.jboss.messaging.core.plugin.exchange.Binding;
+import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.jmx.ServiceContainer;
 
@@ -122,7 +120,7 @@
    
    // Public --------------------------------------------------------
    
-   public void testBind() throws Throwable
+   public final void testBind() throws Throwable
    {
       Exchange exchange1 = createExchange();
       
@@ -154,11 +152,7 @@
       Binding binding3 = exchange1.getBindingForName("durableQueue");
       assertNotNull(binding3);
       assertTrue(binding1 == binding3);
-      assertEquals("durableQueue", binding3.getQueueName());
-      assertEquals("condition1", binding3.getCondition());
-      assertEquals(filter1.getFilterString(), binding3.getSelector());
-      assertEquals(true, binding3.isNoLocal());
-      assertEquals(true, binding3.isDurable());
+      assertEquivalent(binding1, binding3);
       assertNotNull(binding3.getQueue());
       assertEquals(true, binding3.getQueue().isRecoverable());
       
@@ -166,11 +160,7 @@
       Binding binding4 = exchange1.getBindingForName("nonDurableQueue");
       assertNotNull(binding4);
       assertTrue(binding2 == binding4);
-      assertEquals("nonDurableQueue", binding4.getQueueName());
-      assertEquals("condition2", binding4.getCondition());
-      assertEquals(filter2.getFilterString(), binding4.getSelector());
-      assertEquals(true, binding4.isNoLocal());
-      assertEquals(false, binding4.isDurable());
+      assertEquivalent(binding2, binding4);
       assertNotNull(binding4.getQueue());
       assertEquals(false, binding4.getQueue().isRecoverable());
       
@@ -180,11 +170,7 @@
       //Only one binding should be there
       Binding binding5 = exchange2.getBindingForName("durableQueue");
       assertNotNull(binding5);
-      assertEquals("durableQueue", binding5.getQueueName());
-      assertEquals("condition1", binding5.getCondition());
-      assertEquals(filter1.getFilterString(), binding5.getSelector());
-      assertEquals(true, binding5.isNoLocal());
-      assertEquals(true, binding5.isDurable());
+      assertEquivalent(binding1, binding5);
       //Should be unloaded
       assertNull(binding5.getQueue());
       
@@ -197,11 +183,7 @@
       //Unbind the binding
       Binding binding7 = exchange2.unbindQueue("durableQueue");
       assertNotNull(binding7);
-      assertEquals("durableQueue", binding7.getQueueName());
-      assertEquals("condition1", binding7.getCondition());
-      assertEquals(filter1.getFilterString(), binding7.getSelector());
-      assertEquals(true, binding7.isNoLocal());
-      assertEquals(true, binding7.isDurable());
+      assertEquivalent(binding1, binding7);
       
       //Make sure no longer there
       Binding binding8 = exchange2.getBindingForName("durableQueue");
@@ -216,7 +198,7 @@
             
    }
    
-   public void testListBindings() throws Throwable
+   public final void testListBindings() throws Throwable
    {
       Exchange exchange = createExchange();
       
@@ -272,7 +254,7 @@
          
    }
    
-   public void testUnloadReload() throws Exception
+   public final void testUnloadReload() throws Exception
    {
       Exchange exchange = createExchange();
       
@@ -402,12 +384,52 @@
    
    protected void assertEquivalent(Binding binding1, Binding binding2)
    {
+      assertEquals(binding1.getNodeId(), binding2.getNodeId()); //Equivalence covers node id, queue name, selector, channel id, durable and noLocal
       assertEquals(binding1.getQueueName(), binding2.getQueueName());
-      assertEquals(binding1.getCondition(), binding2.getCondition());
       assertEquals(binding1.getSelector(), binding2.getSelector());
+      assertEquals(binding1.getChannelId(), binding2.getChannelId()); //NOTE(review): the condition is not compared by this method - confirm that is intended
+      assertEquals(binding1.isDurable(), binding2.isDurable());
       assertEquals(binding1.isNoLocal(), binding2.isNoLocal());
-      assertEquals(binding1.isDurable(), binding2.isDurable());
    }
+   
+//   protected void checkHasMessages(Message[] msgArray, MessageQueue queue, SimpleReceiver receiver)
+//   {
+//      List msgs = receiver.getMessages();
+//      
+//      assertNotNull(msgs);
+//      assertEquals(msgArray.length, msgs.size());
+//      for (int i = 0; i < msgArray.length; i++)
+//      {
+//         assertEquals(msgArray[i].getMessageID(), ((Message)msgs.get(i)).getMessageID());
+//      }
+//      
+//      msgs = queue.browse();
+//      assertNotNull(msgs);
+//      assertEquals(msgArray.length, msgs.size());
+//      for (int i = 0; i < msgArray.length; i++)
+//      {
+//         assertEquals(msgArray[i].getMessageID(), ((Message)msgs.get(i)).getMessageID());
+//      }
+//   }
+//   
+//   protected void ackMessages(Message[] msgArray, SimpleReceiver receiver, Transaction tx) throws Throwable
+//   {
+//      for (int i = 0; i < msgArray.length; i++)
+//      {
+//         receiver.acknowledge(msgArray[i], tx);
+//      }
+//   }
+//   
+//   protected void checkHasntMessages(MessageQueue queue, SimpleReceiver receiver)
+//   {
+//      List msgs = receiver.getMessages();      
+//      assertNotNull(msgs);
+//      assertTrue(msgs.isEmpty());
+//      
+//      msgs = queue.browse();
+//      assertNotNull(msgs);
+//      assertTrue(msgs.isEmpty());
+//   }
 
    // Private -------------------------------------------------------
 




More information about the jboss-cvs-commits mailing list