[jboss-cvs] JBoss Messaging SVN: r1457 - in trunk: . src/main/org/jboss/jms/server/destination src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Oct 9 16:39:34 EDT 2006


Author: timfox
Date: 2006-10-09 16:39:19 -0400 (Mon, 09 Oct 2006)
New Revision: 1457

Added:
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
Modified:
   trunk/.classpath
   trunk/src/main/org/jboss/jms/server/destination/TopicService.java
   trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-573 and http://jira.jboss.com/jira/browse/JBMESSAGING-579 and a few other things


Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/.classpath	2006-10-09 20:39:19 UTC (rev 1457)
@@ -1,6 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <classpath>
-	<classpathentry excluding="**/.svn/**/*" kind="src" path="perf/src"/>
 	<classpathentry kind="src" path="output/gen-parsers"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="src/main"/>
 	<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src"/>
@@ -21,8 +20,6 @@
 	<classpathentry kind="lib" path="tests/lib/jms-ra.jar"/>
 	<classpathentry kind="lib" path="tests/lib/mysql-connector-java-3.1.13-bin.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jgroups/lib/jgroups.jar"/>
-	<classpathentry kind="lib" path="perf/resources/jcommon-1.0.0-rc1.jar"/>
-	<classpathentry kind="lib" path="perf/resources/jfreechart-1.0.0-rc1.jar"/>
 	<classpathentry kind="lib" path="thirdparty/apache-log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/junit/lib/junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/profiler/jvmti/lib/jboss-profiler-jvmti.jar"/>

Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -15,8 +15,10 @@
 
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.jms.util.XMLUtil;
+import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
 
 /**
  * A deployable JBoss Messaging topic.
@@ -72,10 +74,10 @@
             Binding binding = (Binding)iter.next();
             
             PagingFilteredQueue queue = (PagingFilteredQueue)binding.getQueue();
-            
+                        
             queue.setPagingParams(destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
             queue.load();
-            queue.activate();
+            queue.activate();            
          }
 
          dm.registerDestination(destination);

Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -52,5 +52,5 @@
     * @return
     * @throws Throwable
     */
-   Binding unbindClusteredQueue(String queueName) throws Throwable;   
+   Binding unbindClusteredQueue(String queueName) throws Throwable;         
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -25,10 +25,12 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.Types;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -244,7 +246,9 @@
       
       try
       {
-         Bindings cb = (Bindings)conditionMap.get(condition);
+         //We should only list the bindings for the local node
+         
+         Bindings cb = (Bindings)conditionMap.get(condition);                  
                   
          if (cb == null)
          {
@@ -252,7 +256,23 @@
          }
          else
          {
-            return cb.getAllBindings();
+            List list = new ArrayList();
+            
+            Collection bindings = cb.getAllBindings();
+            
+            Iterator iter = bindings.iterator();
+            
+            while (iter.hasNext())
+            {
+               Binding binding = (Binding)iter.next();
+               
+               if (binding.getNodeId() == this.nodeId)
+               {
+                  list.add(binding);
+               }
+            }
+            
+            return list;
          }
       }
       finally

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/CastMessagesCallback.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -27,6 +27,7 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.tx.TransactionException;
 import org.jboss.messaging.core.tx.TxCallback;
 
 /**
@@ -52,6 +53,35 @@
  * CastMessagesCallback.beforeCommit() - cast message(s) to holding areas
  * JDBCPersistenceManager.TransactionCallback.beforeCommit() - persist message(s) in database
  * CastMessagesCallback.afterCommit() - send "commit" message to holding areas
+ * 
+ * Failure handling:
+ * 
+ * If failure of the remote node occurs after casting the message to the holding area
+ * but before the message is persisted locally, then no recovery is necessary since the message wasn't persisted.
+ * In this case an exception should be propagated to the client.
+ * 
+ * If failure of the remote node occurs after casting the message to the holding area
+ * and after the message has been persisted locally then, when the failover node takes
+ * over it will load the message from the db.
+ * 
+ * If failure of the local node occurs before casting the message, no recovery is necessary and an exception
+ * should be thrown to the client.
+ * 
+ * If failure of the local node occurs after casting the message, but before persisting the message,
+ * then failure of the local node causes the remote node to check its holding area - it will then check
+ * if the message has been persisted - in this case it has not so it will discard the tx from the holding area.
+ * (TODO is it possible that the cast of the original hold arrives *after* the receiving node knows the sending
+ * node has failed - we must be able to guarantee this never happens)
+ * 
+ * If failure of the local node occurs after casting the message and after persisting the message in the database,
+ * then failure of the local node causes the remote node to check its holding area - it will then check
+ * if the message has been persisted - in this case it has so it will execute the transaction in memory which will add the
+ * message to the in memory queue.
+ * 
+ * 
+ * 
+ * 
+ * 
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -78,6 +108,10 @@
    private boolean multicast;
    
    private int toNodeId;
+   
+   //Used in failure testing
+   private boolean failBeforeCommit;
+   private boolean failAfterCommit;
       
    /*
     * We store the id of one of the channels that the ref was inserted into
@@ -129,13 +163,17 @@
       }
    }
    
-   CastMessagesCallback(int nodeId, long txId, PostOfficeInternal office)
+   CastMessagesCallback(int nodeId, long txId, PostOfficeInternal office, boolean failBeforeCommit, boolean failAfterCommit)
    {
       this.nodeId = nodeId;
       
       this.txId = txId;
       
       this.office = office;
+      
+      this.failBeforeCommit = failBeforeCommit;
+      
+      this.failAfterCommit = failAfterCommit;
    }
 
    public void afterCommit(boolean onePhase) throws Exception
@@ -150,6 +188,12 @@
       
       if (persistent != null)
       {
+         //Only used in testing
+         if (failAfterCommit)
+         {
+            throw new TransactionException("Forced failure for testing");
+         }
+         
          // Cast a commit message
          ClusterRequest req = new SendTransactionRequest(nodeId, txId);
          
@@ -177,6 +221,12 @@
          
          sendRequest(req);
       }
+      
+      //Only used in testing
+      if (failBeforeCommit)
+      {
+         throw new TransactionException("Forced failure for testing");
+      }
    }
 
    public void beforePrepare() throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/ClusterRouter.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -38,5 +38,5 @@
 {
    List getQueues();
    
-   LocalClusteredQueue getLocalQueue();
+   ClusteredQueue getLocalQueue();
 }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -81,6 +81,10 @@
 {
    private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
    
+   //Used for failure testing
+   private boolean failBeforeCommit;
+   private boolean failAfterCommit;
+     
    private boolean trace = log.isTraceEnabled();
                         
    private Channel syncChannel;
@@ -471,15 +475,14 @@
                   if (numberRemote == 1)
                   {
                      if (trace) { log.trace(this.nodeId + " unicasting message to " + lastNodeId); }
-                     //Unicast - only one node is interested in the message
-                                        
+                     
+                     //Unicast - only one node is interested in the message                                        
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
-                     
-                     //syncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId, false);
                   }
                   else
                   {
                      if (trace) { log.trace(this.nodeId + " multicasting message to group"); }
+                     
                      //Multicast - more than one node is interested
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
                   }                                 
@@ -490,7 +493,7 @@
                   
                   if (callback == null)
                   {
-                     callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this);
+                     callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
                      
                      //This callback MUST be executed first
                      
@@ -978,7 +981,7 @@
                   
                   //Maybe the local queue now wants to pull message(s) from the remote queue given that the 
                   //stats for the remote queue have changed
-                  LocalClusteredQueue localQueue = router.getLocalQueue();
+                  LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
                   
                   if (localQueue != null)
                   {               
@@ -1031,10 +1034,23 @@
       return dels;
    }
    
+                     
+   // Public ------------------------------------------------------------------------------------------
    
-                   
-   // Public ------------------------------------------------------------------------------------------
-      
+   
+   //Used for testing only
+   public void setFail(boolean beforeCommit, boolean afterCommit)
+   {
+      this.failBeforeCommit = beforeCommit;
+      this.failAfterCommit = afterCommit;
+   }
+   
+   //Used for testing only
+   public Collection getHoldingTransactions()
+   {
+      return holdingArea.values();
+   }
+     
    // Protected ---------------------------------------------------------------------------------------
         
    protected void addToConditionMap(Binding binding)

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouter.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -46,6 +46,8 @@
  * In this case, with the assumption that producers and consumers are distributed evenly across the cluster
  * then sending the message to the local queue is the most efficient policy.
  * 
+ * The exception to this is if there are no consumers on the local queue.
+ * 
  * In the case of a durable subscription, there may well be no local queue since the durable subscription lives
  * only on the number of nodes that it is looked up at.
  * 
@@ -66,7 +68,7 @@
    //MUST be an arraylist for fast index access
    private ArrayList queues;
    
-   private LocalClusteredQueue localQueue;
+   private ClusteredQueue localQueue;
    
    private int target;
    
@@ -80,7 +82,7 @@
       return queues.size();
    }
    
-   public LocalClusteredQueue getLocalQueue()
+   public ClusteredQueue getLocalQueue()
    {
       return localQueue;
    }
@@ -95,7 +97,7 @@
          {
             throw new IllegalStateException("Already has local queue");
          }
-         localQueue = (LocalClusteredQueue)queue;
+         localQueue = queue;
       }
       
       queues.add(queue); 
@@ -148,8 +150,8 @@
       if (trace) { log.trace(this + " routing ref " + reference); }
       
       //Favour the local queue
-         
-      if (localQueue != null)
+           
+      if (localQueue != null && localQueue.numberOfReceivers() > 0)
       {
          //The only time the local queue won't accept is if the selector doesn't
          //match - in which case it won't match at any other nodes too so no point
@@ -163,24 +165,27 @@
       }
       else
       {
-         //There is no local shared queue
-         
+         //There is no local shared queue or the local queue has no consumers
+          
          //We round robin among the rest
-         if (!queues.isEmpty())
+         if ((localQueue == null && !queues.isEmpty()) || (localQueue != null && queues.size() > 1))
          {
             ClusteredQueue queue = (ClusteredQueue)queues.get(target);
             
+            if (queue == localQueue)
+            {
+               //We don't want to choose the local queue
+               incTarget();
+            }
+            
+            queue = (ClusteredQueue)queues.get(target);
+            
             Delivery del = queue.handle(observer, reference, tx);
-            
+             
             if (trace) { log.trace(this + " routed to remote queue, it returned " + del); }
                         
-            target++;
-            
-            if (target == queues.size())
-            {
-               target = 0;
-            }
-            
+            incTarget();
+
             //Again, if the selector doesn't match then it won't on any others so no point trying them
             return del;
          }                  
@@ -191,6 +196,16 @@
       return null;
    }
    
+   private void incTarget()
+   {
+      target++;
+      
+      if (target == queues.size())
+      {
+         target = 0;
+      }
+   }
+   
    public List getQueues()
    {
       return queues;

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -47,7 +47,7 @@
  * $Id$
  *
  */
-class RemoteQueueStub implements ClusteredQueue
+public class RemoteQueueStub implements ClusteredQueue
 {
    private static final Logger log = Logger.getLogger(RemoteQueueStub.class);
       
@@ -65,7 +65,8 @@
    
    private QueueStats stats;
    
-   RemoteQueueStub(int nodeId, String name, long id, boolean recoverable, PersistenceManager pm, Filter filter)
+   public RemoteQueueStub(int nodeId, String name, long id, boolean recoverable,
+                          PersistenceManager pm, Filter filter)
    {
       this.nodeId = nodeId;
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/ChannelShare_NP_NTTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -291,10 +291,8 @@
       
       //Now consume them all
       
-      log.info("Consuming them all from 1");
       this.consume(queue1, 0, refs1, 150);
        
-      log.info("Consuming them all from 2");
       this.consume(queue2, 0, refs2, 150);
       
       //    Queue1

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -465,9 +465,7 @@
       Connection conn = ds.getConnection();
       
       List msgIds = new ArrayList();
-
-      //log.info("channel id:" + channelId);
-      
+ 
       String sql = "SELECT MESSAGEID, ORD, PAGE_ORD FROM JMS_MESSAGE_REFERENCE WHERE CHANNELID=? ORDER BY PAGE_ORD";
       PreparedStatement ps = conn.prepareStatement(sql);
       ps.setLong(1, channelId);

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -685,9 +685,6 @@
       //First load exactly 10
       PersistenceManager.InitialLoadInfo info = pm.getInitialReferenceInfos(channel.getChannelID(), 10);
       
-      log.info("min:" + info.getMinPageOrdering());
-      log.info("max:" + info.getMaxPageOrdering());
-      
       assertNull(info.getMinPageOrdering());
       assertNull(info.getMaxPageOrdering());
       

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -621,9 +621,7 @@
          }
       }
    }
-   
-   
-   
+ 
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -663,9 +661,7 @@
          
          Message msg1 = CoreMessageFactory.createCoreMessage(1);      
          MessageReference ref1 = ms.reference(msg1);  
-         log.info("Sending message 1");
          boolean routed = office1.route(ref1, "topic1", null);   
-         log.info("Sent message 1");
          assertTrue(routed);
          
          

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeWithDefaultRouterTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -0,0 +1,556 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * 
+ * A DefaultClusteredPostOfficeWithDefaultRouterTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultClusteredPostOfficeWithDefaultRouterTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+
+   public DefaultClusteredPostOfficeWithDefaultRouterTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {      
+      super.tearDown();
+   }
+   
+   public void testNotLocalPersistent() throws Throwable
+   {
+      notLocal(true);
+   }
+   
+   public void testNotLocalNonPersistent() throws Throwable
+   {
+      notLocal(false);
+   }
+   
+   public void testLocalPersistent() throws Throwable
+   {
+      local(true);
+   }
+   
+   public void testLocalNonPersistent() throws Throwable
+   {
+      local(false);
+   }
+   
+   
+   public void testLocalNonConsumersPersistent() throws Throwable
+   {
+      localNoConsumers(true);
+   }
+   
+   public void testLocalNoConsumersNonPersistent() throws Throwable
+   {
+      localNoConsumers(false);
+   }
+   
+   
+   protected void notLocal(boolean persistent) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+      
+      ClusteredPostOffice office6 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice(1, "testgroup");
+         
+         office2 = createClusteredPostOffice(2, "testgroup");
+         
+         office3 = createClusteredPostOffice(3, "testgroup");
+         
+         office4 = createClusteredPostOffice(4, "testgroup");
+         
+         office5 = createClusteredPostOffice(5, "testgroup");
+         
+         office6 = createClusteredPostOffice(6, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue4.add(receiver4);
+         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue5.add(receiver5);
+               
+         List msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkContainsAndAcknowledge(msgs, receiver3, queue1);                           
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkContainsAndAcknowledge(msgs, receiver4, queue1);                                    
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkContainsAndAcknowledge(msgs, receiver5, queue1); 
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+                     
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {            
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {            
+            office5.stop();
+         }
+         
+         if (office6 != null)
+         {
+            office6.stop();
+         }
+      }
+   }
+   
+   // If the local queue has no consumers, it is treated as if there were no local queue
+   protected void localNoConsumers(boolean persistent) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+      
+      ClusteredPostOffice office6 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice(1, "testgroup");
+         
+         office2 = createClusteredPostOffice(2, "testgroup");
+         
+         office3 = createClusteredPostOffice(3, "testgroup");
+         
+         office4 = createClusteredPostOffice(4, "testgroup");
+         
+         office5 = createClusteredPostOffice(5, "testgroup");
+         
+         office6 = createClusteredPostOffice(6, "testgroup");
+         
+         LocalClusteredQueue queueLocal = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding bindingLocal = office1.bindClusteredQueue("topic", queueLocal);
+         //No consumer on the local one
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue4.add(receiver4);
+         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue5.add(receiver5);
+               
+         List msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkContainsAndAcknowledge(msgs, receiver3, queue1);                           
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkContainsAndAcknowledge(msgs, receiver4, queue1);                                    
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkContainsAndAcknowledge(msgs, receiver5, queue1); 
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office1, 1, null);         
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+                     
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {            
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {            
+            office5.stop();
+         }
+         
+         if (office6 != null)
+         {
+            office6.stop();
+         }
+      }
+   }
+   
+   
+   protected void local(boolean persistent) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+      
+      ClusteredPostOffice office6 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice(1, "testgroup");
+         
+         office2 = createClusteredPostOffice(2, "testgroup");
+         
+         office3 = createClusteredPostOffice(3, "testgroup");
+         
+         office4 = createClusteredPostOffice(4, "testgroup");
+         
+         office5 = createClusteredPostOffice(5, "testgroup");
+         
+         office6 = createClusteredPostOffice(6, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
+         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue3.add(receiver3);
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
+         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue4.add(receiver4);
+         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
+         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue5.add(receiver5);
+               
+         List msgs = sendMessages("topic", persistent, office2, 3, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office2, 3, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office2, 3, null);         
+         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
+         checkEmpty(receiver2);
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
+         checkEmpty(receiver1);
+         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
+         checkEmpty(receiver3);
+         checkEmpty(receiver4);
+         checkEmpty(receiver5);
+         
+                     
+      }
+      finally
+      {
+         if (office1 != null)
+         {            
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {            
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {            
+            office5.stop();
+         }
+         
+         if (office6 != null)
+         {
+            office6.stop();
+         }
+      }
+   }
+   
+   
+   
+   protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+   {
+      MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+      
+      FilterFactory ff = new SimpleFilterFactory();
+      
+      ClusterRouterFactory rf = new DefaultRouterFactory();
+      
+      DefaultClusteredPostOffice postOffice = 
+         new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+                                 null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+                                 groupName,
+                                 JGroupsUtil.getControlStackProperties(),
+                                 JGroupsUtil.getDataStackProperties(),
+                                 5000, 5000, redistPolicy, rf, 1, 1000);
+      
+      postOffice.start();      
+      
+      return postOffice;
+   }
+
+   // Private -------------------------------------------------------
+   
+   
+   // Inner classes -------------------------------------------------
+   
+
+}
+
+
+

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -21,23 +21,33 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
+import java.util.Iterator;
 import java.util.List;
 
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouter;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouter;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.QueueStats;
+import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.core.SimpleFilterFactory;
 import org.jboss.test.messaging.core.SimpleReceiver;
 import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+import org.jboss.test.messaging.util.CoreMessageFactory;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * A DefaultRouterTest
@@ -75,299 +85,292 @@
       super.tearDown();
    }
    
-   public void testNotLocalPersistent() throws Throwable
+   // The router only has a local queue with a consumer
+   public void testRouterOnlyLocalWithConsumer() throws Exception
    {
-      notLocal(true);
+      DefaultRouter dr = new DefaultRouter();
+                    
+      ClusteredQueue queue = new SimpleQueue(true);
+      
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      queue.add(receiver1);
+               
+      dr.add(queue);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);
    }
    
-   public void testNotLocalNonPersistent() throws Throwable
+   //The router only has a local queue with no consumer
+   public void testRouterOnlyLocalNoConsumer() throws Exception
    {
-      notLocal(false);
+      DefaultRouter dr = new DefaultRouter();
+        
+      ClusteredQueue queue = new SimpleQueue(true);
+               
+      dr.add(queue);
+      
+      Message msg = CoreMessageFactory.createCoreMessage(0, false, null);      
+      
+      MessageReference ref = ms.reference(msg);         
+      
+      Delivery del = dr.handle(null, ref, null);
+      
+      assertNull(del);             
+
    }
    
-   public void testLocalPersistent() throws Throwable
+   //The router has only one non local queue
+   public void testRouterOnlyOneNonLocal() throws Exception
    {
-      local(true);
+      DefaultRouter dr = new DefaultRouter();
+                    
+      ClusteredQueue queue = new SimpleQueue(false);
+      
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      queue.add(receiver1);
+      
+      dr.add(queue);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);              
    }
    
-   public void testLocalNonPersistent() throws Throwable
+   //The router has multiple non local queues and no local queue
+   public void testRouterMultipleNonLocal() throws Exception
    {
-      local(false);
+      DefaultRouter dr = new DefaultRouter();
+                   
+      ClusteredQueue remote1 = new SimpleQueue(false);
+     
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote1.add(receiver1);
+      
+      dr.add(remote1);
+      
+      
+      ClusteredQueue remote2 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote2.add(receiver2);
+      
+      dr.add(remote2);
+      
+      
+      ClusteredQueue remote3 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote3.add(receiver3);
+      
+      dr.add(remote3);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver3);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver3);
    }
    
-   protected void notLocal(boolean persistent) throws Throwable
+   
+   // The router has one local with consumer and one non local queue
+   public void testRouterOneLocalWithConsumerOneNonLocal() throws Exception
    {
-      ClusteredPostOffice office1 = null;
+      DefaultRouter dr = new DefaultRouter();
+                             
+      ClusteredQueue remote1 = new SimpleQueue(false);
+     
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
       
-      ClusteredPostOffice office2 = null;
+      remote1.add(receiver1);
       
-      ClusteredPostOffice office3 = null;
+      dr.add(remote1);
       
-      ClusteredPostOffice office4 = null;
+      ClusteredQueue queue = new SimpleQueue(true);
       
-      ClusteredPostOffice office5 = null;
+      SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
       
-      ClusteredPostOffice office6 = null;
-          
-      try
-      {   
-         office1 = createClusteredPostOffice(1, "testgroup");
-         
-         office2 = createClusteredPostOffice(2, "testgroup");
-         
-         office3 = createClusteredPostOffice(3, "testgroup");
-         
-         office4 = createClusteredPostOffice(4, "testgroup");
-         
-         office5 = createClusteredPostOffice(5, "testgroup");
-         
-         office6 = createClusteredPostOffice(6, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
-         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue1.add(receiver1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
-         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue2.add(receiver2);
-         
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
-         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue3.add(receiver3);
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
-         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue4.add(receiver4);
-         
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
-         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue5.add(receiver5);
-               
-         List msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkContainsAndAcknowledge(msgs, receiver3, queue1);                           
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkContainsAndAcknowledge(msgs, receiver4, queue1);                                    
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkContainsAndAcknowledge(msgs, receiver5, queue1); 
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office1, 1, null);         
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-                     
-      }
-      finally
-      {
-         if (office1 != null)
-         {            
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {
-            office2.stop();
-         }
-         
-         if (office3 != null)
-         {            
-            office3.stop();
-         }
-         
-         if (office4 != null)
-         {
-            office4.stop();
-         }
-         
-         if (office5 != null)
-         {            
-            office5.stop();
-         }
-         
-         if (office6 != null)
-         {
-            office6.stop();
-         }
-      }
+      queue.add(receiver2);
+      
+      dr.add(queue);
+
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver2);                  
    }
    
+   // The router has multiple non local queues and one local queue with a consumer (note: despite "NoConsumer" in the test name, a receiver is added to the local queue)
+   public void testRouterMultipleNonLocalOneLocalNoConsumer() throws Exception
+   {
+      DefaultRouter dr = new DefaultRouter();            
+                  
+      ClusteredQueue remote1 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote1.add(receiver1);
+      
+      dr.add(remote1);
+      
+      
+      ClusteredQueue remote2 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote2.add(receiver2);
+      
+      dr.add(remote2);
+      
+      
+      ClusteredQueue remote3 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote3.add(receiver3);
+      
+      dr.add(remote3);
+      
+      
+      ClusteredQueue queue = new SimpleQueue(true);
+            
+      SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      queue.add(receiver4);
+      
+      dr.add(queue);
+      
+      
+      sendAndCheck(dr, receiver4);
+      
+      sendAndCheck(dr, receiver4);
+      
+      sendAndCheck(dr, receiver4);
+   }
    
-   protected void local(boolean persistent) throws Throwable
+   // The router has multiple non local queues and one local queue without a consumer (note: despite "WithConsumer" in the test name, no receiver is added to the local queue)
+   public void testRouterMultipleNonLocalOneLocalWithConsumer() throws Exception
    {
-      ClusteredPostOffice office1 = null;
+      DefaultRouter dr = new DefaultRouter();
+                  
+      ClusteredQueue remote1 = new SimpleQueue(false);
       
-      ClusteredPostOffice office2 = null;
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
       
-      ClusteredPostOffice office3 = null;
+      remote1.add(receiver1);
       
-      ClusteredPostOffice office4 = null;
+      dr.add(remote1);
       
-      ClusteredPostOffice office5 = null;
       
-      ClusteredPostOffice office6 = null;
-          
-      try
-      {   
-         office1 = createClusteredPostOffice(1, "testgroup");
-         
-         office2 = createClusteredPostOffice(2, "testgroup");
-         
-         office3 = createClusteredPostOffice(3, "testgroup");
-         
-         office4 = createClusteredPostOffice(4, "testgroup");
-         
-         office5 = createClusteredPostOffice(5, "testgroup");
-         
-         office6 = createClusteredPostOffice(6, "testgroup");
-         
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding1 = office2.bindClusteredQueue("topic", queue1);
-         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue1.add(receiver1);
-         
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding2 = office3.bindClusteredQueue("topic", queue2);
-         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue2.add(receiver2);
-         
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding3 = office4.bindClusteredQueue("topic", queue3);
-         SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue3.add(receiver3);
-         
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding4 = office5.bindClusteredQueue("topic", queue4);
-         SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue4.add(receiver4);
-         
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
-         Binding binding5 = office6.bindClusteredQueue("topic", queue5);
-         SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
-         queue5.add(receiver5);
-               
-         List msgs = sendMessages("topic", persistent, office2, 3, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office2, 3, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office2, 3, null);         
-         checkContainsAndAcknowledge(msgs, receiver1, queue1);         
-         checkEmpty(receiver2);
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         
-         msgs = sendMessages("topic", persistent, office3, 3, null); 
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office3, 3, null); 
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-         msgs = sendMessages("topic", persistent, office3, 3, null); 
-         checkEmpty(receiver1);
-         checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
-         checkEmpty(receiver3);
-         checkEmpty(receiver4);
-         checkEmpty(receiver5);
-         
-                     
-      }
-      finally
-      {
-         if (office1 != null)
-         {            
-            office1.stop();
-         }
-         
-         if (office2 != null)
-         {
-            office2.stop();
-         }
-         
-         if (office3 != null)
-         {            
-            office3.stop();
-         }
-         
-         if (office4 != null)
-         {
-            office4.stop();
-         }
-         
-         if (office5 != null)
-         {            
-            office5.stop();
-         }
-         
-         if (office6 != null)
-         {
-            office6.stop();
-         }
-      }
+      ClusteredQueue remote2 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote2.add(receiver2);
+      
+      dr.add(remote2);
+      
+      
+      ClusteredQueue remote3 = new SimpleQueue(false);
+      
+      SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote3.add(receiver3);
+      
+      dr.add(remote3);
+      
+      
+      ClusteredQueue queue = new SimpleQueue(true);
+      
+      
+      dr.add(queue);
+      
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver3);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver2);
+      
+      sendAndCheck(dr, receiver3);
    }
    
+   // The router has one local without consumer and one non local queue
+   public void testRouterMultipleOneLocalWithoutConsumerOneNonLocal() throws Exception
+   {
+      DefaultRouter dr = new DefaultRouter();
+                             
+      ClusteredQueue remote1 = new SimpleQueue(false);
+     
+      SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+      
+      remote1.add(receiver1);
+      
+      dr.add(remote1);
+      
+      ClusteredQueue queue = new SimpleQueue(true);
+             
+      dr.add(queue);
+
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);
+      
+      sendAndCheck(dr, receiver1);                       
+   }
    
+   private long nextId;
    
+   private void sendAndCheck(ClusterRouter router, SimpleReceiver receiver) throws Exception
+   {
+      Message msg = CoreMessageFactory.createCoreMessage(nextId++, false, null);      
+      
+      MessageReference ref = ms.reference(msg);         
+      
+      Delivery del = router.handle(null, ref, null);
+      
+      assertNotNull(del);
+      
+      assertTrue(del.isSelectorAccepted());
+            
+      Thread.sleep(250);
+      
+      List msgs = receiver.getMessages();
+      
+      assertNotNull(msgs);
+      
+      assertEquals(1, msgs.size());
+      
+      Message msgRec = (Message)msgs.get(0);
+      
+      assertTrue(msg == msgRec);  
+      
+      receiver.clear();
+   }
+   
+   
+   
    protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
    {
       MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
@@ -394,7 +397,218 @@
    
    // Inner classes -------------------------------------------------
    
+   class SimpleQueue implements ClusteredQueue
+   {
+      // Test double: keeps only a local flag and at most one receiver; the TODO-marked methods return defaults
+      private boolean local;
+      private Receiver receiver;
+        
+      SimpleQueue(boolean local)
+      {
+         this.local = local;
+      }
 
+      public int getNodeId()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      public QueueStats getStats()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public boolean isLocal()
+      {
+         return local;
+      }
+
+      public Filter getFilter()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public String getName()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public boolean isClustered()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public boolean acceptReliableMessages()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public void activate()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public List browse()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public List browse(Filter filter)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public void clear()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public void close()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public void deactivate()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public void deliver(boolean synchronous)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public List delivering(Filter filter)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public long getChannelID()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      public boolean isActive()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public boolean isRecoverable()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public void load() throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public int messageCount()
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
+      public void removeAllReferences() throws Throwable
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public List undelivered(Filter filter)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public void unload() throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+      {
+         if (receiver != null)
+         {
+            // Delegate to the attached receiver and hand its Delivery straight back
+            Delivery del = receiver.handle(observer, reference, tx);
+            return del;
+         }
+         // No receiver attached: answer with a new SimpleDelivery for this reference
+         return new SimpleDelivery(observer, reference);
+      }
+
+      public void acknowledge(Delivery d, Transaction tx) throws Throwable
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public void cancel(Delivery d) throws Throwable
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public boolean add(Receiver receiver)
+      {
+         // Only one receiver is kept; a later add() overwrites the earlier one
+         this.receiver = receiver;
+         return true;
+      }
+
+      public boolean contains(Receiver receiver)
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public Iterator iterator()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public int numberOfReceivers()
+      {
+         if (receiver != null)
+         {
+            return 1;
+         }
+         else
+         {
+            return 0;
+         }
+      }
+
+      public boolean remove(Receiver receiver)
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+      
+   }
+   
+
 }
 
 

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RecoveryTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -0,0 +1,313 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionException;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * Tests two clustered post offices in one group when the sending office is made to fail
+ * during a transactional commit, either before or after the messages are persisted.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class RecoveryTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   // Constructors --------------------------------------------------
+
+   public RecoveryTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {      
+      super.tearDown();
+   }
+   // Office 1 fails after casting but before persisting: once it is stopped nothing may be delivered
+   public void testCrashBeforePersist() throws Exception
+   {
+      DefaultClusteredPostOffice office1 = null;
+      
+      DefaultClusteredPostOffice office2 = null;
+      
+      try
+      {      
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         Binding binding1 =
+            office1.bindClusteredQueue("topic1", queue1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         Binding binding2 =
+            office2.bindClusteredQueue("topic1", queue2);
+         
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         //This will make it fail after casting but before persisting the message in the db
+         office1.setFail(true, false);
+         
+         Transaction tx = tr.createTransaction();
+         
+         final int NUM_MESSAGES = 10;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message msg = CoreMessageFactory.createCoreMessage(i);
+            msg.setReliable(true);
+            
+            MessageReference ref = ms.reference(msg);
+            
+            office1.route(ref, "topic1", tx);
+         }
+         
+         Thread.sleep(1000);
+         
+         List msgs = receiver1.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         try
+         {
+            //An exception should be thrown
+            tx.commit();
+            fail();
+         }
+         catch (TransactionException e)
+         {
+            //Ok
+         }
+         
+         Thread.sleep(1000);
+         
+         msgs = receiver1.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         //We now kill the office - this should make the other office do its transaction check
+         office1.stop();
+         // NOTE(review): the finally block stops office1 a second time - confirm stop() tolerates that
+         Thread.sleep(1000);
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         
+         //The tx should be removed from the holding area and nothing should be received
+         msgs = receiver1.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+      }
+      finally
+      {
+         // Stop office2 even when stopping office1 throws, so no post office outlives the test
+         try
+         {
+            if (office1 != null) office1.stop();
+         }
+         finally
+         {
+            if (office2 != null) office2.stop();
+         }
+      }
+   }
+   // Office 1 fails after casting and persisting: once it is stopped both receivers must get every message
+   public void testCrashAfterPersist() throws Exception
+   {
+      DefaultClusteredPostOffice office1 = null;
+      
+      DefaultClusteredPostOffice office2 = null;
+      
+      try
+      {      
+         office1 = (DefaultClusteredPostOffice)createClusteredPostOffice(1, "testgroup");
+         
+         office2 = (DefaultClusteredPostOffice)createClusteredPostOffice(2, "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         Binding binding1 =
+            office1.bindClusteredQueue("topic1", queue1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         Binding binding2 =
+            office2.bindClusteredQueue("topic1", queue2);
+         
+         SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue1.add(receiver1);
+         SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+         queue2.add(receiver2);
+         
+         //This will make it fail after casting and persisting the message in the db
+         office1.setFail(false, true);
+         
+         Transaction tx = tr.createTransaction();
+         
+         final int NUM_MESSAGES = 10;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            Message msg = CoreMessageFactory.createCoreMessage(i);
+            msg.setReliable(true);
+            
+            MessageReference ref = ms.reference(msg);
+            
+            office1.route(ref, "topic1", tx);
+         }
+         
+         Thread.sleep(1000);
+         
+         List msgs = receiver1.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         try
+         {
+            //An exception should be thrown
+            tx.commit();
+            fail();
+         }
+         catch (TransactionException e)
+         {
+            //Ok
+         }
+         
+         Thread.sleep(1000);
+         
+         msgs = receiver1.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         msgs = receiver2.getMessages();
+         assertTrue(msgs.isEmpty());
+         
+         //We now kill the office - this should make the other office do its transaction check
+         office1.stop();
+         // NOTE(review): the finally block stops office1 a second time - confirm stop() tolerates that
+         Thread.sleep(1000);
+         
+         assertTrue(office1.getHoldingTransactions().isEmpty());
+         
+         assertTrue(office2.getHoldingTransactions().isEmpty());
+         
+         //The tx should be removed from the holding area and messages be received
+         msgs = receiver1.getMessages();
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+         msgs = receiver2.getMessages();
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+      }
+      finally
+      {
+         // Stop office2 even when stopping office1 throws, so no post office outlives the test
+         try
+         {
+            if (office1 != null) office1.stop();
+         }
+         finally
+         {
+            if (office2 != null) office2.stop();
+         }
+      }
+   }
+   // Builds and starts a DefaultClusteredPostOffice for the given node id and group name,
+   // wired with a NullMessagePullPolicy, a SimpleFilterFactory and a DefaultRouterFactory.
+   protected ClusteredPostOffice createClusteredPostOffice(int nodeId, String groupName) throws Exception
+   {
+      MessagePullPolicy redistPolicy = new NullMessagePullPolicy();
+      
+      FilterFactory ff = new SimpleFilterFactory();
+      
+      ClusterRouterFactory rf = new DefaultRouterFactory();
+      
+      DefaultClusteredPostOffice postOffice = 
+         new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+                                 null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+                                 groupName,
+                                 JGroupsUtil.getControlStackProperties(),
+                                 JGroupsUtil.getDataStackProperties(),
+                                 5000, 5000, redistPolicy, rf, 1, 1000);
+      
+      postOffice.start();      
+      
+      return postOffice;
+   }
+
+   // Private -------------------------------------------------------
+      
+   // Inner classes -------------------------------------------------   
+
+}
+
+
+
+

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java	2006-10-09 19:00:59 UTC (rev 1456)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManualClusteringTest.java	2006-10-09 20:39:19 UTC (rev 1457)
@@ -43,6 +43,7 @@
  * 
  * A ManualClusteringTest
  * 
+ * Nodes must be started up in order node1, node2, node3
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @version <tt>$Revision: 1.1 $</tt>
@@ -101,33 +102,33 @@
       
       ic2 = new InitialContext(props2);
       
-//      Properties props3 = new Properties();
-//      
-//      props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
-//      props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
-//      props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
-//      
-//      ic3 = new InitialContext(props3);
+      Properties props3 = new Properties();
       
+      props3.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
+      props3.put(Context.PROVIDER_URL, "jnp://localhost:1399");
+      props3.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
+      
+      ic3 = new InitialContext(props3);
+      
       queue1 = (Queue)ic1.lookup("queue/testDistributedQueue");
       
       queue2 = (Queue)ic2.lookup("queue/testDistributedQueue");
       
-      //queue3 = (Queue)ic3.lookup("queue/ClusteredQueue1");
+      queue3 = (Queue)ic3.lookup("queue/testDistributedQueue");
             
       topic1 = (Topic)ic1.lookup("topic/testDistributedTopic");
       
       topic2 = (Topic)ic2.lookup("topic/testDistributedTopic");
       
-      //topic3 = (Topic)ic3.lookup("topic/ClusteredTopic1");
+      topic3 = (Topic)ic3.lookup("topic/testDistributedTopic");
       
       cf1 = (ConnectionFactory)ic1.lookup("/ConnectionFactory");
       
       cf2 = (ConnectionFactory)ic2.lookup("/ConnectionFactory");
       
-      //cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
-
-      log.info("setup done");
+      cf3 = (ConnectionFactory)ic3.lookup("/ConnectionFactory");
+      
+      drainStuff();
    }
 
    protected void tearDown() throws Exception
@@ -139,37 +140,198 @@
       ic2.close();
    }
    
+   protected void drainStuff() throws Exception
+   {
+      // Consumes whatever is sitting on the distributed queue as seen from each of the three nodes,
+      // presumably so that leftovers from an earlier run cannot disturb the next test.
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+            
+      try
+      {
+         conn1 = cf1.createConnection();
+         
+         conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
+           
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         // Node 3 consumes through its own lookup (queue3); this line previously reused queue2
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
+         
+         conn1.start();
+         
+         conn2.start();
+         
+         conn3.start();
+         
+         Message msg = null;
+         // Drain each consumer in turn until a one-second receive comes back empty
+         do
+         {
+            msg = cons1.receive(1000);
+         }
+         while (msg != null);
+         
+         do
+         {
+            msg = cons2.receive(1000);
+         }
+         while (msg != null);
+         
+         do
+         {
+            msg = cons3.receive(1000);
+         }
+         while (msg != null);
+      }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
+   }
+   // Each test below runs one scenario helper, passing false for the NonPersistent variant and true for the Persistent one
+   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(false);
+   }
+   
+   public void testClusteredQueueLocalConsumerPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(true);
+   }
+   
+   public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredQueueNoLocalConsumer(false);
+   }
+   
+   public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
+   {
+      clusteredQueueNoLocalConsumer(true);
+   }
+   
+   // Topic scenarios
+   public void testClusteredTopicNonDurableNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurable(false);
+   }
+   
+   public void testClusteredTopicNonDurablePersistent() throws Exception
+   {
+      clusteredTopicNonDurable(true);
+   }
+   
+   
+   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(false);
+   }
+   
+   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(true);
+   }
+   
+   public void testClusteredTopicDurableNonPersistent() throws Exception
+   {
+      clusteredTopicDurable(false);
+   }
+   
+   public void testClusteredTopicDurablePersistent() throws Exception
+   {
+      clusteredTopicDurable(true);
+   }
+   
+   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(false);
+   }
+   
+   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(true);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalConsumer(false);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalConsumerPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalConsumer(true);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(false);
+   }
+   
+   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(true);
+   }
+   
+   
+   
+   
    /*
-    * Each node had consumers, send message at node, make sure local consumer gets message
+    * Create a consumer on each queue on each node.
+    * Send messages in turn from all nodes.
+    * Ensure that the local consumer gets the message
     */
-   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+   protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
    {
-      log.info("starting test");
-
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
          conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
          MessageConsumer cons1 = sess1.createConsumer(queue1);
          
          MessageConsumer cons2 = sess2.createConsumer(queue2);
          
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
+         
          conn1.start();
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(queue1);
          
-         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -180,66 +342,138 @@
             prod1.send(tm);
          }
          
-         log.info("sent messages");
-         
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
-            
             TextMessage tm = (TextMessage)cons1.receive(1000);
             
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
             assertEquals("message" + i, tm.getText());
          }
          
          Message m = cons2.receive(2000);
          
          assertNull(m);
-      }
-      finally
-      {      
-         try
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node2
+         
+         MessageProducer prod2 = sess2.createProducer(queue2);
+         
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            if (conn1 != null) conn1.close();
+            TextMessage tm = sess2.createTextMessage("message" + i);
             
-            if (conn2 != null) conn2.close();
+            prod2.send(tm);
          }
-         catch (Exception ignore)
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
          {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
             
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
          }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node3
+         
+         MessageProducer prod3 = sess3.createProducer(queue3);
+         
+         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+            
+            prod3.send(tm);
+         }
+            
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons2.receive(2000);
+         
+         assertNull(m);         
       }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
    }
    
-   public void testClusteredQueueLocalConsumerPersistent() throws Exception
+   
+   
+   
+   /*
+    * Create a consumer on two nodes out of three
+    * Send messages from the third node
+    * Ensure that the messages are received from the other two nodes in 
+    * round robin order.
+    * (Note that this test depends on us using the default router which has
+    * this round robin behaviour)
+    */
+   protected void clusteredQueueNoLocalConsumer(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
          conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          MessageConsumer cons2 = sess2.createConsumer(queue2);
          
-         conn1.start();
+         MessageConsumer cons3 = sess3.createConsumer(queue3);
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(queue1);
          
-         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -250,202 +484,83 @@
             prod1.send(tm);
          }
          
-         log.info("sent messages");
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            log.info("i is " + i);
+            TextMessage tm = (TextMessage)cons2.receive(1000);
             
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-            
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message" + i * 2, tm.getText());
          }
          
-         Message m = cons2.receive(2000);
-         
-         assertNull(m);
-      }
-      finally
-      {      
-         try
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            if (conn1 != null) conn1.close();
+            TextMessage tm = (TextMessage)cons3.receive(1000);
             
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
+            assertNotNull(tm);
             
+            assertEquals("message" + (i * 2 + 1), tm.getText());
          }
+      
       }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
    }
    
-//   /*
-//    * No consumer on local node, send message at node, make sure remote consumer gets messages
-//    */
-//   public void testClusteredQueueNoLocalConsumerNonPersistent() throws Exception
-//   {
-//      Connection conn1 = null;
-//      
-//      Connection conn2 = null;
-//      try
-//      {
-//         conn1 = cf1.createConnection();
-//         
-//         conn2 = cf2.createConnection();
-//           
-//         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         MessageConsumer cons2 = sess2.createConsumer(queue2);
-//         
-//         conn1.start();
-//         
-//         conn2.start();
-//         
-//         MessageProducer prod1 = sess1.createProducer(queue1);
-//         
-//         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-//         
-//         final int NUM_MESSAGES = 100;
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            TextMessage tm = sess1.createTextMessage("message" + i);
-//            
-//            prod1.send(tm);
-//         }
-//         
-//         log.info("sent messages");
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            log.info("i is " + i);
-//            
-//            TextMessage tm = (TextMessage)cons2.receive(10000);
-//            
-//            assertNotNull(tm);
-//            
-//            log.info("Got message:" + tm);
-//            
-//            assertEquals("message" + i, tm.getText());
-//         }
-//
-//      }
-//      finally
-//      {      
-//         try
-//         {
-//            if (conn1 != null) conn1.close();
-//            
-//            if (conn2 != null) conn2.close();
-//         }
-//         catch (Exception ignore)
-//         {
-//            
-//         }
-//      }
-//   }
-//   
-//   
-//   
-//   public void testClusteredQueueNoLocalConsumerPersistent() throws Exception
-//   {
-//      Connection conn1 = null;
-//      
-//      Connection conn2 = null;
-//      try
-//      {
-//         conn1 = cf1.createConnection();
-//         
-//         conn2 = cf2.createConnection();
-//           
-//         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         MessageConsumer cons2 = sess2.createConsumer(queue2);
-//         
-//         conn1.start();
-//         
-//         conn2.start();
-//         
-//         MessageProducer prod1 = sess1.createProducer(queue1);
-//         
-//         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
-//         
-//         final int NUM_MESSAGES = 100;
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            TextMessage tm = sess1.createTextMessage("message" + i);
-//            
-//            prod1.send(tm);
-//         }
-//         
-//         log.info("sent messages");
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            log.info("i is " + i);
-//            
-//            TextMessage tm = (TextMessage)cons2.receive(1000);
-//            
-//            assertNotNull(tm);
-//            
-//            log.info("Got message:" + tm);
-//            
-//            assertEquals("message" + i, tm.getText());
-//         }
-//
-//      }
-//      finally
-//      {      
-//         try
-//         {
-//            if (conn1 != null) conn1.close();
-//            
-//            if (conn2 != null) conn2.close();
-//         }
-//         catch (Exception ignore)
-//         {
-//            
-//         }
-//      }
-//   }
-//   
-
-   public void testClusteredTopicNonDurableNonPersistent() throws Exception
+   
+   
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Ensure all messages are received as appropriate
+    */
+   public void clusteredTopicNonDurable(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
          conn2 = cf2.createConnection();
+         
+         conn3 = cf3.createConnection();
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
          MessageConsumer cons1 = sess1.createConsumer(topic1);
          
          MessageConsumer cons2 = sess2.createConsumer(topic2);
          
+         MessageConsumer cons3 = sess3.createConsumer(topic3);
+         
+         MessageConsumer cons4 = sess1.createConsumer(topic1);
+         
+         MessageConsumer cons5 = sess2.createConsumer(topic2);
+            
          conn1.start();
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -455,79 +570,112 @@
             
             prod1.send(tm);
          }
-         
-         log.info("sent messages");
-         
+            
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
-            
             TextMessage tm = (TextMessage)cons1.receive(1000);
             
             assertNotNull(tm);
-            
-            log.info("Got message:" + tm);
-            
-            assertEquals("message" + i, tm.getText());
+                        
+            assertEquals("message" + i, tm.getText());                        
          }
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
-            
             TextMessage tm = (TextMessage)cons2.receive(1000);
-            
+                      
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
             assertEquals("message" + i, tm.getText());
          }
          
-
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
       }
       finally
       {      
-         try
-         {
-            if (conn1 != null) conn1.close();
-            
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
-            
-         }
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
       }
    }
    
    
-   public void testClusteredTopicNonDurablePersistent() throws Exception
+   
+   
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Include some with selectors
+    * Ensure all messages are received as appropriate
+    */
+   public void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
          conn2 = cf2.createConnection();
-           
+         
+         conn3 = cf3.createConnection();
+                             
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
          MessageConsumer cons1 = sess1.createConsumer(topic1);
          
          MessageConsumer cons2 = sess2.createConsumer(topic2);
          
+         MessageConsumer cons3 = sess3.createConsumer(topic3);
+         
+         MessageConsumer cons4 = sess1.createConsumer(topic1, "COLOUR='red'");
+         
+         MessageConsumer cons5 = sess2.createConsumer(topic2, "COLOUR='blue'");
+            
          conn1.start();
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -535,85 +683,140 @@
          {
             TextMessage tm = sess1.createTextMessage("message" + i);
             
+            int c = i % 3;
+            if (c == 0)
+            {
+               tm.setStringProperty("COLOUR", "red");
+            }
+            else if (c == 1)
+            {
+               tm.setStringProperty("COLOUR", "blue");
+            }
+            
             prod1.send(tm);
          }
-         
-         log.info("sent messages");
-         
+            
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
-            
             TextMessage tm = (TextMessage)cons1.receive(1000);
             
             assertNotNull(tm);
-            
-            log.info("Got message:" + tm);
-            
-            assertEquals("message" + i, tm.getText());
+                        
+            assertEquals("message" + i, tm.getText());                        
          }
          
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
-            
             TextMessage tm = (TextMessage)cons2.receive(1000);
-            
+                      
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
             assertEquals("message" + i, tm.getText());
          }
          
-
-      }
-      finally
-      {      
-         try
+         for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            if (conn1 != null) conn1.close();
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            int c = i % 3;
             
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
+            if (c == 0)
+            {
+               TextMessage tm = (TextMessage)cons4.receive(1000);
+                           
+               assertNotNull(tm);
+                
+               assertEquals("message" + i, tm.getText());
+            }
+         } 
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
          {
+            int c = i % 3;
             
-         }
+            if (c == 1)
+            {
+               TextMessage tm = (TextMessage)cons5.receive(1000);
+                           
+               assertNotNull(tm);
+                
+               assertEquals("message" + i, tm.getText());
+            }
+         } 
       }
+      finally
+      {      
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
+      }
    }
    
    
-   public void testClusteredTopicDurableNonPersistentLocal() throws Exception
+   
+   /*
+    * Create durable subscriptions on all nodes of the cluster.
+    * (Note: no selectors are used on the subscriptions created here)
+    * Ensure all messages are received as appropriate
+    * None of the durable subs are shared
+    */
+   public void clusteredTopicDurable(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
-         conn1.setClientID("id1");
-         
          conn2 = cf2.createConnection();
          
-         conn2.setClientID("id1");
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-               
-         MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
          
-         MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
+         
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub2");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub3");
+         
+         MessageConsumer cons4 = sess1.createDurableSubscriber(topic1, "sub4");
+         
+         MessageConsumer cons5 = sess2.createDurableSubscriber(topic2, "sub5");
+            
          conn1.start();
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -623,92 +826,133 @@
             
             prod1.send(tm);
          }
-         
-         log.info("sent messages");
-         
-         //All the messages should be on the local sub
-         
+            
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
+            TextMessage tm = (TextMessage)cons1.receive(1000);
             
-            TextMessage tm = (TextMessage)durable1.receive(1000);
-            
             assertNotNull(tm);
+                        
+            assertEquals("message" + i, tm.getText());                        
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+                      
+            assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
             assertEquals("message" + i, tm.getText());
          }
          
-         Message m = durable2.receive(2000);
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
          
-         assertNull(m);
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
          
-         durable1.close();
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+                        
+            assertNotNull(tm);
+             
+            assertEquals("message" + i, tm.getText());
+         } 
          
-         durable2.close();
+         cons1.close();
          
+         cons2.close();
+         
+         cons3.close();
+         
+         cons4.close();
+         
+         cons5.close();
+         
          sess1.unsubscribe("sub1");
          
-         sess2.unsubscribe("sub1");
-
+         sess2.unsubscribe("sub2");
+         
+         sess3.unsubscribe("sub3");
+         
+         sess1.unsubscribe("sub4");
+         
+         sess2.unsubscribe("sub5");
+         
       }
       finally
       {      
-         try
-         {
-            if (conn1 != null) conn1.close();
-            
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
-            
-         }
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
       }
    }
    
-   public void testClusteredTopicDurablePersistentLocal() throws Exception
+   
+   
+   
+   /*
+    * Create shared durable subs on multiple nodes, the local instance should always get the message
+    */
+   protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
-         conn1.setClientID("id1");
-         
          conn2 = cf2.createConnection();
          
-         conn2.setClientID("id1");
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         try
-         {
-            sess1.unsubscribe("sub1");
-                  
-            sess2.unsubscribe("sub1");
-         }
-         catch (Exception ignore)
-         {            
-         }
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer durable1 = sess1.createDurableSubscriber(topic1, "sub1");
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
          
-         MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
          
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
          conn1.start();
          
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -719,76 +963,169 @@
             prod1.send(tm);
          }
          
-         log.info("sent messages");
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }
          
-         //All the messages should be on the local sub
+         Message m = cons2.receive(2000);
          
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node2
+         
+         MessageProducer prod2 = sess2.createProducer(topic2);
+         
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
-            log.info("i is " + i);
+            TextMessage tm = sess2.createTextMessage("message" + i);
             
-            TextMessage tm = (TextMessage)durable1.receive(1000);
+            prod2.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
             
             assertNotNull(tm);
+               
+            assertEquals("message" + i, tm.getText());
+         }
+         
+         m = cons1.receive(2000);
+         
+         assertNull(m);
+         
+         m = cons3.receive(2000);
+         
+         assertNull(m);
+         
+         // Send at node3
+         
+         MessageProducer prod3 = sess3.createProducer(topic3);
+         
+         prod3.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
             
-            log.info("Got message:" + tm);
+            prod3.send(tm);
+         }
+           
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
             
+            assertNotNull(tm);
+            
             assertEquals("message" + i, tm.getText());
          }
          
-         Message m = durable2.receive(2000);
+         m = cons1.receive(2000);
          
          assertNull(m);
          
+         m = cons2.receive(2000);
+         
+         assertNull(m);         
+         
+         cons1.close();
+         
+         cons2.close();
+         
+         cons3.close();
+         
+         //Need to unsubscribe on any node that the durable sub was created on
+         
          sess1.unsubscribe("sub1");
          
          sess2.unsubscribe("sub1");
          
+         sess3.unsubscribe("sub1");
       }
       finally
       {      
-         try
-         {
-            if (conn1 != null) conn1.close();
-            
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
-            
-         }
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
       }
    }
    
-     
-   public void testClusteredTopicDurableNonPersistentNotLocal() throws Exception
+   
+   /*
+    * Create shared durable subs on multiple nodes, but without consumer on local node
+    * even though there is a durable sub
+    * should round robin
+    * note that this test assumes round robin
+    */
+   protected void clusteredTopicSharedDurableNoLocalConsumer(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
-         conn1.setClientID("id1");
-         
          conn2 = cf2.createConnection();
          
-         conn2.setClientID("id1");
+         conn3 = cf3.createConnection();
+         
+         conn1.setClientID("wib1");
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
-         conn1.start();
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "sub1");
          
+         //Now close it on node 1
+         conn1.close();
+         
+         conn1 = cf1.createConnection();
+         
+         conn1.setClientID("wib1");         
+         
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         //This means the durable sub is inactive on node1
+         
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
+         
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
+         //Should round robin between the other 2 since there is no active consumer on sub1 on node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -799,71 +1136,92 @@
             prod1.send(tm);
          }
          
-         log.info("sent messages");
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i * 2, tm.getText());
+         }
          
-         //All the messages should be on the non local sub
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            log.info("i is " + i);
+            TextMessage tm = (TextMessage)cons3.receive(1000);
             
-            TextMessage tm = (TextMessage)durable2.receive(1000);
-            
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message" + (i * 2 + 1), tm.getText());
          }
          
-         durable2.close();
-          
+         cons2.close();
+         
+         cons3.close();
+         
+         sess1.unsubscribe("sub1");
+         
          sess2.unsubscribe("sub1");
          
+         sess3.unsubscribe("sub1");
+      
       }
       finally
       {      
-         try
-         {
-            if (conn1 != null) conn1.close();
-            
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
-            
-         }
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
       }
    }
    
-   public void testClusteredTopicDurablePersistentNotLocal() throws Exception
+   
+   
+   /*
+    * Create shared durable subs on multiple nodes, but without sub on local node
+    * should round robin
+    * note that this test assumes round robin
+    */
+   protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
    {
       Connection conn1 = null;
       
       Connection conn2 = null;
+      
+      Connection conn3 = null;
       try
       {
          conn1 = cf1.createConnection();
          
-         conn1.setClientID("id1");
-         
          conn2 = cf2.createConnection();
          
-         conn2.setClientID("id1");
+         conn3 = cf3.createConnection();
+         
+         conn2.setClientID("wib1");
+         
+         conn3.setClientID("wib1");
            
          Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          
          Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-          
-         MessageConsumer durable2 = sess2.createDurableSubscriber(topic2, "sub1");
          
-         conn1.start();
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                  
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "sub1");
          
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic3, "sub1");
+         
          conn2.start();
          
+         conn3.start();
+         
+         //Send at node1
+         
+         //Should round robin between the other 2 since there is no durable sub sub1 on node1
+         
          MessageProducer prod1 = sess1.createProducer(topic1);
          
-         prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
          
          final int NUM_MESSAGES = 100;
          
@@ -874,44 +1232,43 @@
             prod1.send(tm);
          }
          
-         log.info("sent messages");
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message" + i * 2, tm.getText());
+         }
          
-         //All the messages should be on the non local sub
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
          {
-            log.info("i is " + i);
+            TextMessage tm = (TextMessage)cons3.receive(1000);
             
-            TextMessage tm = (TextMessage)durable2.receive(1000);
-            
             assertNotNull(tm);
             
-            log.info("Got message:" + tm);
-            
-            assertEquals("message" + i, tm.getText());
+            assertEquals("message" + (i * 2 + 1), tm.getText());
          }
          
-         durable2.close();
+         cons2.close();
          
+         cons3.close();
+         
          sess2.unsubscribe("sub1");
          
+         sess3.unsubscribe("sub1");
+      
       }
       finally
       {      
-         try
-         {
-            if (conn1 != null) conn1.close();
-            
-            if (conn2 != null) conn2.close();
-         }
-         catch (Exception ignore)
-         {
-            
-         }
+         if (conn1 != null) conn1.close();
+         
+         if (conn2 != null) conn2.close();
+         
+         if (conn3 != null) conn3.close();
       }
    }
 
-   
    class MyListener implements MessageListener
    {
       private int i;




More information about the jboss-cvs-commits mailing list