[Jboss-cvs] JBoss Messaging SVN: r1353 - in trunk: src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 22 15:47:17 EDT 2006


Author: timfox
Date: 2006-09-22 15:47:06 -0400 (Fri, 22 Sep 2006)
New Revision: 1353

Added:
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Modified:
   trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
   trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
Log:
More clustering debugging


Modified: trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/etc/server/default/deploy/hsqldb-persistence-service.xml	2006-09-22 19:47:06 UTC (rev 1353)
@@ -60,6 +60,7 @@
       <attribute name="StateTimeout">5000</attribute>
       <attribute name="CastTimeout">5000</attribute>
       <attribute name="PullSize">1</attribute>
+      <attribute name="StatsSendPeriod">1000</attribute>
       <attribute name="SyncChannelConfig">
          <UDP mcast_addr="228.8.8.8" mcast_port="45568"
               ip_ttl="8" ip_mcast="true"

Modified: trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-09-22 19:47:06 UTC (rev 1353)
@@ -65,6 +65,12 @@
       <type>int</type>
    </attribute> 
    
+   <attribute access="read-write" getMethod="getStatsSendPeriod" setMethod="setStatsSendPeriod">
+      <description>The period in milliseconds between a post office casting its statistics across the cluster</description>
+      <name>StatsSendPeriod</name>
+      <type>long</type>
+   </attribute>    
+   
    <attribute access="read-write" getMethod="getSyncChannelConfig" setMethod="setSyncChannelConfig">
       <description>The JGroups stack configuration for the synchronous channel</description>
       <name>SyncChannelConfig</name>

Modified: trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -73,6 +73,8 @@
    
    private int pullSize = 1;   
    
+   private long statsSendPeriod = 1000;
+   
    // Constructors --------------------------------------------------------
    
    public ClusteredPostOfficeService()
@@ -178,6 +180,16 @@
       return pullSize;
    }
    
+   public void setStatsSendPeriod(long period)
+   {
+      this.statsSendPeriod = period;
+   }
+   
+   public long getStatsSendPeriod()
+   {
+      return statsSendPeriod;
+   }
+   
    // ServiceMBeanSupport overrides ---------------------------------
    
    protected synchronized void startService() throws Exception
@@ -218,7 +230,8 @@
                                                syncChannelConfig, asyncChannelConfig,
                                                stateTimeout, castTimeout,
                                                pullPolicy, rf,
-                                               pullSize);
+                                               pullSize,
+                                               statsSendPeriod);
          
          postOffice.start();
          

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -128,6 +128,10 @@
    private int pullSize;
    
    private Map routerMap;
+   
+   private StatsSender statsSender;
+   
+   private long statsSendPeriod;
       
    public DefaultClusteredPostOffice()
    {        
@@ -157,11 +161,12 @@
             long stateTimeout, long castTimeout,
             MessagePullPolicy redistributionPolicy,
             ClusterRouterFactory rf,
-            int pullSize) throws Exception
+            int pullSize,
+            long statsSendPeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, pullSize);
+           rf, pullSize, statsSendPeriod);
       
       this.syncChannelConfigE = syncChannelConfig;      
       this.asyncChannelConfigE = asyncChannelConfig;     
@@ -183,11 +188,12 @@
                               long stateTimeout, long castTimeout,
                               MessagePullPolicy redistributionPolicy,                      
                               ClusterRouterFactory rf,
-                              int pullSize) throws Exception
+                              int pullSize,
+                              long statsSendPeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, pullSize);
+           rf, pullSize, statsSendPeriod);
 
       this.syncChannelConfigS = syncChannelConfig;      
       this.asyncChannelConfigS = asyncChannelConfig;     
@@ -204,7 +210,8 @@
                                long stateTimeout, long castTimeout,                             
                                MessagePullPolicy redistributionPolicy,                               
                                ClusterRouterFactory rf,
-                               int pullSize)
+                               int pullSize,
+                               long statsSendPeriod)
    {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
              pool);
@@ -223,8 +230,12 @@
       
       this.pullSize = pullSize;
       
+      this.statsSendPeriod = statsSendPeriod;
+      
       routerMap = new HashMap();
       
+      statsSender = new StatsSender(this, statsSendPeriod);
+      
       init();
    }
 
@@ -274,11 +285,15 @@
       handleAddressNodeMapping(currentAddress, nodeId);
       
       syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));           
+      
+      statsSender.start();
    }
 
    public void stop() throws Exception
    {
       super.stop();
+      
+      statsSender.stop();
          
       syncChannel.close();
       
@@ -1111,8 +1126,16 @@
       ClusterRequest req = new PullMessagesRequest(this.nodeId, tx.getId(), remoteQueue.getChannelID(),
                                                    localQueue.getName(), num);
       
-      List msgs = (List)syncSendRequest(req, fromAddress);
+      byte[] bytes = (byte[])syncSendRequest(req, fromAddress);
       
+      PullMessagesResponse response = new PullMessagesResponse();
+      
+      StreamUtils.fromBytes(response, bytes);
+
+      List msgs = response.getMessages();
+      
+      log.info("I have " + msgs.size() + " messages");
+      
       Iterator iter = msgs.iterator();
       
       while (iter.hasNext())
@@ -1155,9 +1178,12 @@
       //and send a checkrequest
       //This applies to a normal message and messages requests too
             
-      req = new PullMessagesRequest(this.nodeId, tx.getId());
-      
-      asyncSendRequest(req, fromAddress);
+      if (!msgs.isEmpty())
+      {         
+         req = new PullMessagesRequest(this.nodeId, tx.getId());
+         
+         asyncSendRequest(req, fromAddress);
+      }
    }
    
    
@@ -1544,7 +1570,9 @@
             
             ClusterRequest request = readRequest(bytes);
             
-            request.execute(DefaultClusteredPostOffice.this);
+            Object result = request.execute(DefaultClusteredPostOffice.this);
+            
+            return result;
          }
          catch (Throwable e)
          {
@@ -1552,8 +1580,7 @@
             IllegalStateException e2 = new IllegalStateException(e.getMessage());
             e2.setStackTrace(e.getStackTrace());
             throw e2;
-         }
-         return null;
+         }         
       }      
    }
 }
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicy.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -54,13 +54,16 @@
          {
             QueueStats stats = queue.getStats();
             
-            int cnt = stats.getMessageCount();
-            
-            if (cnt > maxMessages)
-            {
-               maxMessages = cnt;
+            if (stats != null)
+            {               
+               int cnt = stats.getMessageCount();
                
-               chosenQueue = queue;
+               if (cnt > maxMessages)
+               {
+                  maxMessages = cnt;
+                  
+                  chosenQueue = queue;
+               }
             }
          }
       }

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -27,7 +27,9 @@
 import java.util.Iterator;
 import java.util.List;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.util.StreamUtils;
 
 /**
  * A PullMessagesRequest
@@ -40,6 +42,8 @@
  */
 public class PullMessagesRequest extends TransactionRequest implements ClusterTransaction
 {
+   private static final Logger log = Logger.getLogger(PullMessagesRequest.class);
+      
    private String queueName;
    
    private int numMessages;
@@ -68,12 +72,16 @@
 
    Object execute(PostOfficeInternal office) throws Throwable
    {
+      log.info("********* executign pull messages requiest");
+      
       TransactionId id = new TransactionId(nodeId, txId);
       
       if (hold)
       {         
          List dels = office.getDeliveries(queueName, numMessages);
          
+         log.info("Got a list of " + dels.size() + " deliveries");
+         
          PullMessagesResponse response = new PullMessagesResponse(dels.size());
          
          if (!dels.isEmpty())
@@ -107,7 +115,12 @@
             office.holdTransaction(id, this);
          }
          
-         return response;
+         log.info("returning response:" + response);
+         
+         //Convert to bytes since the response isn't serializable (nor do we want it to be)
+         byte[] bytes = StreamUtils.toBytes(response);
+         
+         return bytes;
       }
       else
       {
@@ -184,16 +197,26 @@
    
    public void read(DataInputStream in) throws Exception
    {
-      queueName = in.readUTF();
+      super.read(in);
       
-      numMessages = in.readInt();
+      if (hold)
+      {
+         queueName = in.readUTF();
+         
+         numMessages = in.readInt();
+      }
    }
 
    public void write(DataOutputStream out) throws Exception
    {
-      out.writeUTF(queueName);
+      super.write(out);
       
-      out.writeInt(numMessages);
+      if (hold)
+      {      
+         out.writeUTF(queueName);
+         
+         out.writeInt(numMessages);
+      }
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -23,6 +23,7 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -40,10 +41,14 @@
  * $Id$
  *
  */
-public class PullMessagesResponse implements Streamable
+public class PullMessagesResponse implements Streamable, Serializable
 {
    private List messages;
    
+   PullMessagesResponse()
+   {
+   }
+   
    PullMessagesResponse(int size)
    {
       messages = new ArrayList(size);
@@ -53,6 +58,11 @@
    {
       messages.add(msg);
    }
+   
+   List getMessages()
+   {
+      return messages;
+   }
 
    public void read(DataInputStream in) throws Exception
    {

Added: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/StatsSender.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.plugin.contract.MessagingComponent;
+
+/**
+ * Periodically tells a post office to send its statistics, using a daemon
+ * java.util.Timer whose interval is the period passed to the constructor.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ */
+public class StatsSender implements MessagingComponent
+{
+   private static final Logger log = Logger.getLogger(StatsSender.class);
+
+   //The post office whose sendStats() is invoked on every timer tick
+   private PostOfficeInternal office;
+   //True between a completed start() and the next stop()
+   private boolean started;
+   //Non null only while started
+   private Timer timer;
+   //Interval between two sends, also the upper bound of the initial random delay
+   private long period;
+
+   StatsSender(PostOfficeInternal office, long period)
+   {
+      this.office = office;
+      this.period = period;
+   }
+
+   /** Creates the daemon timer and schedules the periodic send; no-op if already started. */
+   public synchronized void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+
+      //Needs to be daemon
+      timer = new Timer(true);
+
+      //Add a random delay to prevent all timers starting at once
+      long delay = (long)(period * Math.random());
+
+      timer.schedule(new SendStatsTimerTask(), delay, period);
+
+      //Without this stop() always returned early and the timer was never cancelled
+      started = true;
+   }
+
+   /** Cancels the timer; no-op if not started. */
+   public synchronized void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      timer.cancel();
+      timer = null;
+
+      //Allow a later start() to create a fresh timer
+      started = false;
+   }
+
+   //Task run by the timer: triggers one send and logs, rather than propagates, any failure
+   class SendStatsTimerTask extends TimerTask
+   {
+      public void run()
+      {
+         try
+         {
+            office.sendStats();
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to send statistics", e);
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,321 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.base;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.naming.InitialContext;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+
+import org.jboss.jms.server.QueuedExecutorPool;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.Message;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Queue;
+import org.jboss.messaging.core.plugin.IdManager;
+import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
+import org.jboss.messaging.core.plugin.SimpleMessageStore;
+import org.jboss.messaging.core.plugin.contract.MessageStore;
+import org.jboss.messaging.core.plugin.contract.PersistenceManager;
+import org.jboss.messaging.core.plugin.contract.PostOffice;
+import org.jboss.messaging.core.plugin.postoffice.DefaultPostOffice;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.messaging.core.tx.TransactionRepository;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.tm.TransactionManagerService;
+
+/**
+ * Shared fixture for post office tests (extended by DefaultPostOfficeTest): setUp() starts a
+ * ServiceContainer, a persistence manager, a transaction repository and a message store.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class ClusteringTestBase extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+
+   protected ServiceContainer sc;
+
+   protected IdManager im;   
+   
+   protected PersistenceManager pm;
+      
+   protected MessageStore ms;
+   
+   protected TransactionRepository tr;
+   
+   protected QueuedExecutorPool pool;
+   
+   // Constructors --------------------------------------------------
+
+   public ClusteringTestBase(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   //Starts the "all" service container, then builds and starts the persistence manager, transaction repository and message store
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      sc = new ServiceContainer("all");
+      
+      sc.start();                
+      
+      pm =
+         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
+                                    true, true, true, 100);      
+      pm.start();
+      
+      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
+      tr.start();
+      
+      ms = new SimpleMessageStore();
+      ms.start();
+      
+      pool = new QueuedExecutorPool(10);
+      
+      im = new IdManager("CHANNEL_ID", 10, pm);
+            
+      log.debug("setup done");
+   }
+   //Stops the service container (only when the server is not remote), then the persistence manager, transaction repository and message store
+   public void tearDown() throws Exception
+   {      
+      if (!ServerManagement.isRemote())
+      {
+         sc.stop();
+         sc = null;
+      }
+      pm.stop();
+      tr.stop();
+      ms.stop();
+      
+      super.tearDown();
+   }
+   
+   // Protected -----------------------------------------------------
+   //Creates and starts a DefaultPostOffice built from this fixture's data source, transaction manager, stores and pool
+   protected PostOffice createPostOffice() throws Exception
+   {
+      FilterFactory ff = new SimpleFilterFactory();
+      
+      DefaultPostOffice postOffice = 
+         new DefaultPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+                            null, true, "node1", "Simple", ms, pm, tr, ff, pool);
+      
+      postOffice.start();      
+      
+      return postOffice;
+   }
+   //Returns true when JMS_POSTOFFICE has at least one row - NOTE(review): the name suggests the opposite, confirm against callers
+   protected boolean checkNoBindingData() throws Exception
+   {
+      InitialContext ctx = new InitialContext();
+
+      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+      //Run the query in a transaction of its own; any current one is suspended here and resumed in the finally block
+      javax.transaction.Transaction txOld = mgr.suspend();
+      mgr.begin();
+      
+      Connection conn = null;
+      
+      PreparedStatement ps = null;
+      
+      ResultSet rs = null;
+
+      try
+      {
+         conn = ds.getConnection();
+         String sql = "SELECT * FROM JMS_POSTOFFICE";
+         ps = conn.prepareStatement(sql);
+         
+         rs = ps.executeQuery();
+         
+         return rs.next();
+      }
+      finally
+      {
+         if (rs != null) rs.close();
+         
+         if (ps != null) ps.close();
+         
+         if (conn != null) conn.close();
+         
+         mgr.commit();
+
+         if (txOld != null)
+         {
+            mgr.resume(txOld);
+         }
+                  
+      } 
+   }
+   //Returns true when JMS_MESSAGE_REFERENCE or JMS_MESSAGE has at least one row - NOTE(review): the name suggests the opposite, confirm against callers
+   protected boolean checkNoMessageData() throws Exception
+   {
+      InitialContext ctx = new InitialContext();
+
+      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
+      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
+      //Run the queries in a transaction of their own; any current one is suspended here and resumed in the finally block
+      javax.transaction.Transaction txOld = mgr.suspend();
+      mgr.begin();
+      
+      Connection conn = null;
+      
+      PreparedStatement ps = null;
+      
+      ResultSet rs = null;
+
+      try
+      {
+         conn = ds.getConnection();
+         String sql = "SELECT * FROM JMS_MESSAGE_REFERENCE";
+         ps = conn.prepareStatement(sql);
+         
+         rs = ps.executeQuery();
+         
+         boolean exists = rs.next();
+         
+         if (!exists)
+         {
+            rs.close();
+            
+            ps.close();
+            
+            ps = conn.prepareStatement("SELECT * FROM JMS_MESSAGE");
+            
+            rs = ps.executeQuery();
+           
+            exists = rs.next();
+         }
+         
+         return exists;
+      }
+      finally
+      {
+         if (rs != null) rs.close();
+         
+         if (ps != null) ps.close();
+         
+         if (conn != null) conn.close();
+         
+         mgr.commit();
+
+         if (txOld != null)
+         {
+            mgr.resume(txOld);
+         }
+                  
+      } 
+   }
+   //Creates and routes num messages, asserts each was routed, sleeps one second and returns them - NOTE(review): tx is never used, route() is passed null
+   protected List sendMessages(String condition, boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
+   {
+      List list = new ArrayList();
+      
+      for (int i = 0; i < num; i++)
+      {         
+         Message msg = CoreMessageFactory.createCoreMessage(i + 1, persistent, null);      
+         
+         MessageReference ref = ms.reference(msg);         
+         
+         boolean routed = office.route(ref, condition, null);         
+         
+         assertTrue(routed);
+         
+         list.add(msg);
+      }
+      
+      Thread.sleep(1000);
+      
+      return list;
+   }
+   //Asserts the receiver holds exactly one message with msg's id, acknowledges it, asserts queue.browse() is empty and clears the receiver
+   protected void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(1, msgs.size());
+      Message msgRec = (Message)msgs.get(0);
+      assertEquals(msg.getMessageID(), msgRec.getMessageID());
+      receiver.acknowledge(msgRec, null);
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      receiver.clear();
+   }
+   //List variant: asserts the receiver holds the same message ids in the same order as msgList, acknowledging each, then checks the queue is empty
+   protected void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertEquals(msgList.size(), msgs.size());
+      
+      for (int i = 0; i < msgList.size(); i++)
+      {
+         Message msgRec = (Message)msgs.get(i);
+         Message msgCheck = (Message)msgList.get(i);
+         assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
+         receiver.acknowledge(msgRec, null);
+      }
+      
+      msgs = queue.browse();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty()); 
+      receiver.clear();
+   }
+   //Asserts the receiver's message list is non-null and empty
+   protected void checkEmpty(SimpleReceiver receiver) throws Throwable
+   {
+      List msgs = receiver.getMessages();
+      assertNotNull(msgs);
+      assertTrue(msgs.isEmpty());
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -54,6 +54,7 @@
 import org.jboss.test.messaging.core.SimpleFilter;
 import org.jboss.test.messaging.core.SimpleFilterFactory;
 import org.jboss.test.messaging.core.SimpleReceiver;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.jmx.ServiceContainer;
 import org.jboss.test.messaging.util.CoreMessageFactory;
@@ -71,7 +72,7 @@
  * $Id$
  *
  */
-public class DefaultPostOfficeTest extends MessagingTestCase
+public class DefaultPostOfficeTest extends ClusteringTestBase
 {
    // Constants -----------------------------------------------------
 
@@ -79,18 +80,6 @@
    
    // Attributes ----------------------------------------------------
 
-   protected ServiceContainer sc;
-
-   protected IdManager im;   
-   
-   protected PersistenceManager pm;
-      
-   protected MessageStore ms;
-   
-   protected TransactionRepository tr;
-   
-   protected QueuedExecutorPool pool;
-   
    // Constructors --------------------------------------------------
 
    public DefaultPostOfficeTest(String name)
@@ -103,40 +92,11 @@
    public void setUp() throws Exception
    {
       super.setUp();
-
-      sc = new ServiceContainer("all");
-      
-      sc.start();                
-      
-      pm =
-         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
-                                    true, true, true, 100);      
-      pm.start();
-      
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
-      tr.start();
-      
-      ms = new SimpleMessageStore();
-      ms.start();
-      
-      pool = new QueuedExecutorPool(10);
-      
-      im = new IdManager("CHANNEL_ID", 10, pm);
-            
-      log.debug("setup done");
+     
    }
 
    public void tearDown() throws Exception
-   {      
-      if (!ServerManagement.isRemote())
-      {
-         sc.stop();
-         sc = null;
-      }
-      pm.stop();
-      tr.stop();
-      ms.stop();
-      
+   {            
       super.tearDown();
    }
    
@@ -1140,122 +1100,8 @@
       assertEquals(binding1.getQueue().isRecoverable(), binding2.getQueue().isRecoverable());
    }
    
-   protected PostOffice createPostOffice() throws Exception
-   {
-      FilterFactory ff = new SimpleFilterFactory();
-      
-      DefaultPostOffice postOffice = 
-         new DefaultPostOffice(sc.getDataSource(), sc.getTransactionManager(),
-                            null, true, "node1", "Simple", ms, pm, tr, ff, pool);
-      
-      postOffice.start();      
-      
-      return postOffice;
-   }
    
-   protected boolean checkNoBindingData() throws Exception
-   {
-      InitialContext ctx = new InitialContext();
-
-      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
-      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
-      
-      javax.transaction.Transaction txOld = mgr.suspend();
-      mgr.begin();
-      
-      Connection conn = null;
-      
-      PreparedStatement ps = null;
-      
-      ResultSet rs = null;
-
-      try
-      {
-         conn = ds.getConnection();
-         String sql = "SELECT * FROM JMS_POSTOFFICE";
-         ps = conn.prepareStatement(sql);
-         
-         rs = ps.executeQuery();
-         
-         return rs.next();
-      }
-      finally
-      {
-         if (rs != null) rs.close();
-         
-         if (ps != null) ps.close();
-         
-         if (conn != null) conn.close();
-         
-         mgr.commit();
-
-         if (txOld != null)
-         {
-            mgr.resume(txOld);
-         }
-                  
-      } 
-   }
    
-   protected boolean checkNoMessageData() throws Exception
-   {
-      InitialContext ctx = new InitialContext();
-
-      TransactionManager mgr = (TransactionManager)ctx.lookup(TransactionManagerService.JNDI_NAME);
-      DataSource ds = (DataSource)ctx.lookup("java:/DefaultDS");
-      
-      javax.transaction.Transaction txOld = mgr.suspend();
-      mgr.begin();
-      
-      Connection conn = null;
-      
-      PreparedStatement ps = null;
-      
-      ResultSet rs = null;
-
-      try
-      {
-         conn = ds.getConnection();
-         String sql = "SELECT * FROM JMS_MESSAGE_REFERENCE";
-         ps = conn.prepareStatement(sql);
-         
-         rs = ps.executeQuery();
-         
-         boolean exists = rs.next();
-         
-         if (!exists)
-         {
-            rs.close();
-            
-            ps.close();
-            
-            ps = conn.prepareStatement("SELECT * FROM JMS_MESSAGE");
-            
-            rs = ps.executeQuery();
-           
-            exists = rs.next();
-         }
-         
-         return exists;
-      }
-      finally
-      {
-         if (rs != null) rs.close();
-         
-         if (ps != null) ps.close();
-         
-         if (conn != null) conn.close();
-         
-         mgr.commit();
-
-         if (txOld != null)
-         {
-            mgr.resume(txOld);
-         }
-                  
-      } 
-   }
-   
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -1178,7 +1178,7 @@
          //========================
          
          log.info("******** sending");
-         List msgs = sendMessages(persistent, office1, 3, null);
+         List msgs = sendMessages("topic", persistent, office1, 3, null);
          log.info("********** sent");
          
          //n2
@@ -1211,7 +1211,7 @@
          //Send 3 messages at node2
          //========================
          
-         msgs = sendMessages(persistent, office2, 3, null);
+         msgs = sendMessages("topic", persistent, office2, 3, null);
          
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1242,7 +1242,7 @@
          //Send 3 messages at node3
          //========================
          
-         msgs = sendMessages(persistent, office3, 3, null);
+         msgs = sendMessages("topic", persistent, office3, 3, null);
          
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1281,7 +1281,7 @@
 //         * node6: 1 shared durable (shared2), 1 non durable
 //         * node7: 1 shared durable (shared2)
          
-         msgs = sendMessages(persistent, office4, 3, null);
+         msgs = sendMessages("topic", persistent, office4, 3, null);
                
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1312,7 +1312,7 @@
          //Send 3 messages at node5
          //========================
          
-         msgs = sendMessages(persistent, office5, 3, null);
+         msgs = sendMessages("topic", persistent, office5, 3, null);
              
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1343,7 +1343,7 @@
          //Send 3 messages at node6
          //========================
          
-         msgs = sendMessages(persistent, office6, 3, null);
+         msgs = sendMessages("topic", persistent, office6, 3, null);
              
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1375,7 +1375,7 @@
          //Send 3 messages at node7
          //========================
          
-         msgs = sendMessages(persistent, office7, 3, null);
+         msgs = sendMessages("topic", persistent, office7, 3, null);
 
          //n2
          checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -2101,7 +2101,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, pullPolicy, rf, 1);
+                                 5000, 5000, pullPolicy, rf, 1, 1000);
       
       postOffice.start();      
       
@@ -2110,65 +2110,7 @@
 
    // Private -------------------------------------------------------
    
-   private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
-   {
-      List list = new ArrayList();
-      
-      Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);      
-      
-      MessageReference ref = ms.reference(msg);         
-      
-      boolean routed = office.route(ref, "topic", null);         
-      
-      assertTrue(routed);
-      
-      list.add(msg);
-      
-      Thread.sleep(1000);
-      
-      return list;
-   }
    
-   private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertEquals(1, msgs.size());
-      Message msgRec = (Message)msgs.get(0);
-      assertEquals(msg.getMessageID(), msgRec.getMessageID());
-      receiver.acknowledge(msgRec, null);
-      msgs = queue.browse();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty()); 
-      receiver.clear();
-   }
-   
-   private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertEquals(msgList.size(), msgs.size());
-      
-      for (int i = 0; i < msgList.size(); i++)
-      {
-         Message msgRec = (Message)msgs.get(i);
-         Message msgCheck = (Message)msgList.get(i);
-         assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
-         receiver.acknowledge(msgRec, null);
-      }
-      
-      msgs = queue.browse();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty()); 
-      receiver.clear();
-   }
-   
-   private void checkEmpty(SimpleReceiver receiver) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty());
-   }
 
    // Inner classes -------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -21,21 +21,10 @@
   */
 package org.jboss.test.messaging.core.plugin.postoffice.cluster;
 
-import java.util.ArrayList;
 import java.util.List;
 
-import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.messaging.core.FilterFactory;
-import org.jboss.messaging.core.Message;
-import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Queue;
-import org.jboss.messaging.core.plugin.IdManager;
-import org.jboss.messaging.core.plugin.JDBCPersistenceManager;
-import org.jboss.messaging.core.plugin.SimpleMessageStore;
 import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.MessageStore;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
@@ -43,14 +32,9 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
 import org.jboss.messaging.core.plugin.postoffice.cluster.NullMessagePullPolicy;
-import org.jboss.messaging.core.tx.Transaction;
-import org.jboss.messaging.core.tx.TransactionRepository;
-import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.core.SimpleFilterFactory;
 import org.jboss.test.messaging.core.SimpleReceiver;
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.jmx.ServiceContainer;
-import org.jboss.test.messaging.util.CoreMessageFactory;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -64,7 +48,7 @@
  * $Id$
  *
  */
-public class DefaultRouterTest extends MessagingTestCase
+public class DefaultRouterTest extends ClusteringTestBase
 {
    // Constants -----------------------------------------------------
 
@@ -72,18 +56,6 @@
    
    // Attributes ----------------------------------------------------
    
-   protected ServiceContainer sc;
-
-   protected IdManager im;   
-   
-   protected PersistenceManager pm;
-      
-   protected MessageStore ms;
-   
-   protected TransactionRepository tr;
-   
-   protected QueuedExecutorPool pool;
-   
    // Constructors --------------------------------------------------
 
    public DefaultRouterTest(String name)
@@ -96,40 +68,10 @@
    public void setUp() throws Exception
    {
       super.setUp();
-
-      sc = new ServiceContainer("all");
-      
-      sc.start();                
-      
-      pm =
-         new JDBCPersistenceManager(sc.getDataSource(), sc.getTransactionManager(), null,
-                                    true, true, true, 100);      
-      pm.start();
-      
-      tr = new TransactionRepository(pm, new IdManager("TRANSACTION_ID", 10, pm));
-      tr.start();
-      
-      ms = new SimpleMessageStore();
-      ms.start();
-      
-      pool = new QueuedExecutorPool(10);
-      
-      im = new IdManager("CHANNEL_ID", 10, pm);
-            
-      log.debug("setup done");
    }
 
    public void tearDown() throws Exception
    {      
-      if (!ServerManagement.isRemote())
-      {
-         sc.stop();
-         sc = null;
-      }
-      pm.stop();
-      tr.stop();
-      ms.stop();
-      
       super.tearDown();
    }
    
@@ -206,49 +148,49 @@
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
                
-         List msgs = sendMessages(persistent, office1, 3, null);         
+         List msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkContainsAndAcknowledge(msgs, receiver1, queue1);         
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkEmpty(receiver1);
          checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkEmpty(receiver1);
          checkEmpty(receiver2);
          checkContainsAndAcknowledge(msgs, receiver3, queue1);                           
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkEmpty(receiver1);
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkContainsAndAcknowledge(msgs, receiver4, queue1);                                    
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkEmpty(receiver1);
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkContainsAndAcknowledge(msgs, receiver5, queue1); 
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkContainsAndAcknowledge(msgs, receiver1, queue1);         
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office1, 3, null);         
+         msgs = sendMessages("topic", persistent, office1, 3, null);         
          checkEmpty(receiver1);
          checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
          checkEmpty(receiver3);
@@ -345,21 +287,21 @@
          SimpleReceiver receiver5 = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
          queue5.add(receiver5);
                
-         List msgs = sendMessages(persistent, office2, 3, null);         
+         List msgs = sendMessages("topic", persistent, office2, 3, null);         
          checkContainsAndAcknowledge(msgs, receiver1, queue1);         
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office2, 3, null);         
+         msgs = sendMessages("topic", persistent, office2, 3, null);         
          checkContainsAndAcknowledge(msgs, receiver1, queue1);         
          checkEmpty(receiver2);
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office2, 3, null);         
+         msgs = sendMessages("topic", persistent, office2, 3, null);         
          checkContainsAndAcknowledge(msgs, receiver1, queue1);         
          checkEmpty(receiver2);
          checkEmpty(receiver3);
@@ -367,21 +309,21 @@
          checkEmpty(receiver5);
          
          
-         msgs = sendMessages(persistent, office3, 3, null); 
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
          checkEmpty(receiver1);
          checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office3, 3, null); 
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
          checkEmpty(receiver1);
          checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
          checkEmpty(receiver3);
          checkEmpty(receiver4);
          checkEmpty(receiver5);
          
-         msgs = sendMessages(persistent, office3, 3, null); 
+         msgs = sendMessages("topic", persistent, office3, 3, null); 
          checkEmpty(receiver1);
          checkContainsAndAcknowledge(msgs, receiver2, queue1);                  
          checkEmpty(receiver3);
@@ -440,7 +382,7 @@
                                  groupName,
                                  JGroupsUtil.getControlStackProperties(),
                                  JGroupsUtil.getDataStackProperties(),
-                                 5000, 5000, redistPolicy, rf, 1);
+                                 5000, 5000, redistPolicy, rf, 1, 1000);
       
       postOffice.start();      
       
@@ -449,70 +391,7 @@
 
    // Private -------------------------------------------------------
    
-   //TODO these methods are duplicated from DefaultClusteredPostOfficeTest - put in common super class or somewhere
-   //else
    
-   private List sendMessages(boolean persistent, PostOffice office, int num, Transaction tx) throws Exception
-   {
-      List list = new ArrayList();
-      
-      Message msg = CoreMessageFactory.createCoreMessage(1, persistent, null);      
-      
-      MessageReference ref = ms.reference(msg);         
-      
-      boolean routed = office.route(ref, "topic", null);         
-      
-      assertTrue(routed);
-      
-      list.add(msg);
-      
-      Thread.sleep(1000);
-      
-      return list;
-   }
-   
-   
-   private void checkContainsAndAcknowledge(Message msg, SimpleReceiver receiver, Queue queue) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertEquals(1, msgs.size());
-      Message msgRec = (Message)msgs.get(0);
-      assertEquals(msg.getMessageID(), msgRec.getMessageID());
-      receiver.acknowledge(msgRec, null);
-      msgs = queue.browse();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty()); 
-      receiver.clear();
-   }
-   
-   private void checkContainsAndAcknowledge(List msgList, SimpleReceiver receiver, Queue queue) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertEquals(msgList.size(), msgs.size());
-      
-      for (int i = 0; i < msgList.size(); i++)
-      {
-         Message msgRec = (Message)msgs.get(i);
-         Message msgCheck = (Message)msgList.get(i);
-         assertEquals(msgCheck.getMessageID(), msgRec.getMessageID());
-         receiver.acknowledge(msgRec, null);
-      }
-      
-      msgs = queue.browse();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty()); 
-      receiver.clear();
-   }
-   
-   private void checkEmpty(SimpleReceiver receiver) throws Throwable
-   {
-      List msgs = receiver.getMessages();
-      assertNotNull(msgs);
-      assertTrue(msgs.isEmpty());
-   }
-
    // Inner classes -------------------------------------------------
    
 

Added: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-22 19:46:49 UTC (rev 1352)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java	2006-09-22 19:47:06 UTC (rev 1353)
@@ -0,0 +1,270 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.test.messaging.core.plugin.postoffice.cluster;
+
+import java.util.List;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.FilterFactory;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.Binding;
+import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultMessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultRouterFactory;
+import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.core.SimpleFilterFactory;
+import org.jboss.test.messaging.core.plugin.base.ClusteringTestBase;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+
+public class RedistributionTest extends ClusteringTestBase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RedistributionTest(String name)
+   {
+      super(name); // hand the test name on to ClusteringTestBase
+   }
+
+   // Public --------------------------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp(); // no fixture of its own; everything comes from the superclass setUp
+   }
+
+   public void tearDown() throws Exception
+   {      
+      super.tearDown(); // nothing test-specific to release here
+   }
+   
+   public void testRedist() throws Throwable
+   {
+      redistTest(true); // run the redistribution scenario with persistent == true
+   }
+   
+   /*
+    * Starts five clustered post offices in one group, binds a local "queue1" with a
+    * PullingReceiver on each, sends messages via every office and expects 30 per queue.
+    * Then drains queue3, asserts it is empty and logs whatever it is delivered next.
+    */
+   public void redistTest(boolean persistent) throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      ClusteredPostOffice office3 = null;
+      
+      ClusteredPostOffice office4 = null;
+      
+      ClusteredPostOffice office5 = null;
+          
+      try
+      {   
+         office1 = createClusteredPostOffice("node1", "testgroup");
+         
+         office2 = createClusteredPostOffice("node2", "testgroup");
+         
+         office3 = createClusteredPostOffice("node3", "testgroup");
+         
+         office4 = createClusteredPostOffice("node4", "testgroup");
+         
+         office5 = createClusteredPostOffice("node5", "testgroup");
+         
+         LocalClusteredQueue queue1 = new LocalClusteredQueue(office1, "node1", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         Binding binding1 = office1.bindClusteredQueue("queue1", queue1);
+         PullingReceiver receiver1 = new PullingReceiver();
+         queue1.add(receiver1);
+         
+         LocalClusteredQueue queue2 = new LocalClusteredQueue(office2, "node2", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         Binding binding2 = office2.bindClusteredQueue("queue1", queue2);
+         PullingReceiver receiver2 = new PullingReceiver();
+         queue2.add(receiver2);
+         
+         LocalClusteredQueue queue3 = new LocalClusteredQueue(office3, "node3", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         Binding binding3 = office3.bindClusteredQueue("queue1", queue3);
+         PullingReceiver receiver3 = new PullingReceiver();
+         queue3.add(receiver3);
+         
+         LocalClusteredQueue queue4 = new LocalClusteredQueue(office4, "node4", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         Binding binding4 = office4.bindClusteredQueue("queue1", queue4);
+         PullingReceiver receiver4 = new PullingReceiver();
+         queue4.add(receiver4);
+         
+         LocalClusteredQueue queue5 = new LocalClusteredQueue(office5, "node5", "queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);         
+         Binding binding5 = office5.bindClusteredQueue("queue1", queue5);
+         PullingReceiver receiver5 = new PullingReceiver();
+         queue5.add(receiver5);
+         
+         //Send 30 messages to each queue
+         this.sendMessages("queue1", persistent, office1, 30, null);
+         this.sendMessages("queue1", persistent, office2, 30, null);
+         this.sendMessages("queue1", persistent, office3, 30, null);
+         this.sendMessages("queue1", persistent, office4, 30, null);
+         this.sendMessages("queue1", persistent, office5, 30, null);
+         
+         Thread.sleep(500); // pause before browsing - presumably lets asynchronous routing finish; TODO confirm
+         
+         List msgs = queue1.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue2.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue3.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue4.browse();
+         assertEquals(30, msgs.size());
+         
+         msgs = queue5.browse();
+         assertEquals(30, msgs.size());
+         
+         //Consume all the messages from queue 3
+         for (int i = 0; i < 30; i++)
+         {
+            Delivery del = receiver3.getDelivery();
+            log.info("Got delivery: " + del.getReference().getMessageID());
+            del.acknowledge(null);
+            queue3.deliver(false);
+         }
+         
+         msgs = queue3.browse();
+         assertEquals(0, msgs.size());
+         
+         queue3.deliver(false);
+         
+         Delivery del = receiver3.getDelivery(); // blocks (no timeout) until receiver3 is handed another delivery
+         
+         log.info("delivery is " + del); // only logged - no assertion is made on this final delivery
+         
+      }
+      finally // stop whichever offices were created; an exception from one stop() skips the rest
+      { 
+         if (office1 != null)
+         {
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {            
+            office2.stop();
+         }
+         
+         if (office3 != null)
+         {
+            office3.stop();
+         }
+         
+         if (office4 != null)
+         {            
+            office4.stop();
+         }
+         
+         if (office5 != null)
+         {
+            office5.stop();
+         }
+      }
+   }
+   
+   class PullingReceiver implements Receiver // test Receiver holding at most one delivery until getDelivery() takes it
+   {
+      private Delivery del; // the single pending delivery; null when nothing is held
+
+      public synchronized Delivery handle(DeliveryObserver observer, MessageReference reference, Transaction tx)
+      {
+         if (del != null)
+         {
+            return null; // already holding an untaken delivery, so return null instead of accepting another
+         }
+         
+         del = new SimpleDelivery(observer, reference, false);
+         
+         this.notify(); // wake a thread waiting in getDelivery()
+         
+         return del;
+      }
+      
+      public synchronized Delivery getDelivery() // blocks until handle() has stored a delivery, then returns it
+      {
+         while (del == null)
+         {
+            try
+            {
+               this.wait();
+            }
+            catch (InterruptedException e)
+            {               // interruption is ignored; the loop just re-checks del and waits again
+            }
+         }
+         Delivery ret = del;
+         del = null; // empty the slot so handle() can accept the next reference
+         return ret;
+      }
+      
+   }
+   
+   protected ClusteredPostOffice createClusteredPostOffice(String nodeId, String groupName) throws Exception
+   { // Builds a DefaultClusteredPostOffice for nodeId in groupName, starts it and returns it.
+      MessagePullPolicy pullPolicy = new DefaultMessagePullPolicy();
+      
+      FilterFactory ff = new SimpleFilterFactory();
+      
+      ClusterRouterFactory rf = new DefaultRouterFactory();
+      
+      DefaultClusteredPostOffice postOffice = 
+         new DefaultClusteredPostOffice(sc.getDataSource(), sc.getTransactionManager(),
+                                 null, true, nodeId, "Clustered", ms, pm, tr, ff, pool,
+                                 groupName,
+                                 JGroupsUtil.getControlStackProperties(),
+                                 JGroupsUtil.getDataStackProperties(),
+                                 5000, 5000, pullPolicy, rf, 1, 1000); // NOTE(review): positional numeric settings; meanings not shown here - confirm against the constructor
+      
+      postOffice.start(); // the office is handed back already started
+      
+      return postOffice;
+   }
+   
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+}
+
+
+




More information about the jboss-cvs-commits mailing list