[jboss-cvs] JBoss Messaging SVN: r1434 - in trunk: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/local/base tests/src/org/jboss/test/messaging/core/paging tests/src/org/jboss/test/messaging/core/paging/base tests/src/org/jboss/test/messaging/core/plugin tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org
Thu Oct 5 12:10:45 EDT 2006


Author: timfox
Date: 2006-10-05 12:10:31 -0400 (Thu, 05 Oct 2006)
New Revision: 1434

Modified:
   trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
   trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java
   trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
   trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
A few tweaks and fixes



Modified: trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/IdManager.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -50,6 +50,8 @@
    
    protected String counterName;
    
+   private boolean started;
+   
    public IdManager(String counterName, int bigBlockSize, PersistenceManager pm) throws Exception
    {
       this.bigBlockSize = bigBlockSize;
@@ -59,18 +61,25 @@
       this.counterName = counterName;           
    }
    
-   public void start() throws Exception
+   public synchronized void start() throws Exception
    {
       getNextBigBlock();
+      
+      started = true;
    }
    
-   public void stop() throws Exception
+   public synchronized void stop() throws Exception
    {
-      //NOOP
+      started = false;
    }
    
    public synchronized IdBlock getIdBlock(int size) throws Exception
    {
+      if (!started)
+      {
+         throw new IllegalStateException(this + " is not started");
+      }
+      
       if (size <= 0)
       {
          throw new IllegalArgumentException("block size must be > 0");
@@ -104,7 +113,7 @@
    {
       nextBlock = pm.reserveIDBlock(counterName, bigBlockSize);
       
-      if (trace) { log.trace("Retrieved nex block of size " + bigBlockSize + " from pm starting at " + nextBlock); }
+      if (trace) { log.trace("Retrieved next block of size " + bigBlockSize + " from pm starting at " + nextBlock); }
       
       high = nextBlock + bigBlockSize - 1;
    }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -941,7 +941,7 @@
          if (nodeId == this.nodeId)
          {
             //Sanity check
-            throw new IllegalStateException("Cannot update queue stats for current node");
+            throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
          }
          
          Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
@@ -963,37 +963,40 @@
                
                if (bb == null)
                {
-                  throw new IllegalStateException("Cannot find binding for queue name: " + st.getQueueName());
+                  //I guess this is possible if the queue was unbound
+                  if (trace) { log.trace(this.nodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
                }
-               
-               RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
-               
-               stub.setStats(st);
-               
-               if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
-               
-               ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
-               
-               //Maybe the local queue now wants to pull message(s) from the remote queue given that the 
-               //stats for the remote queue have changed
-               LocalClusteredQueue localQueue = router.getLocalQueue();
-               
-               if (localQueue != null)
-               {               
-                  RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
+               else
+               {                  
+                  RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
                   
-                  if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+                  stub.setStats(st);
                   
-                  if (toQueue != null)
-                  {
-                     localQueue.setPullInfo(toQueue, pullSize);
+                  if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
+                  
+                  ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
+                  
+                  //Maybe the local queue now wants to pull message(s) from the remote queue given that the 
+                  //stats for the remote queue have changed
+                  LocalClusteredQueue localQueue = router.getLocalQueue();
+                  
+                  if (localQueue != null)
+                  {               
+                     RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
                      
-                     //We now trigger delivery - this may cause a pull event
-                     localQueue.deliver(false);
-                                          
-                     if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
-                  }
-               }               
+                     if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+                     
+                     if (toQueue != null)
+                     {
+                        localQueue.setPullInfo(toQueue, pullSize);
+                        
+                        //We now trigger delivery - this may cause a pull event
+                        localQueue.deliver(false);
+                                             
+                        if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
+                     }
+                  } 
+               }
             }         
          }         
       }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -333,6 +333,8 @@
       
       Iterator iter = msgs.iterator();
       
+      boolean containsReliable = false;
+      
       while (iter.hasNext())
       {
          org.jboss.messaging.core.Message msg = (org.jboss.messaging.core.Message)iter.next();
@@ -341,6 +343,8 @@
          {
             //It will already have been persisted on the other node
             msg.setPersisted(true);
+            
+            containsReliable = true;
          }
          
          MessageReference ref = null;
@@ -370,7 +374,7 @@
          
          del.acknowledge(tx);        
       }
-      
+          
       tx.commit();
       
       //TODO what if commit throws an exception - this means the commit message doesn't hit the 
@@ -380,7 +384,9 @@
       //and send a checkrequest
       //This applies to a normal message and messages requests too
             
-      if (!msgs.isEmpty())
+      //We only need to send a commit message if there were reliable messages since otherwise
+      //the transaction wouldn't have been added in the holding area
+      if (containsReliable && isRecoverable())
       {         
          req = new PullMessagesRequest(this.nodeId, tx.getId());
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -43,6 +43,8 @@
 public class PullMessagesRequest extends TransactionRequest implements ClusterTransaction
 {
    private static final Logger log = Logger.getLogger(PullMessagesRequest.class);
+   
+   private boolean trace = log.isTraceEnabled();
       
    private String queueName;
    
@@ -74,10 +76,14 @@
    { 
       TransactionId id = new TransactionId(nodeId, txId);
       
+      if (trace) { log.trace("Executing PullMessagesRequest with id: " + id + " hold: " + hold); }
+      
       if (hold)
-      {         
+      {                           
          List dels = office.getDeliveries(queueName, numMessages);
          
+         if (trace) { log.trace("PullMessagesRequest got " + dels.size() + " deliveries"); }
+         
          PullMessagesResponse response = new PullMessagesResponse(dels.size());
          
          if (!dels.isEmpty())
@@ -91,10 +97,10 @@
                //Add it to internal list
                if (reliableDels == null)
                {
-                  reliableDels  = new ArrayList();
-                  
-                  reliableDels.add(del);
+                  reliableDels  = new ArrayList();                                    
                }
+               
+               reliableDels.add(del);
             }
             else
             {
@@ -104,7 +110,7 @@
             
             response.addMessage(del.getReference().getMessage());
          }
-         
+              
          if (reliableDels != null)
          {
             //Add this to the holding area

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -47,6 +47,8 @@
    private Timer timer;
    
    private long period;
+    
+   private SendStatsTimerTask task;
    
    StatsSender(PostOfficeInternal office, long period)
    {
@@ -68,9 +70,11 @@
       //Add a random delay to prevent all timers starting at once
       long delay = (long)(period * Math.random());
       
-      TimerTask task = new SendStatsTimerTask();
+      task = new SendStatsTimerTask();
+            
+      timer.schedule(task, delay, period);      
       
-      timer.schedule(task, delay, period);      
+      started = true;
    }
 
    public synchronized void stop() throws Exception
@@ -79,16 +83,29 @@
       {
          return;
       }
+            
+      //Wait for timer task to stop
       
+      task.stop();
+      
       timer.cancel();
       
       timer = null;
+      
+      started = false;
    }      
    
    class SendStatsTimerTask extends TimerTask
    {
+      private boolean stopping;
+      private boolean stopped;
+      
+      private Object stopLock = new Object();
+      
       public void run()
       {
+         checkStop();
+         
          try
          {
             office.sendQueueStats();
@@ -97,6 +114,42 @@
          {
             log.error("Failed to send statistics", e);
          }
-      }      
+         
+         checkStop();
+      }  
+      
+      private void checkStop()
+      {
+         synchronized (stopLock)
+         {            
+            if (stopping)
+            {
+               cancel();
+               stopped = true;
+               stopLock.notify();
+               return;
+            }
+         }
+      }
+            
+      void stop()
+      {
+         synchronized (stopLock)
+         {
+            stopping = true;
+            
+            while (!stopped)
+            {
+               try
+               {
+                  stopLock.wait();               
+               }
+               catch (InterruptedException e)
+               {
+                  //Ignore
+               }
+            }
+         }
+      }
    }
 }

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleDeliveryTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -116,7 +116,10 @@
                                     true, true, true, 100);      
       pm.start();
       
-      TransactionRepository tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+      idm.start();
+      
+      TransactionRepository tr = new TransactionRepository(pm, idm);
       tr.start();
       
       Transaction tx = tr.createTransaction();

Modified: trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/base/PagingFilteredQueueTestBase.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -96,6 +96,8 @@
 
    protected PagingFilteredQueue queue;
    
+   protected IdManager idm;
+   
    // Constructors --------------------------------------------------
 
    public PagingFilteredQueueTestBase(String name)
@@ -117,7 +119,10 @@
                                     true, true, true, 100);      
       pm.start();
       
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      idm = new IdManager("TRANSACTION_ID", 10, pm);
+      idm.start();
+      
+      tr = new TransactionRepository(pm, idm);
       tr.start();
       
       ms = new SimpleMessageStore();
@@ -127,8 +132,15 @@
    public void tearDown() throws Exception
    {
       sc.stop();
-      sc = null;
       
+      pm.stop();
+      idm.stop();
+      tr.stop();
+      ms.stop();
+      
+      sc = null;   
+      pm = null;
+      idm = null;
       ms = null;
       tr = null;
       super.tearDown();

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/SingleChannel_ReloadTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -120,7 +120,7 @@
                                     true, true, true, 100);      
       pm.start();
       
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      tr = new TransactionRepository(pm, idm);
       tr.start();
       
       ms = new SimpleMessageStore();
@@ -217,7 +217,7 @@
                                     true, true, true, 100);      
       pm.start();
       
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      tr = new TransactionRepository(pm, idm);
       tr.start();
       
       ms = new SimpleMessageStore();

Modified: trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/paging/base/PagingStateTestBase.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -71,6 +71,7 @@
    protected PersistenceManager pm;
    protected SimpleMessageStore ms;
    protected TransactionRepository tr;
+   protected IdManager idm;
 
    // Constructors --------------------------------------------------
 
@@ -98,7 +99,10 @@
                                     true, true, true, 100);      
       pm.start();
       
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      idm = new IdManager("TRANSACTION_ID", 10, pm);
+      idm.start();
+      
+      tr = new TransactionRepository(pm, idm);
       tr.start();
       
       ms = new SimpleMessageStore();

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/IdManagerTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -90,6 +90,7 @@
    public void test1() throws Exception
    {
       IdManager idm = new IdManager("test_counter", 1000, pm);
+      idm.start();
       
       int blockSize = 37;
             
@@ -105,7 +106,24 @@
          
          nextLow = block.getHigh() + 1;         
       }
+      
+      idm.stop();
    }
+   
+   public void test2() throws Exception
+   {
+      IdManager idm = new IdManager("test_counter2", 100, pm);
+      idm.start();
+         
+      for (int i = 0; i < 1000; i++)
+      {
+         long id = idm.getId();
+         
+         assertEquals(i, id);
+      }
+      
+      idm.stop();
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/JDBCPersistenceManagerTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -1161,7 +1161,11 @@
       doSetup(batch, 100);
 
       Channel channel = new SimpleChannel(0, ms);
-      TransactionRepository txRep = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      
+      IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+      idm.start();
+      
+      TransactionRepository txRep = new TransactionRepository(pm, idm);
       txRep.start();
 
       log.debug("transaction log started");
@@ -1268,7 +1272,11 @@
       doSetup(batch, 100);
 
       Channel channel = new SimpleChannel(0, ms);
-      TransactionRepository txRep = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      
+      IdManager idm = new IdManager("TRANSACTION_ID", 10, pm);
+      idm.start();
+      
+      TransactionRepository txRep = new TransactionRepository(pm, idm);
       txRep.start();
  
       Message[] messages = createMessages(10);     

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -73,8 +73,10 @@
 
    protected ServiceContainer sc;
 
-   protected IdManager im;   
+   protected IdManager channelIdManager;   
    
+   protected IdManager transactionIdManager;   
+   
    protected PersistenceManager pm;
       
    protected MessageStore ms;
@@ -102,10 +104,13 @@
       
       pm =
          new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
-                                    true, true, true, 100);      
+                                    true, false, true, 100);      
       pm.start();
       
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      transactionIdManager = new IdManager("TRANSACTION_ID", 10, pm);
+      transactionIdManager.start();
+      
+      tr = new TransactionRepository(pm, transactionIdManager);
       tr.start();
       
       ms = new SimpleMessageStore();
@@ -113,7 +118,8 @@
       
       pool = new QueuedExecutorPool(10);
       
-      im = new IdManager("CHANNEL_ID", 10, pm);
+      channelIdManager = new IdManager("CHANNEL_ID", 10, pm);
+      channelIdManager.start();
             
       log.debug("setup done");
    }
@@ -128,6 +134,8 @@
       pm.stop();
       tr.stop();
       ms.stop();
+      transactionIdManager.stop();
+      channelIdManager.stop();
       
       super.tearDown();
    }
@@ -269,7 +277,7 @@
          list.add(msg);
       }
       
-      Thread.sleep(2000);
+      Thread.sleep(1000);
       
       return list;
    }

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -120,7 +120,7 @@
          Filter filter1 = new Selector("x = 'cheese'");
          Filter filter2 = new Selector("y = 'bread'");
          
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("durableQueue", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("durableQueue", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
          
          
          Binding binding1 =
@@ -138,7 +138,7 @@
          }
                
          //Bind one non durable
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("nonDurableQueue", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("nonDurableQueue", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
          
          Binding binding2 =
             office1.bindQueue("condition2", queue2);
@@ -225,42 +225,42 @@
       {      
          office = createPostOffice();
          
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
          
          Binding binding1 =
             office.bindQueue("condition1", queue1);
          
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding2 =
             office.bindQueue("condition1", queue2);
          
-         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding3 =
             office.bindQueue("condition1", queue3);
          
-         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding4 =
             office.bindQueue("condition1", queue4);
          
-         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding5 =
             office.bindQueue("condition2", queue5);
          
-         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding6 =
             office.bindQueue("condition2", queue6);
          
-         PagingFilteredQueue queue7 = new PagingFilteredQueue("queue7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue7 = new PagingFilteredQueue("queue7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding7 =
             office.bindQueue("condition2", queue7);
          
-         PagingFilteredQueue queue8 = new PagingFilteredQueue("queue8", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue8 = new PagingFilteredQueue("queue8", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding8 =
             office.bindQueue("condition2", queue8);
@@ -354,32 +354,32 @@
       
          postOffice = createPostOffice();
          
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding1 =
             postOffice.bindQueue("topic1", queue1);
          
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding2 =
             postOffice.bindQueue("topic1", queue2);
          
-         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding3 =
             postOffice.bindQueue("topic1", queue3);
          
-         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding4 =
             postOffice.bindQueue("topic2", queue4);
          
-         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding5 =
             postOffice.bindQueue("topic2", queue5);
          
-         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding6 =
             postOffice.bindQueue("topic2", queue6);
@@ -504,7 +504,7 @@
       {      
          postOffice = createPostOffice();
          
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding1 =
             postOffice.bindQueue("condition1", queue1);
@@ -554,17 +554,17 @@
          
          SimpleFilter filter = new SimpleFilter(2);
       
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter);         
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter);         
          
          Binding binding1 =
             postOffice.bindQueue("topic1", queue1);
          
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding2 =
             postOffice.bindQueue("topic1", queue2);
          
-         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding3 =
             postOffice.bindQueue("topic1", queue3);   
@@ -654,32 +654,32 @@
       {      
          postOffice = createPostOffice();
       
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding1 =
             postOffice.bindQueue("topic1", queue1);
          
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding2 =
             postOffice.bindQueue("topic1", queue2);
          
-         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue3 = new PagingFilteredQueue("queue3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
          
          Binding binding3 =
             postOffice.bindQueue("topic1", queue3);
          
-         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue4 = new PagingFilteredQueue("queue4", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
          
          Binding binding4 =
             postOffice.bindQueue("topic2", queue4);
          
-         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue5 = new PagingFilteredQueue("queue5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
          
          Binding binding5 =
             postOffice.bindQueue("topic2", queue5);
          
-         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
+         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue6", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);         
          
          Binding binding6 =
             postOffice.bindQueue("topic2", queue6);
@@ -826,12 +826,12 @@
       {      
          postOffice = createPostOffice();
       
-         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
+         PagingFilteredQueue queue1 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
          
          Binding binding1 =
             postOffice.bindQueue("topic1", queue1);
          
-         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
+         PagingFilteredQueue queue2 = new PagingFilteredQueue("queue2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null);
          
          Binding binding2 =
             postOffice.bindQueue("topic1", queue2);

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -75,8 +75,7 @@
 
    public void setUp() throws Exception
    {
-      super.setUp();     
-            
+      super.setUp();           
    }
 
    public void tearDown() throws Exception
@@ -100,11 +99,11 @@
          
          //Add a couple of bindings
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
 
          Binding binding2 =
             office1.bindClusteredQueue("topic1", queue2);
@@ -123,7 +122,7 @@
          
          //Add another binding on node 2
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
 
          Binding binding3 =
             office2.bindClusteredQueue("topic1", queue3);
@@ -150,7 +149,7 @@
 
          //Add another binding on node 1
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 =
             office2.bindClusteredQueue("topic1", queue4);
          
@@ -214,7 +213,7 @@
          
          //Add another binding on node 3
                   
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, 3, "sub5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office3, 3, "sub5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding5 =
             office3.bindClusteredQueue("topic1", queue5);
@@ -250,12 +249,12 @@
          
          //Add a durable and a non durable binding on node 1
          
-         LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue6 = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          
          Binding binding6 =
             office1.bindClusteredQueue("topic1", queue6);
          
-         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          
          Binding binding7 =
             office1.bindClusteredQueue("topic1", queue7);
@@ -423,7 +422,7 @@
          
          if (office3 != null)
          {
-            office2.stop();
+            office3.stop();
          }
          
          checkNoBindingData();
@@ -451,26 +450,46 @@
       clusteredTransactionalRoute(false);
    }
    
-   public void testClusteredNonPersistentRouteWithFilter() throws Throwable
+   public void testClusteredNonPersistentRouteWithFilterNonRecoverable() throws Throwable
    {
-      this.clusteredRouteWithFilter(false);
+      this.clusteredRouteWithFilter(false, false);
    }
    
-   public void testClusteredPersistentRouteWithFilter() throws Throwable
+   public void testClusteredPersistentRouteWithFilterNonRecoverable() throws Throwable
    {
-      this.clusteredRouteWithFilter(true);
+      this.clusteredRouteWithFilter(true, false);
    }
    
-   public void testRouteSharedPointToPointQueuePersistent() throws Throwable
+   public void testClusteredNonPersistentRouteWithFilterRecoverable() throws Throwable
    {
-      this.routeSharedQueue(true);
+      this.clusteredRouteWithFilter(false, true);
    }
    
-   public void testRouteSharedPointToPointQueueNonPersistent() throws Throwable
+   public void testClusteredPersistentRouteWithFilterRecoverable() throws Throwable
    {
-      this.routeSharedQueue(false);
+      this.clusteredRouteWithFilter(true, true);
    }
+      
+   public void testRouteSharedPointToPointQueuePersistentNonRecoverable() throws Throwable
+   {
+      this.routeSharedQueue(true, false);
+   }
    
+   public void testRouteSharedPointToPointQueueNonPersistentNonRecoverable() throws Throwable
+   {
+      this.routeSharedQueue(false, false);
+   }
+   
+   public void testRouteSharedPointToPointQueuePersistentRecoverable() throws Throwable
+   {
+      this.routeSharedQueue(true, true);
+   }
+   
+   public void testRouteSharedPointToPointQueueNonPersistentRecoverable() throws Throwable
+   {
+      this.routeSharedQueue(false, true);
+   }
+   
    public void testRouteComplexTopicPersistent() throws Throwable
    {
       this.routeComplexTopic(true);
@@ -480,18 +499,28 @@
    {
       this.routeComplexTopic(false);
    }
+         
+   public void testRouteLocalQueuesPersistentNonRecoverable() throws Throwable
+   {
+      this.routeLocalQueues(true, false);
+   }
    
-   public void testRouteLocalQueuesPersistent() throws Throwable
+   public void testRouteLocalQueuesNonPersistentNonRecoverable() throws Throwable
    {
-      this.routeLocalQueues(true);
+      this.routeLocalQueues(false, false);
    }
    
-   public void testRouteLocalQueuesNonPersistent() throws Throwable
+   public void testRouteLocalQueuesPersistentRecoverable() throws Throwable
    {
-      this.routeLocalQueues(false);
+      this.routeLocalQueues(true, true);
    }
    
+   public void testRouteLocalQueuesNonPersistentRecoverable() throws Throwable
+   {
+      this.routeLocalQueues(false, true);
+   }
    
+   
    /*
     * We should allow the clustered bind of queues with the same queue name on different nodes of the
     * cluster
@@ -508,15 +537,15 @@
          
          office2 = createClusteredPostOffice(2, "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
                   
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          try
          {
@@ -527,7 +556,7 @@
          {
             //Ok
          }
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          try
          {
@@ -543,11 +572,11 @@
          
          office2.unbindClusteredQueue("queue1");
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          
          Binding binding5 = office1.bindClusteredQueue("queue1", queue5);
          
-         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);       
+         PagingFilteredQueue queue6 = new PagingFilteredQueue("queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);       
          try
          {
             Binding binding6 = office1.bindQueue("queue1", queue6);
@@ -561,13 +590,13 @@
          office1.unbindClusteredQueue("queue1");
          
          //It should be possible to bind queues locally into a clustered post office
-         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue7 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding7 = office1.bindQueue("queue1", queue7);
          
-         LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
+         LocalClusteredQueue queue8 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
          Binding binding8 = office2.bindQueue("queue1", queue8);
          
-         LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
+         LocalClusteredQueue queue9 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);       
          try
          {
             Binding binding9 = office1.bindQueue("queue1", queue9);
@@ -599,7 +628,7 @@
 
    // Protected -----------------------------------------------------
    
-   protected void clusteredRouteWithFilter(boolean persistentMessage) throws Throwable
+   protected void clusteredRouteWithFilter(boolean persistentMessage, boolean recoverable) throws Throwable
    {
       ClusteredPostOffice office1 = null;
       
@@ -613,15 +642,15 @@
          SimpleFilter filter1 = new SimpleFilter(2);
          SimpleFilter filter2 = new SimpleFilter(3);
       
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter1, tr);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), filter1, tr);         
          Binding binding1 =
             office1.bindClusteredQueue("topic1", queue1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), filter2, tr);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue2", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), filter2, tr);         
          Binding binding2 =
             office2.bindClusteredQueue("topic1", queue2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "queue3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office2, 2, "queue3", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 =
             office2.bindClusteredQueue("topic1", queue3);   
          
@@ -721,52 +750,52 @@
          LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[0] = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
          
-         queues[1] = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[1] = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
          
-         queues[2] = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[2] = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
          
-         queues[3] = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[3] = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
          
-         queues[4] = new LocalClusteredQueue(office2, 2, "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[4] = new LocalClusteredQueue(office2, 2, "sub5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
          
-         queues[5] = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[5] = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
          
-         queues[6] = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[6] = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
          
-         queues[7] = new LocalClusteredQueue(office1, 1, "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[7] = new LocalClusteredQueue(office1, 1, "sub8", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
          
-         queues[8] = new LocalClusteredQueue(office1, 1, "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[8] = new LocalClusteredQueue(office1, 1, "sub9", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
          
-         queues[9] = new LocalClusteredQueue(office1, 1, "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[9] = new LocalClusteredQueue(office1, 1, "sub10", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
          
-         queues[10] = new LocalClusteredQueue(office2, 2, "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[10] = new LocalClusteredQueue(office2, 2, "sub11", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
          
-         queues[11] = new LocalClusteredQueue(office2, 2, "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[11] = new LocalClusteredQueue(office2, 2, "sub12", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
          
-         queues[12] = new LocalClusteredQueue(office2, 2, "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[12] = new LocalClusteredQueue(office2, 2, "sub13", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
          
-         queues[13] = new LocalClusteredQueue(office1, 1, "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[13] = new LocalClusteredQueue(office1, 1, "sub14", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
          
-         queues[14] = new LocalClusteredQueue(office1, 1, "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[14] = new LocalClusteredQueue(office1, 1, "sub15", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
          
-         queues[15] = new LocalClusteredQueue(office1, 1, "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[15] = new LocalClusteredQueue(office1, 1, "sub16", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
        
          SimpleReceiver[] receivers = new SimpleReceiver[16];
@@ -882,7 +911,7 @@
       }
    }
    
-   protected void routeSharedQueue(boolean persistentMessage) throws Throwable
+   protected void routeSharedQueue(boolean persistentMessage, boolean recoverable) throws Throwable
    {
       ClusteredPostOffice office1 = null;
       
@@ -908,27 +937,27 @@
          //We deploy the queue on nodes 1, 2, 3, 4 and 5
          //We don't deploy on node 6
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2); 
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 = office4.bindClusteredQueue("queue1", queue4); 
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
@@ -1044,7 +1073,7 @@
    /*
     * Clustered post offices should be able to have local queues bound to them too.
     */
-   protected void routeLocalQueues(boolean persistentMessage) throws Throwable
+   protected void routeLocalQueues(boolean persistentMessage, boolean recoverable) throws Throwable
    {
       ClusteredPostOffice office1 = null;
       
@@ -1058,17 +1087,17 @@
          office2 = createClusteredPostOffice(2, "testgroup");
          office3 = createClusteredPostOffice(3, "testgroup");
 
-         LocalClusteredQueue sub1 = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sub1 = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding1 = office1.bindQueue("topic", sub1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sub1.add(receiver1);
          
-         LocalClusteredQueue sub2 = new LocalClusteredQueue(office2, 2, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sub2 = new LocalClusteredQueue(office2, 2, "sub2", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding2 = office2.bindQueue("topic", sub2); 
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sub2.add(receiver2);
          
-         LocalClusteredQueue sub3 = new LocalClusteredQueue(office3, 3, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sub3 = new LocalClusteredQueue(office3, 3, "sub3", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);
          Binding binding3 = office3.bindQueue("topic", sub3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sub3.add(receiver3);
@@ -1172,13 +1201,13 @@
          //======
          
          //Non durable 1 on node 2
-         LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, 2, "nondurable1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable1 = new LocalClusteredQueue(office2, 2, "nondurable1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding1 = office2.bindClusteredQueue("topic", nonDurable1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable1.add(receiver1);
          
          //Non durable 2 on node 2
-         LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, 2, "nondurable2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable2 = new LocalClusteredQueue(office2, 2, "nondurable2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding2 = office2.bindClusteredQueue("topic", nonDurable2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable2.add(receiver2);
@@ -1187,13 +1216,13 @@
          //======
          
          //Non shared durable
-         LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, 3, "nonshareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonSharedDurable1 = new LocalClusteredQueue(office3, 3, "nonshareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding3 = office3.bindClusteredQueue("topic", nonSharedDurable1);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonSharedDurable1.add(receiver3);
          
          //Non durable
-         LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, 3, "nondurable3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable3 = new LocalClusteredQueue(office3, 3, "nondurable3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding4 = office3.bindClusteredQueue("topic", nonDurable3);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable3.add(receiver4);
@@ -1202,31 +1231,31 @@
          //======
          
          //Shared durable
-         LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, 4, "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sharedDurable1 = new LocalClusteredQueue(office4, 4, "shareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding5 = office4.bindClusteredQueue("topic", sharedDurable1);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable1.add(receiver5);
          
          //Non shared durable
-         LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, 4, "nonshareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonSharedDurable2 = new LocalClusteredQueue(office4, 4, "nonshareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding6 = office4.bindClusteredQueue("topic", nonSharedDurable2);
          SimpleReceiver receiver6 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonSharedDurable2.add(receiver6);
          
          //Non durable
-         LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, 4, "nondurable4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable4 = new LocalClusteredQueue(office4, 4, "nondurable4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding7 = office4.bindClusteredQueue("topic", nonDurable4);
          SimpleReceiver receiver7 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable4.add(receiver7);
          
          // Non durable
-         LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, 4, "nondurable5", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable5 = new LocalClusteredQueue(office4, 4, "nondurable5", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding8 = office4.bindClusteredQueue("topic", nonDurable5);
          SimpleReceiver receiver8 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable5.add(receiver8);
          
          //Non durable
-         LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, 4, "nondurable6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable6 = new LocalClusteredQueue(office4, 4, "nondurable6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding9 = office4.bindClusteredQueue("topic", nonDurable6);
          SimpleReceiver receiver9 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable6.add(receiver9);
@@ -1234,32 +1263,32 @@
          // Node 5
          //=======
          //Shared durable
-         LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, 5, "shareddurable1", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sharedDurable2 = new LocalClusteredQueue(office5, 5, "shareddurable1", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding10 = office5.bindClusteredQueue("topic", sharedDurable2);
          SimpleReceiver receiver10 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable2.add(receiver10);
          
          //Shared durable
-         LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, 5, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sharedDurable3 = new LocalClusteredQueue(office5, 5, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding11 = office5.bindClusteredQueue("topic", sharedDurable3);
          SimpleReceiver receiver11 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable3.add(receiver11);
          
          // Node 6
          //=========
-         LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, 6, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sharedDurable4 = new LocalClusteredQueue(office6, 6, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding12 = office6.bindClusteredQueue("topic", sharedDurable4);
          SimpleReceiver receiver12 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable4.add(receiver12);
          
-         LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, 6, "nondurable7", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue nonDurable7 = new LocalClusteredQueue(office6, 6, "nondurable7", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);
          Binding binding13 = office6.bindClusteredQueue("topic", nonDurable7);
          SimpleReceiver receiver13 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          nonDurable7.add(receiver13);
          
          //Node 7
          //=======
-         LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, 7, "shareddurable2", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
+         LocalClusteredQueue sharedDurable5 = new LocalClusteredQueue(office7, 7, "shareddurable2", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);
          Binding binding14 = office7.bindClusteredQueue("topic", sharedDurable5);
          SimpleReceiver receiver14 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          sharedDurable5.add(receiver14);
@@ -1588,52 +1617,52 @@
          LocalClusteredQueue[] queues = new LocalClusteredQueue[16];
          Binding[] bindings = new Binding[16];
          
-         queues[0] = new LocalClusteredQueue(office1, 1, "sub1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[0] = new LocalClusteredQueue(office1, 1, "sub1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[0] = office1.bindClusteredQueue("topic1", queues[0]);
          
-         queues[1] = new LocalClusteredQueue(office1, 1, "sub2", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[1] = new LocalClusteredQueue(office1, 1, "sub2", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[1] = office1.bindClusteredQueue("topic1", queues[1]);
          
-         queues[2] = new LocalClusteredQueue(office2, 2, "sub3", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[2] = new LocalClusteredQueue(office2, 2, "sub3", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[2] = office2.bindClusteredQueue("topic1", queues[2]);
          
-         queues[3] = new LocalClusteredQueue(office2, 2, "sub4", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[3] = new LocalClusteredQueue(office2, 2, "sub4", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[3] = office2.bindClusteredQueue("topic1", queues[3]);
          
-         queues[4] = new LocalClusteredQueue(office2, 2, "sub5", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[4] = new LocalClusteredQueue(office2, 2, "sub5", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[4] = office2.bindClusteredQueue("topic1", queues[4]);
          
-         queues[5] = new LocalClusteredQueue(office1, 1, "sub6", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[5] = new LocalClusteredQueue(office1, 1, "sub6", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[5] = office1.bindClusteredQueue("topic1", queues[5]);
          
-         queues[6] = new LocalClusteredQueue(office1, 1, "sub7", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[6] = new LocalClusteredQueue(office1, 1, "sub7", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[6] = office1.bindClusteredQueue("topic1", queues[6]);
          
-         queues[7] = new LocalClusteredQueue(office1, 1, "sub8", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[7] = new LocalClusteredQueue(office1, 1, "sub8", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[7] = office1.bindClusteredQueue("topic1", queues[7]);
          
-         queues[8] = new LocalClusteredQueue(office1, 1, "sub9", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[8] = new LocalClusteredQueue(office1, 1, "sub9", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[8] = office1.bindClusteredQueue("topic2", queues[8]);
          
-         queues[9] = new LocalClusteredQueue(office1, 1, "sub10", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[9] = new LocalClusteredQueue(office1, 1, "sub10", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[9] = office1.bindClusteredQueue("topic2", queues[9]);
          
-         queues[10] = new LocalClusteredQueue(office2, 2, "sub11", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[10] = new LocalClusteredQueue(office2, 2, "sub11", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[10] = office2.bindClusteredQueue("topic2", queues[10]);
          
-         queues[11] = new LocalClusteredQueue(office2, 2, "sub12", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[11] = new LocalClusteredQueue(office2, 2, "sub12", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[11] = office2.bindClusteredQueue("topic2", queues[11]);
          
-         queues[12] = new LocalClusteredQueue(office2, 2, "sub13", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[12] = new LocalClusteredQueue(office2, 2, "sub13", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[12] = office2.bindClusteredQueue("topic2", queues[12]);
          
-         queues[13] = new LocalClusteredQueue(office1, 1, "sub14", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         queues[13] = new LocalClusteredQueue(office1, 1, "sub14", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          bindings[13] = office1.bindClusteredQueue("topic2", queues[13]);
          
-         queues[14] = new LocalClusteredQueue(office1, 1, "sub15", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[14] = new LocalClusteredQueue(office1, 1, "sub15", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[14] = office1.bindClusteredQueue("topic2", queues[14]);
          
-         queues[15] = new LocalClusteredQueue(office1, 1, "sub16", im.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
+         queues[15] = new LocalClusteredQueue(office1, 1, "sub16", channelIdManager.getId(), ms, pm, true, true, (QueuedExecutor)pool.get(), null, tr);         
          bindings[15] = office1.bindClusteredQueue("topic2", queues[15]);
 
          SimpleReceiver[] receivers = new SimpleReceiver[16];

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -123,27 +123,27 @@
          
          office6 = createClusteredPostOffice(6, "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office2.bindClusteredQueue("topic", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding2 = office3.bindClusteredQueue("topic", queue2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 = office4.bindClusteredQueue("topic", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding4 = office5.bindClusteredQueue("topic", queue4);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding5 = office6.bindClusteredQueue("topic", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
@@ -262,27 +262,27 @@
          
          office6 = createClusteredPostOffice(6, "testgroup");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office2.bindClusteredQueue("topic", queue1);
          SimpleReceiver receiver1 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue1.add(receiver1);
          
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding2 = office3.bindClusteredQueue("topic", queue2);
          SimpleReceiver receiver2 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue2.add(receiver2);
          
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 = office4.bindClusteredQueue("topic", queue3);
          SimpleReceiver receiver3 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue3.add(receiver3);
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding4 = office5.bindClusteredQueue("topic", queue4);
          SimpleReceiver receiver4 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue4.add(receiver4);
          
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office6, 6, "queue1", channelIdManager.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding5 = office6.bindClusteredQueue("topic", queue5);
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-04 23:41:46 UTC (rev 1433)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-10-05 16:10:31 UTC (rev 1434)
@@ -69,18 +69,28 @@
       super.tearDown();
    }
    
-   public void testRedistNonPersistent() throws Throwable
+   public void testRedistNonPersistentNonRecoverable() throws Throwable
    {
-      redistTest(false);
+      redistTest(false, false);
    }
    
-   public void testRedistPersistent() throws Throwable
+   public void testRedistPersistentNonRecoverable() throws Throwable
    {
-      redistTest(true);
+      redistTest(true, false);
    }
    
-   public void redistTest(boolean persistent) throws Throwable
+   public void testRedistNonPersistentRecoverable() throws Throwable
    {
+      redistTest(false, true);
+   }
+   
+   public void testRedistPersistentRecoverable() throws Throwable
+   {
+      redistTest(true, true);
+   }
+   
+   public void redistTest(boolean persistent, boolean recoverable) throws Throwable
+   {
       ClusteredPostOffice office1 = null;
       
       ClusteredPostOffice office2 = null;
@@ -105,19 +115,19 @@
          
          log.info("Started offices");
          
-         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, 1, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
                   
-         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, 2, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
                   
-         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, 3, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding3 = office3.bindClusteredQueue("queue1", queue3);         
          
-         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, 4, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
                   
-         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null, tr);         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, 5, "queue1", channelIdManager.getId(), ms, pm, true, recoverable, (QueuedExecutor)pool.get(), null, tr);         
          Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
                   
          log.info("bound queues");




More information about the jboss-cvs-commits mailing list