[jboss-cvs] JBoss Messaging SVN: r1755 - in branches/Branch_Client_Failover_Experiment: src/etc/xmdesc src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/clustering/base tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 10 17:16:20 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-10 17:16:12 -0500 (Sun, 10 Dec 2006)
New Revision: 1755

Added:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
Log:
introducing a group management interface (Peer)

Modified: branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/etc/xmdesc/ClusteredPostOffice-xmbean.xml	2006-12-10 22:16:12 UTC (rev 1755)
@@ -17,6 +17,12 @@
       <type>org.jboss.messaging.core.plugin.contract.MessagingComponent</type>
    </attribute>
 
+   <attribute access="read-only" getMethod="getNodeIDView">
+      <description>The set containing cluster group members' node IDs</description>
+      <name>NodeIDView</name>
+      <type>java.util.Set</type>
+   </attribute>
+
    <attribute access="read-write" getMethod="getDataSource" setMethod="setDataSource">
       <description>The JNDI name of the DataSource used by this ChannelMapper instance</description>
       <name>DataSource</name>

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -32,7 +32,6 @@
 import org.jboss.jms.server.Version;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
-import org.jboss.remoting.ConnectionListener;
 
 import EDU.oswego.cs.dl.util.concurrent.SyncSet;
 import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/endpoint/ConnectionFactoryEndpoint.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -40,7 +40,9 @@
    ConnectionDelegate createConnectionDelegate(String username, String password)
       throws JMSException;
    
-   ConnectionStatus createFailoverConnectionDelegate(String username, String password, int failedNodeId)
+   ConnectionStatus createFailoverConnectionDelegate(String username,
+                                                     String password, 
+                                                     int failedNodeId)
       throws JMSException;
    
    byte[] getClientAOPConfig() throws JMSException;

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -36,66 +36,78 @@
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
 import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultFailoverMapper;
 import org.jboss.messaging.core.plugin.postoffice.cluster.MessagePullPolicy;
+import org.jboss.messaging.core.plugin.postoffice.cluster.Peer;
 import org.jboss.messaging.core.tx.TransactionRepository;
 import org.w3c.dom.Element;
 
+import java.util.Set;
+import java.util.Collections;
+
 /**
  * A ClusteredPostOfficeService
  * 
  * MBean wrapper for a clustered post office
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
  *
  */
-public class ClusteredPostOfficeService extends JDBCServiceSupport
+public class ClusteredPostOfficeService extends JDBCServiceSupport implements Peer
 {
-   private DefaultClusteredPostOffice postOffice;
-   
-   private ObjectName serverPeerObjectName;
-   
-   private String officeName;
-   
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
    private boolean started;
-   
+
    private Element syncChannelConfig;
-   
    private Element asyncChannelConfig;
-   
+
+   private ObjectName serverPeerObjectName;
+
+   private String officeName;
    private long stateTimeout = 5000;
-   
    private long castTimeout = 5000;
-     
    private String groupName;
-   
    private long statsSendPeriod = 1000;
-   
    private String clusterRouterFactory;
-   
    private String messagePullPolicy;
-   
-   // Constructors --------------------------------------------------------
-   
-   public ClusteredPostOfficeService()
-   {      
-   }
-   
-   // ServerPlugin implementation ------------------------------------------
-   
+
+   private DefaultClusteredPostOffice postOffice;
+
+   // Constructors --------------------------------------------------
+
+   // ServerPlugin implementation -----------------------------------
+
    public MessagingComponent getInstance()
    {
       return postOffice;
    }
-   
-   // MBean attributes -----------------------------------------------------
-   
+
+   // Peer implementation -------------------------------------------
+
+   public Set getNodeIDView()
+   {
+      if (postOffice == null)
+      {
+         return Collections.EMPTY_SET;
+      }
+
+      return postOffice.getNodeIDView();
+   }
+
+   // MBean attributes ----------------------------------------------
+
    public synchronized ObjectName getServerPeer()
    {
       return serverPeerObjectName;
    }
-   
+
    public synchronized void setServerPeer(ObjectName on)
    {
       if (started)
@@ -105,12 +117,12 @@
       }
       this.serverPeerObjectName = on;
    }
-   
+
    public synchronized String getPostOfficeName()
    {
       return officeName;
    }
-   
+
    public synchronized void setPostOfficeName(String name)
    {
       if (started)
@@ -120,7 +132,7 @@
       }
       this.officeName = name;
    }
-   
+
    public void setSyncChannelConfig(Element config) throws Exception
    {
       syncChannelConfig = config;
@@ -130,7 +142,7 @@
    {
       return syncChannelConfig;
    }
-   
+
    public void setAsyncChannelConfig(Element config) throws Exception
    {
       asyncChannelConfig = config;
@@ -140,62 +152,62 @@
    {
       return asyncChannelConfig;
    }
-   
+
    public void setStateTimeout(long timeout)
    {
       this.stateTimeout = timeout;
    }
-   
+
    public long getStateTimeout()
    {
       return stateTimeout;
    }
-   
+
    public void setCastTimeout(long timeout)
    {
       this.castTimeout = timeout;
    }
-   
+
    public long getCastTimeout()
    {
       return castTimeout;
    }
-   
+
    public void setGroupName(String groupName)
    {
       this.groupName = groupName;
    }
-   
+
    public String getGroupName()
    {
       return groupName;
    }
-   
+
    public void setStatsSendPeriod(long period)
    {
       this.statsSendPeriod = period;
    }
-   
+
    public long getStatsSendPeriod()
    {
       return statsSendPeriod;
    }
-   
+
    public String getClusterRouterFactory()
    {
       return clusterRouterFactory;
    }
-   
+
    public String getMessagePullPolicy()
    {
       return messagePullPolicy;
    }
-   
+
    public void setClusterRouterFactory(String clusterRouterFactory)
    {
       this.clusterRouterFactory = clusterRouterFactory;
    }
-   
+
    public void setMessagePullPolicy(String messagePullPolicy)
    {
       this.messagePullPolicy = messagePullPolicy;
@@ -205,90 +217,94 @@
    {
       return postOffice.printBindingInformation();
    }
-   
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
    // ServiceMBeanSupport overrides ---------------------------------
-   
+
    protected synchronized void startService() throws Exception
    {
       if (started)
       {
          throw new IllegalStateException("Service is already started");
       }
-      
+
       super.startService();
-      
+
       try
-      {  
+      {
          TransactionManager tm = getTransactionManagerReference();
-                           
+
          ServerPeer serverPeer = (ServerPeer)server.getAttribute(serverPeerObjectName, "Instance");
-         
          MessageStore ms = serverPeer.getMessageStore();
-         
          TransactionRepository tr = serverPeer.getTxRepository();
-         
          PersistenceManager pm = serverPeer.getPersistenceManagerInstance();
-         
          QueuedExecutorPool pool = serverPeer.getQueuedExecutorPool();
-                  
          int nodeId = serverPeer.getServerPeerID();
-         
+
          Class clazz = Class.forName(messagePullPolicy);
-         
          MessagePullPolicy pullPolicy = (MessagePullPolicy)clazz.newInstance();
-         
          clazz = Class.forName(clusterRouterFactory);
-         
+
          ClusterRouterFactory rf = (ClusterRouterFactory)clazz.newInstance();
-         
+
          FilterFactory ff = new SelectorFactory();
-         
          FailoverMapper mapper = new DefaultFailoverMapper();
 
-         postOffice =  new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
+         postOffice =  new DefaultClusteredPostOffice(ds, tm, sqlProperties,
+                                                      createTablesOnStartup,
                                                       nodeId, officeName, ms,
                                                       pm, tr, ff, pool,
                                                       groupName,
-                                                      syncChannelConfig, asyncChannelConfig,
+                                                      syncChannelConfig,
+                                                      asyncChannelConfig,
                                                       stateTimeout, castTimeout,
                                                       pullPolicy, rf,
                                                       mapper,
                                                       statsSendPeriod);
-         
+
          postOffice.start();
-         
-         started = true;         
+
+         started = true;
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
-      } 
+      }
    }
-   
+
    protected void stopService() throws Exception
    {
       if (!started)
       {
          throw new IllegalStateException("Service is not started");
       }
-      
-      super.stopService();      
-      
+
+      super.stopService();
+
       try
-      {      
+      {
          postOffice.stop();
-         
+
          postOffice = null;
-               
+
          started = false;
-         
+
          log.debug(this + " stopped");
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMXInvocation(t, this + " startService");
-      } 
+      }
    }
-      
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
 }
 

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -25,6 +25,7 @@
 
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
+import org.jboss.messaging.core.plugin.postoffice.cluster.Peer;
 
 /**
  * 
@@ -37,7 +38,7 @@
  * $Id$
  *
  */
-public interface ClusteredPostOffice extends PostOffice
+public interface ClusteredPostOffice extends PostOffice, Peer
 {
    /**
     * Bind a queue to the post office under a specific condition

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -92,64 +92,64 @@
    implements ClusteredPostOffice, PostOfficeInternal, Replicator
 {
    private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
-   
+
    // Key for looking up node id -> address info mapping from replicated data
    public static final String ADDRESS_INFO_KEY = "address_info";
-   
+
    // Key for looking up node id -> failed over for node id mapping from replicated data
    public static final String FAILED_OVER_FOR_KEY = "failed_over_for";
-   
+
    //Used for failure testing
    private boolean failBeforeCommit;
-   
+
    private boolean failAfterCommit;
-   
+
    private boolean failHandleResult;
    //End of failure testing attributes
-     
+
    private boolean trace = log.isTraceEnabled();
 
    private Channel syncChannel;
-   
+
    private Channel asyncChannel;
-   
+
    private String groupName;
-   
+
    private MessageDispatcher controlMessageDispatcher;
-   
+
    private Object setStateLock = new Object();
-   
+
    private boolean stateSet;
-   
+
    private View currentView;
-   
-   private Map replicatedData;   
-   
+
+   private Map replicatedData;
+
    private Set replicationListeners;
-      
+
    private Map holdingArea;
-   
+
    //Map < node id , failover node id>
    private Map failoverMap;
-   
+
    private Set leftSet;
-   
+
    private Element syncChannelConfigElement;
-   
+
    private Element asyncChannelConfigElement;
-   
+
    private String syncChannelConfig;
-   
+
    private String asyncChannelConfig;
-   
+
    private long stateTimeout;
-   
+
    private long castTimeout;
-   
+
    private MessagePullPolicy messagePullPolicy;
-   
+
    private ClusterRouterFactory routerFactory;
-   
+
    private FailoverMapper failoverMapper;
 
    private Map routerMap;
@@ -159,11 +159,11 @@
    private Map failedBindings;
 
    private StatsSender statsSender;
-   
+
    private ReplicationListener nodeAddressMapListener;
 
    private boolean started;
-     
+
    /*
     * Constructor using Element for configuration
     */
@@ -191,11 +191,11 @@
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
            rf, failoverMapper, statsSendPeriod);
-      
+
       this.syncChannelConfigElement = syncChannelConfig;
       this.asyncChannelConfigElement = asyncChannelConfig;
    }
-     
+
    /*
     * Constructor using String for configuration
     */
@@ -218,7 +218,7 @@
                                      ClusterRouterFactory rf,
                                      FailoverMapper failoverMapper,
                                      long statsSendPeriod) throws Exception
-   {            
+   {
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
            rf, failoverMapper, statsSendPeriod);
@@ -247,50 +247,50 @@
    {
       super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
              filterFactory, pool);
-                   
+
       this.groupName = groupName;
-      
+
       this.stateTimeout = stateTimeout;
-      
+
       this.castTimeout = castTimeout;
-      
+
       this.messagePullPolicy = redistributionPolicy;
-      
+
       this.routerFactory = rf;
-      
+
       this.failoverMapper = failoverMapper;
-      
+
       routerMap = new HashMap();
 
       failedBindings = new LinkedHashMap();
-      
+
       statsSender = new StatsSender(this, statsSendPeriod);
-      
+
       holdingArea = new HashMap();
-      
-      replicatedData = new HashMap();      
-      
+
+      replicatedData = new HashMap();
+
       replicationListeners = new LinkedHashSet();
-      
+
       failoverMap = new LinkedHashMap();
-      
+
       leftSet = new HashSet();
    }
 
    // MessagingComponent overrides
    // --------------------------------------------------------------
-   
+
    public synchronized void start() throws Exception
-   {    
+   {
       if (started)
       {
          log.warn("Attempt to start() but " + this + " is already started");
       }
 
       if (trace) { log.trace(this + " starting"); }
-      
+
       if (syncChannelConfigElement != null)
-      {        
+      {
          this.syncChannel = new JChannel(syncChannelConfigElement);
          this.asyncChannel = new JChannel(asyncChannelConfigElement);
       }
@@ -299,48 +299,48 @@
          this.syncChannel = new JChannel(syncChannelConfig);
          this.asyncChannel = new JChannel(asyncChannelConfig);
       }
-      
+
       // We don't want to receive local messages on any of the channels
       syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-      
+
       asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-      
+
       MessageListener cml = new ControlMessageListener();
       MembershipListener ml = new ControlMembershipListener();
       RequestHandler rh = new PostOfficeRequestHandler();
-      
+
       //Register as a listener for nodeid-address mapping events
       nodeAddressMapListener = new NodeAddressMapListener();
-      
+
       registerListener(nodeAddressMapListener);
 
       this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
 
       Receiver r = new DataReceiver();
-      
+
       asyncChannel.setReceiver(r);
 
       syncChannel.connect(groupName);
-      
+
       asyncChannel.connect(groupName);
-      
+
       super.start();
-            
+
       Address syncAddress = syncChannel.getLocalAddress();
-      
+
       Address asyncAddress = asyncChannel.getLocalAddress();
 
       PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
-      
+
       put(ADDRESS_INFO_KEY, info);
 
       statsSender.start();
-      
+
       started = true;
 
       log.debug(this + " started");
    }
-   
+
    public synchronized void stop(boolean sendNotification) throws Exception
    {
       if (!started)
@@ -348,41 +348,82 @@
          log.warn("Attempt to stop() but " + this + " is not started");
       }
       else
-      {   
+      {
          syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
-         
+
          super.stop(sendNotification);
-         
+
          unregisterListener(nodeAddressMapListener);
-         
+
          statsSender.stop();
-            
+
          syncChannel.close();
-         
+
          asyncChannel.close();
-         
+
          started = false;
-         
+
          if (trace) { log.trace("Stopped " + this); }
       }
-   }  
-   
-   // PostOffice implementation ---------------------------------------        
+   }
 
+   // Peer implementation -------------------------------------------
+
+   public Set getNodeIDView()
+   {
+      if (syncChannel == null)
+      {
+         return Collections.EMPTY_SET;
+      }
+
+      Map addressInfo = null;
+
+      synchronized(replicatedData)
+      {
+         addressInfo = new HashMap((Map)replicatedData.get(ADDRESS_INFO_KEY));
+      }
+
+      Set nodeIDView = null;
+
+      for (Iterator i = syncChannel.getView().getMembers().iterator(); i.hasNext(); )
+      {
+         if (nodeIDView == null)
+         {
+            nodeIDView = new HashSet();
+         }
+
+         Address addr = (Address)i.next();
+
+         for(Iterator j = addressInfo.entrySet().iterator(); j.hasNext(); )
+         {
+            Map.Entry entry = (Map.Entry)j.next();
+
+            if (((PostOfficeAddressInfo)(entry.getValue())).getSyncChannelAddress().equals(addr))
+            {
+               nodeIDView.add(entry.getKey());
+            }
+         }
+      }
+
+      return nodeIDView;
+   }
+
+   // ClusteredPostOffice implementation ----------------------------
+
    public Binding bindClusteredQueue(String condition, LocalClusteredQueue queue) throws Exception
    {
       if (trace)
       {
          log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition);
       }
-        
+
       if (queue.getNodeId() != this.currentNodeId)
       {
           log.warn("queue.getNodeId is not this node");
          //throw new IllegalArgumentException("Queue node id does not match office node id");
          // todo what to do when HA failing?
       }
-      
+
       Binding binding = (Binding)super.bindQueue(condition, queue);
 
       sendBindRequest(condition, queue, binding);
@@ -406,64 +447,64 @@
       {
          log.trace(this.currentNodeId + " unbind clustered queue: " + queueName);
       }
-      
+
       Binding binding = (Binding)super.unbindQueue(queueName);
-      
+
       UnbindRequest request = new UnbindRequest(this.currentNodeId, queueName);
-      
+
       syncSendRequest(request);
-      
+
       return binding;
    }
-   
+
    public boolean route(MessageReference ref, String condition, Transaction tx) throws Exception
    {
       if (trace)
       {
          log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
       }
-      
+
       //debug
       try
       {
          TextMessage tm = (TextMessage)ref.getMessage();
-         
+
          log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
       }
       catch (Exception e)
       {
          e.printStackTrace();
       }
-      
+
       if (ref == null)
       {
          throw new IllegalArgumentException("Message reference is null");
       }
-      
+
       if (condition == null)
       {
          throw new IllegalArgumentException("Condition is null");
       }
-      
+
       boolean routed = false;
-      
+
       lock.readLock().acquire();
-   
+
       try
-      {      
+      {
          ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
-         
+
          boolean startInternalTx = false;
-         
+
          int lastNodeId = -1;
-         
+
          if (cb != null)
          {
             if (tx == null && ref.isReliable())
-            {                
+            {
                if (!(cb.getDurableCount() == 0 || (cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1)))
                {
-                  // When routing a persistent message without a transaction then we may need to start an 
+                  // When routing a persistent message without a transaction then we may need to start an
                   // internal transaction in order to route it.
                   // This is so we can guarantee the message is delivered to all or none of the subscriptions.
                   // We need to do this if there is anything other than
@@ -474,45 +515,45 @@
                      log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
                   }
                }
-            }                        
-            
+            }
+
             if (startInternalTx)
             {
                tx = tr.createTransaction();
             }
-                
+
             int numberRemote = 0;
-            
+
             Map queueNameNodeIdMap = null;
-            
+
             long lastChannelId = -1;
-            
+
             Collection routers = cb.getRouters();
 
             Iterator iter = routers.iterator();
-                     
+
             while (iter.hasNext())
             {
                ClusterRouter router = (ClusterRouter)iter.next();
-               
+
                Delivery del = router.handle(null, ref, tx);
-               
+
                if (del != null && del.isSelectorAccepted())
                {
                   routed = true;
-               
+
                   ClusteredQueue queue = (ClusteredQueue)del.getObserver();
-                  
+
                   if (trace)
                   {
                      log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
                                queue.getNodeId() + " local:" + queue.isLocal());
-                     
+
                   }
-                  
+
                   log.info(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
                            queue.getNodeId() + " local:" + queue.isLocal());
-                  
+
                   if (router.numberOfReceivers() > 1)
                   {
                      //We have now chosen which one will receive the message so we need to add this
@@ -522,26 +563,26 @@
                      {
                         queueNameNodeIdMap = new HashMap();
                      }
-                     
+
                      queueNameNodeIdMap.put(queue.getName(), new Integer(queue.getNodeId()));
                   }
-                  
+
                   if (!queue.isLocal())
                   {
                      //We need to send the message remotely
                      numberRemote++;
-                     
-                     lastNodeId = queue.getNodeId();                                                               
-                                          
+
+                     lastNodeId = queue.getNodeId();
+
                      lastChannelId = queue.getChannelID();
                   }
                }
             }
-            
+
             //Now we've sent the message to any local queues, we might also need
             //to send the message to the other office instances on the cluster if there are
             //queues on those nodes that need to receive the message
-            
+
             //TODO - there is an inefficiency here, numberRemote does not take into account that more than one
             //of the number remote may be on the same node, so we could end up multicasting
             //when unicast would do
@@ -552,28 +593,28 @@
                   if (numberRemote == 1)
                   {
                      if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
-                     
-                     //Unicast - only one node is interested in the message                                        
+
+                     //Unicast - only one node is interested in the message
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
                   }
                   else
                   {
                      if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
-                     
+
                      //Multicast - more than one node is interested
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
-                  }                                 
+                  }
                }
                else
                {
                   CastMessagesCallback callback = (CastMessagesCallback)tx.getCallback(this);
-                  
+
                   if (callback == null)
                   {
                      callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
-                     
+
                      //This callback MUST be executed first
-                     
+
                      //Execution order is as follows:
                      //Before commit:
                      //1. Cast messages across network - get added to holding area (if persistent) on receiving
@@ -583,49 +624,49 @@
                      //1. Cast commit message across network
                      tx.addFirstCallback(callback, this);
                   }
-                      
+
                   callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap,
                                       numberRemote == 1 ? lastNodeId : -1,
-                                      lastChannelId);    
+                                      lastChannelId);
                }
             }
-                                                
+
             if (startInternalTx)
-            {               
+            {
                tx.commit();
                if (trace) { log.trace("Committed internal transaction"); }
             }
          }
       }
       finally
-      {                  
+      {
          lock.readLock().release();
       }
-         
-      return routed; 
+
+      return routed;
    }
-   
+
    public boolean isLocal()
    {
       return false;
    }
-   
+
    public Collection listAllBindingsForCondition(String condition) throws Exception
    {
       return listBindingsForConditionInternal(condition, false);
    }
-   
+
    public int getFailoverNodeID(int nodeId)
    {
       synchronized (failoverMap)
-      {         
+      {
          Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
-         
+
          if (failoverNode == null)
          {
             return nodeId;
          }
-         
+
          return failoverNode.intValue();
       }
    }
@@ -636,13 +677,13 @@
    }
 
    // Replicator implementation --------------------------------------------------------------------------
-   
+
    public Map get(Serializable key) throws Exception
    {
       synchronized (replicatedData)
-      {         
+      {
          Map m = (Map)replicatedData.get(key);
-            
+
          return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
       }
    }
@@ -650,20 +691,20 @@
    public void put(Serializable key, Serializable replicant) throws Exception
    {
       putReplicantLocally(currentNodeId, key, replicant);
-      
+
       PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
-      
+
       syncSendRequest(request);
    }
-   
+
    public boolean remove(Serializable key) throws Exception
    {
       if (removeReplicantLocally(this.currentNodeId, key))
-      {      
+      {
          RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
-         
+
          syncSendRequest(request);
-         
+
          return true;
       }
       else
@@ -671,7 +712,7 @@
          return false;
       }
    }
-   
+
    public void registerListener(ReplicationListener listener)
    {
       synchronized (replicationListeners)
@@ -683,22 +724,22 @@
          replicationListeners.add(listener);
       }
    }
-   
+
    public void unregisterListener(ReplicationListener listener)
    {
       synchronized (replicationListeners)
       {
          boolean removed = replicationListeners.remove(listener);
-         
+
          if (!removed)
          {
             throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
          }
       }
    }
-    
+
    // PostOfficeInternal implementation ------------------------------------------------------------------
-   
+
    public void handleNodeLeft(int nodeId) throws Exception
    {
       synchronized (leftSet)
@@ -714,18 +755,18 @@
       throws Exception
    {
       synchronized (replicatedData)
-      {         
+      {
          Map m = (Map)replicatedData.get(key);
-         
+
          if (m == null)
          {
             m = new LinkedHashMap();
-            
+
             replicatedData.put(key, m);
          }
-         
+
          m.put(new Integer(originatorNodeID), replicant);
-         
+
          notifyListeners(key, m, true, originatorNodeID);
       }
    }
@@ -738,31 +779,31 @@
       synchronized (replicatedData)
       {
          log.info(this.currentNodeId + " removing key " + key + " from node " + originatorNodeID);
-         
+
          Map m = (Map)replicatedData.get(key);
-         
+
          if (m == null)
          {
             return false;
          }
-         
+
          Object obj = m.remove(new Integer(originatorNodeID));
-         
+
          if (obj == null)
          {
             return false;
          }
-         
+
          if (m.isEmpty())
          {
             replicatedData.remove(key);
-         } 
+         }
          notifyListeners(key, m, false, originatorNodeID);
-      
-         return true; 
+
+         return true;
       }
    }
-     
+
    /*
     * Called when another node adds a binding
     */
@@ -771,74 +812,74 @@
       throws Exception
    {
       lock.writeLock().acquire();
-    
+
       if (trace)
       {
          log.info(this.currentNodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
       }
-            
+
       try
-      {                     
+      {
          //Sanity
 
          if (!knowAboutNodeId(nodeId))
          {
             throw new IllegalStateException("Don't know about node id: " + nodeId);
          }
-         
+
          // We currently only allow one binding per name per node
          Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-         
+
          Binding binding = null;
-         
+
          if (nameMap != null)
          {
             binding = (Binding)nameMap.get(queueName);
          }
-         
+
          if (binding != null && failed)
          {
             throw new IllegalArgumentException(this.currentNodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
-            
+
          binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
-         
-         addBinding(binding);         
+
+         addBinding(binding);
       }
       finally
       {
          lock.writeLock().release();
       }
    }
-   
+
    /*
     * Called when another node removes a binding
     */
    public void removeBindingFromCluster(int nodeId, String queueName) throws Exception
    {
       lock.writeLock().acquire();
-      
+
       if (trace)
       {
-         log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);        
+         log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
       }
-      
+
       try
-      {         
+      {
          // Sanity
          if (!knowAboutNodeId(nodeId))
          {
             throw new IllegalStateException("Don't know about node id: " + nodeId);
          }
-         
-         removeBinding(nodeId, queueName);         
+
+         removeBinding(nodeId, queueName);
       }
       finally
       {
          lock.writeLock().release();
       }
    }
-   
+
    public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKey,
                                 Map queueNameNodeIdMap) throws Exception
    {
@@ -847,14 +888,14 @@
          log.trace(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
                   routingKey + " map " + queueNameNodeIdMap);
       }
-      
+
       log.info(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
                routingKey + " map " + queueNameNodeIdMap);
-      
-      
-            
-      lock.readLock().acquire();  
 
+
+
+      lock.readLock().acquire();
+
       // Need to reference the message
       MessageReference ref = null;
       try
@@ -864,54 +905,54 @@
             // It will already have been persisted on the sender's side
             message.setPersisted(true);
          }
-         
+
          ref = ms.reference(message);
-              
+
          // We route on the condition
          DefaultClusteredBindings cb = (DefaultClusteredBindings)conditionMap.get(routingKey);
-         
+
          if (cb != null)
-         {                      
+         {
             Collection bindings = cb.getAllBindings();
-            
+
             Iterator iter = bindings.iterator();
-            
+
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-                                                     
+
                if (binding.getNodeId() == this.currentNodeId)
-               {  
+               {
                   boolean handle = true;
-                  
+
                   if (queueNameNodeIdMap != null)
-                  {           
+                  {
                      Integer in = (Integer)queueNameNodeIdMap.get(binding.getQueue().getName());
-                     
+
                      //When there is more than one queue with the same name across the cluster we only
                      //want to choose one of them
-                     
+
                      if (in != null)
                      {
                         handle = in.intValue() == currentNodeId;
                      }
                   }
-                  
+
                   if (handle)
-                  {                     
+                  {
                      //It's a local binding so we pass the message on to the subscription
-                     
+
                      LocalClusteredQueue queue = (LocalClusteredQueue)binding.getQueue();
-                     
-                     Delivery del = queue.handleFromCluster(ref);    
-                     
+
+                     Delivery del = queue.handleFromCluster(ref);
+
                      if (trace)
                      {
                         log.trace(this.currentNodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
                      }
                   }
                }
-            }                          
+            }
          }
       }
       finally
@@ -923,42 +964,42 @@
          lock.readLock().release();
       }
    }
-   
+
    /*
     * Multicast a message to all members of the group
     */
    public void asyncSendRequest(ClusterRequest request) throws Exception
-   {     
+   {
       if (trace) { log.trace(this.currentNodeId + " sending asynch request to group, request: " + request); }
-      
+
       byte[] bytes = writeRequest(request);
-              
+
       asyncChannel.send(new Message(null, null, bytes));
    }
-   
+
    /*
     * Unicast a message to one member of the group
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
-   {               
+   {
       if (trace) { log.trace(this.currentNodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
-      
+
       Address address = this.getAddressForNodeId(nodeId, false);
-      
+
       if (trace) { log.trace(this.currentNodeId + " sending to address " + address); }
-      
+
       if (address == null)
       {
          throw new IllegalArgumentException("Cannot find address for node " + nodeId);
       }
-      
+
       byte[] bytes = writeRequest(request);
-            
+
       Message m = new Message(address, null, bytes);
-      
-      asyncChannel.send(m);      
+
+      asyncChannel.send(m);
    }
-   
+
    /*
     * We put the transaction in the holding area
     */
@@ -967,82 +1008,82 @@
       synchronized (holdingArea)
       {
          holdingArea.put(id, tx);
-         
+
          if (trace) { log.trace(this.currentNodeId + " added transaction " + tx + " to holding area with id " + id); }
-      } 
+      }
    }
-   
+
    public void commitTransaction(TransactionId id) throws Throwable
    {
       if (trace) { log.trace(this.currentNodeId + " committing transaction " + id ); }
-      
+
       ClusterTransaction tx = null;
-        
+
       synchronized (holdingArea)
       {
-         tx = (ClusterTransaction)holdingArea.remove(id);                
+         tx = (ClusterTransaction)holdingArea.remove(id);
       }
-      
+
       if (tx == null)
       {
          throw new IllegalStateException("Cannot find transaction transaction id: " + id);
       }
-      
+
       tx.commit(this);
-      
+
       if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
    }
-   
+
    public void rollbackTransaction(TransactionId id) throws Throwable
    {
       if (trace) { log.trace(this.currentNodeId + " rolling back transaction " + id ); }
-      
+
       ClusterTransaction tx = null;
-        
+
       synchronized (holdingArea)
       {
-         tx = (ClusterTransaction)holdingArea.remove(id);                
+         tx = (ClusterTransaction)holdingArea.remove(id);
       }
-      
+
       if (tx == null)
       {
          throw new IllegalStateException("Cannot find transaction transaction id: " + id);
       }
-      
+
       tx.rollback(this);
-      
+
       if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
    }
-   
+
    /**
     * Check for any transactions that need to be committed or rolled back
     */
    public void check(Integer nodeId) throws Throwable
    {
       if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
-      
+
       synchronized (holdingArea)
       {
          Iterator iter = holdingArea.entrySet().iterator();
-         
+
          List toRemove = new ArrayList();
-         
+
          while (iter.hasNext())
          {
             Map.Entry entry = (Map.Entry)iter.next();
-            
+
             TransactionId id = (TransactionId)entry.getKey();
-            
+
             if (id.getNodeId() == nodeId.intValue())
             {
                ClusterTransaction tx = (ClusterTransaction)entry.getValue();
-               
+
                if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
-                
+
                boolean commit = tx.check(this);
-               
+
                if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
-               
+
                if (commit)
                {
                   tx.commit(this);
@@ -1051,56 +1092,56 @@
                {
                   tx.rollback(this);
                }
-               
+
                toRemove.add(id);
-               
+
                if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
             }
          }
-         
+
          //Remove the transactions from the holding area
-         
+
          iter = toRemove.iterator();
-         
+
          while (iter.hasNext())
          {
             TransactionId id = (TransactionId)iter.next();
-            
+
             holdingArea.remove(id);
          }
       }
       if (trace) { log.trace(this.currentNodeId + " check complete"); }
    }
-   
+
    public void sendQueueStats() throws Exception
    {
       if (!started)
       {
          return;
       }
-      
+
       lock.readLock().acquire();
- 
-      List statsList = null;      
-      
+
+      List statsList = null;
+
       try
-      {         
+      {
          Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
-         
+
          if (nameMap != null)
-         {            
+         {
             Iterator iter = nameMap.values().iterator();
-                     
+
             while (iter.hasNext())
             {
                Binding bb = (Binding)iter.next();
-               
+
                LocalClusteredQueue q = (LocalClusteredQueue)bb.getQueue();
-                             
+
                if (q.isActive())
-               {                                                      
+               {
                   QueueStats stats = q.getStats();
-                                              
+
                   if (stats != null)
                   {
                      if (statsList == null)
@@ -1109,9 +1150,9 @@
                      }
 
                      statsList.add(stats);
-                     
+
                      if (trace) { log.trace(this.currentNodeId + " adding stat for send " + stats); }
-                  } 
+                  }
                }
             }
          }
@@ -1120,64 +1161,64 @@
       {
          lock.readLock().release();
       }
-      
+
       if (statsList != null)
       {
          ClusterRequest req = new QueueStatsRequest(currentNodeId, statsList);
-         
+
          asyncSendRequest(req);
-         
+
          if (trace) { log.trace(this.currentNodeId + " Sent stats"); }
       }
    }
-   
+
    public void updateQueueStats(int nodeId, List statsList) throws Exception
    {
       lock.readLock().acquire();
 
       if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
-      
+
       try
-      {      
+      {
          if (nodeId == this.currentNodeId)
          {
             //Sanity check
             throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
          }
-         
+
          Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-         
+
          if (nameMap == null)
          {
             //This is ok, the node might have left
             if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
          }
          else
-         {     
+         {
             Iterator iter = statsList.iterator();
-            
+
             while (iter.hasNext())
             {
                QueueStats st = (QueueStats)iter.next();
-               
+
                Binding bb = (Binding)nameMap.get(st.getQueueName());
-               
+
                if (bb == null)
                {
                   //I guess this is possible if the queue was unbound
                   if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
                }
                else
-               {                  
+               {
                   RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
-                  
+
                   stub.setStats(st);
-                  
+
                   if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
-                  
+
                   ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
-                  
-                  //Maybe the local queue now wants to pull message(s) from the remote queue given that the 
+
+                  //Maybe the local queue now wants to pull message(s) from the remote queue given that the
                   //stats for the remote queue have changed
                   LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
 
@@ -1185,79 +1226,79 @@
                   {
                      //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
                      RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
-                     
+
                      if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
-                                    
+
                      localQueue.setPullQueue(toQueue);
-                     
+
                      if (toQueue != null && localQueue.getRefCount() == 0)
                      {
-                        //We now trigger delivery - this may cause a pull event                                                
+                        //We now trigger delivery - this may cause a pull event
                         //We only do this if there are no refs in the local queue
-                        
+
                         localQueue.deliver(false);
-                                                                    
+
                         if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
                      }
-                  } 
+                  }
                }
-            }         
-         }         
+            }
+         }
       }
       finally
       {
-         lock.readLock().release();      
+         lock.readLock().release();
       }
-   }      
-   
+   }
+
    public boolean referenceExistsInStorage(long channelID, long messageID) throws Exception
    {
       return pm.referenceExists(channelID, messageID);
-   } 
-  
-   
+   }
+
+
    public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
                                        String queueName, org.jboss.messaging.core.Message message) throws Throwable
    {
       if (trace) { log.trace(this.currentNodeId + " handling pull result " + message + " for " + queueName); }
-               
+
       Binding binding = getBindingForQueueName(queueName);
-      
+
       //The binding might be null if the queue was unbound
-      
+
       boolean handled = false;
-      
+
       if (!failHandleResult && binding != null)
-      {                     
+      {
          LocalClusteredQueue localQueue = (LocalClusteredQueue)binding.getQueue();
-              
+
          RemoteQueueStub remoteQueue = localQueue.getPullQueue();
-         
+
          if (remoteNodeId != remoteQueue.getNodeId())
          {
             //It might have changed since the request was sent
             Map bindings = (Map)nameMaps.get(new Integer(remoteNodeId));
-            
+
             if (bindings != null)
             {
                binding = (Binding)bindings.get(queueName);
-               
+
                if (binding != null)
-               {                     
-                 remoteQueue = (RemoteQueueStub)binding.getQueue();                              
+               {
+                 remoteQueue = (RemoteQueueStub)binding.getQueue();
                }
             }
          }
-         
+
          if (remoteQueue != null)
          {
             localQueue.handlePullMessagesResult(remoteQueue, message, holdingTxId,
                                                 failBeforeCommit, failAfterCommit);
-            
+
             handled = true;
-         }     
+         }
       }
-      
+
       if (!handled)
       {
          //If we didn't handle it for what ever reason, then we might have to send a rollback
@@ -1267,12 +1308,12 @@
          {
             //Only reliable messages will be in holding area
             this.asyncSendRequest(new RollbackPullRequest(this.currentNodeId, holdingTxId), remoteNodeId);
-            
+
             if (trace) { log.trace(this.currentNodeId + " send rollback pull request"); }
          }
-      }      
+      }
    }
-   
+
    public int getNodeId()
    {
       return currentNodeId;
@@ -1303,9 +1344,9 @@
       sb.append("]");
       return sb.toString();
    }
-                        
+
    // Public ------------------------------------------------------------------------------------------
-      
+
    //MUST ONLY be used for testing
    public int getNumberOfNodesInCluster()
    {
@@ -1318,7 +1359,7 @@
          return 0;
       }
    }
-   
+
    //MUST ONLY be used for testing
    public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
    {
@@ -1326,7 +1367,7 @@
       this.failAfterCommit = afterCommit;
       this.failHandleResult = handleResult;
    }
-   
+
    //MUST ONLY be used for testing
    public Collection getHoldingTransactions()
    {
@@ -1361,24 +1402,24 @@
          }
       }
    }
-   
+
    /**
     * This method fails over all the queues from node <failedNodeId> onto this node
-    * It is triggered when a JGroups view change occurs due to a member leaving and 
+    * It is triggered when a JGroups view change occurs due to a member leaving and
     * it's determined the member didn't leave cleanly
-    * 
+    *
     * @param failedNodeId
     * @throws Exception
     */
    private void failOver(int failedNodeId) throws Exception
    {
       //Need to lock
-      lock.writeLock().acquire();            
+      lock.writeLock().acquire();
 
       try
       {
          log.info("Preparing failover against node " + failedNodeId);
-         
+
          /*
          We make sure a FailoverStatus object is put in the replicated data for the node
          The real failover node will always add this in.
@@ -1389,60 +1430,60 @@
          And clients may failover after that and need to know if they have the correct node.
          Since this is the first thing we do after detecting failover, it should be very quick that
          all nodes know, however there is still a chance that a client tries to failover before
-         the information is replicated.                  
+         the information is replicated.
          */
-         
+
          Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
-         
+
          FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
-         
+
          if (status == null)
          {
-            status = new FailoverStatus();            
+            status = new FailoverStatus();
          }
-         
+
          status.startFailingOverForNode(failedNodeId);
-         
+
          put(FAILED_OVER_FOR_KEY, status);
-         
+
          log.info("Put into failed over map that starting failover");
-         
+
          //Get the map of queues for the failed node
-         
+
          Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
          if (subMaps==null || subMaps.size()==0)
          {
             log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
             return;
          }
-                  
+
          //Compile a list of the queue names to remove
          //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
          //node leave was detected, so if there are any non durable bindings left here then
          //this is an error
-         
+
          //We iterate through twice to avoid ConcurrentModificationException
          ArrayList namesToRemove = new ArrayList();
          for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
          {
             Map.Entry entry = (Map.Entry)iterNames.next();
-            
+
             Binding binding = (Binding )entry.getValue();
-            
+
             //Sanity check
             if (!binding.getQueue().isRecoverable())
             {
                throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
             }
-            
+
             //Sanity check
             if (!binding.getQueue().isClustered())
             {
                throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
             }
-            
+
             ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
-            
+
             //Sanity check
             if (queue.isLocal())
             {
@@ -1452,29 +1493,29 @@
          }
 
          log.info("Deleting " + namesToRemove.size() + " bindings from old node");
-         
+
          for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
          {
             Map.Entry entry = (Map.Entry)iterNames.next();
-                        
+
             Binding binding = (Binding)entry.getValue();
-            
+
             RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
-            
+
             String queueName = (String)entry.getKey();
-                        
+
             //First the binding is removed from the in memory condition and name maps
             this.removeBinding(failedNodeId, queueName);
 
             //Then deleted from the database
             this.deleteBinding(failedNodeId, queueName);
-            
+
             log.info("deleted binding for " + queueName);
 
             //Then an unbind request is sent - this causes other nodes to also remove it from the in memory
             //condition and name maps
             UnbindRequest unbindRequest = new UnbindRequest(failedNodeId, queueName);
-            
+
             syncSendRequest(unbindRequest);
 
             //If there is already a queue registered with the same name, then we set a flag "failed" on the
@@ -1496,43 +1537,43 @@
             {
                log.info("There is already a queue with that name so adding to failed map");
             }
-           
+
             //Create a new binding
             Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
                                                     stub.getName(), stub.getChannelID(),
                                                     stub.getFilter(), stub.isRecoverable(), failed);
-            
+
             log.info("Created new binding");
 
             //Insert it into the database
             insertBinding(newBinding);
 
             LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
-            
+
             clusteredQueue.deactivate();
             clusteredQueue.load();
             clusteredQueue.activate();
-            
+
             log.info("Loaded queue");
-            
+
             //Add the new binding in memory
             addBinding(newBinding);
-            
+
             //Send a bind request so other nodes add it too
             sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
-            
+
             //FIXME there is a problem in the above code.
             //If the server crashes between deleting the binding from the database
             //and creating the new binding in the database, then the binding will be completely
             //lost from the database when the server is resurrected.
             //To remedy, both db operations need to be done in the same JDBC tx
          }
-         
+
          log.info("Server side fail over is now complete");
-         
+
          //TODO - should this be in a finally? I'm not sure
          status.finishFailingOver();
-         
+
          put(FAILED_OVER_FOR_KEY, status);
       }
       finally
@@ -1667,7 +1708,7 @@
       {
          Map.Entry entry = (Map.Entry) iter.next();
          Map subMap = (Map)entry.getValue();
-         boolean firstTime=true;
+
          for (Iterator subIterator = subMap.entrySet().iterator(); subIterator.hasNext();)
          {
             Map.Entry subValue = (Map.Entry) subIterator.next();
@@ -1802,25 +1843,25 @@
          routerMap.remove(queueName);
       }
    }
-   
+
    protected void loadBindings() throws Exception
    {
       if (trace) { log.trace(this + " loading bindings"); }
-      
+
       boolean isState = syncChannel.getState(null, stateTimeout);
 
       if (!isState)
       {
          //Must be first member in group or non clustered - we load the state ourselves from the database
-         
+
          if (trace) { log.trace(this + " is the first member of group, so will load bindings from database"); }
-         
+
          super.loadBindings();
       }
       else
       {
          //The state will be set in due course via the MessageListener - we must wait until this happens
-         
+
          if (trace) { log.trace(this.currentNodeId + " Not first member of group- so waiting for state to arrive...."); }
 
       synchronized (setStateLock)
@@ -1842,7 +1883,7 @@
       if (nodeId == this.currentNodeId)
       {
          QueuedExecutor executor = (QueuedExecutor)pool.get();
-         
+
          queue = new LocalClusteredQueue(this, nodeId, queueName, channelId, ms, pm, true,
                   durable, executor, filter, tr);
       }
@@ -1850,12 +1891,12 @@
       {
          queue = new RemoteQueueStub(nodeId, queueName, channelId, durable, pm, filter);
       }
-      
+
       return new DefaultBinding(nodeId, condition, queue, failed);
    }
-   
+
    // Private ------------------------------------------------------------------------------------------
-   
+
    private boolean leaveMessageReceived(Integer nodeId) throws Exception
    {
       synchronized (leftSet)
@@ -1863,30 +1904,30 @@
          return leftSet.remove(nodeId);
       }
    }
-   
+
    /*
     * Removes all non durable binding data, and any local replicant data for the specified node.
     */
    private void removeDataForNode(Integer nodeToRemove) throws Exception
    {
       log.info("Node " + nodeToRemove + " requested to leave cluster");
-      
+
       lock.writeLock().acquire();
 
       try
-      {          
+      {
          Map nameMap = (Map)nameMaps.get(nodeToRemove);
 
          if (nameMap != null)
          {
             List toRemove = new ArrayList();
-            
+
             Iterator iter = nameMap.values().iterator();
-            
+
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-               
+
                if (!binding.getQueue().isRecoverable())
                {
                   //We only remove the non durable bindings - we still need to be able to handle
@@ -1894,16 +1935,16 @@
                   toRemove.add(binding);
                }
             }
-            
+
             iter = toRemove.iterator();
-            
+
             while (iter.hasNext())
             {
                Binding binding = (Binding)iter.next();
-               
+
                removeBinding(nodeToRemove.intValue(), binding.getQueue().getName());
             }
-         }         
+         }
       }
       finally
       {
@@ -1939,18 +1980,18 @@
     */
    private void notifyListeners(Serializable key, Map updatedReplicantMap, boolean added,
                                 int originatorNodeId)
-   { 
+   {
       synchronized (replicationListeners)
-      {         
+      {
          for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
          {
             ReplicationListener listener = (ReplicationListener)i.next();
-            
+
             listener.onReplicationChange(key, updatedReplicantMap, added, originatorNodeId);
          }
       }
-   } 
-   
+   }
+
    /*
     * Multicast a sync request
     */
@@ -1966,56 +2007,56 @@
 
       if (trace) { log.trace(this + " request sent OK"); }
    }
-   
+
    //DEBUG ONLY - remove this
-   private void dumpNodeIdAddressMap(Map map) throws Exception   
+   private void dumpNodeIdAddressMap(Map map) throws Exception
    {
       log.info("** DUMPING NODE ADDRESS MAPPING");
-      
+
       Iterator iter = map.entrySet().iterator();
-      
+
       while (iter.hasNext())
       {
          Map.Entry entry = (Map.Entry)iter.next();
-         
+
          Integer theNodeId = (Integer)entry.getKey();
-         
+
          PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
-         
+
          log.info("node id: " + theNodeId +" --------->(async:sync) " + info.getAsyncChannelAddress() + ":" + info.getSyncChannelAddress());
       }
-      
-      log.info("** END DUMP");      
+
+      log.info("** END DUMP");
    }
-   
 
+
    //TODO - this is a bit tortuous - needs optimising
    private Integer getNodeIdForSyncAddress(Address address) throws Exception
    {
       synchronized (replicatedData)
       {
          Map map = get(ADDRESS_INFO_KEY);
-         
+
          if (map == null)
          {
             throw new IllegalStateException("Cannot find node id -> address mapping");
          }
-         
+
          this.dumpNodeIdAddressMap(map);
-         
+
          Iterator iter = map.entrySet().iterator();
-         
+
          log.info("iterating, looking for " + address);
-         
+
          Integer theNodeId = null;
          while (iter.hasNext())
          {
             Map.Entry entry = (Map.Entry)iter.next();
-            
+
             PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
-            
+
             log.info("info synch channel address: " + info.getSyncChannelAddress());
-            
+
             if (info.getSyncChannelAddress().equals(address))
             {
                log.info("equal");
@@ -2030,15 +2071,15 @@
          return theNodeId;
       }
    }
-   
+
    private boolean knowAboutNodeId(int nodeId)
    {
       //The nodeid->Address info mapping is stored in the replicated data
-      
+
       synchronized (replicatedData)
-      {         
+      {
          Map nodeIdAddressMapping = (Map)replicatedData.get(ADDRESS_INFO_KEY);
-         
+
          if (nodeIdAddressMapping == null)
          {
             return false;
@@ -2046,7 +2087,7 @@
          else
          {
             Object obj = nodeIdAddressMapping.get(new Integer(nodeId));
-            
+
             return obj != null;
          }
       }
@@ -2060,25 +2101,25 @@
    {
       return this.currentNodeId == getFailoverNodeID(nodeId);
    }
-      
+
    private byte[] getStateAsBytes() throws Exception
    {
       List bindings = new ArrayList();
-      
+
       Iterator iter = nameMaps.values().iterator();
-      
+
       while (iter.hasNext())
       {
          Map map  = (Map)iter.next();
-         
+
          Iterator iter2 = map.values().iterator();
-         
+
          while (iter2.hasNext())
          {
             Binding binding = (Binding)iter2.next();
-                      
-            Queue queue = binding.getQueue();        
-            
+
+            Queue queue = binding.getQueue();
+
             BindingInfo info = new BindingInfo(binding.getNodeId(), queue.getName(),
                                                binding.getCondition(),
                                                queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
@@ -2088,130 +2129,130 @@
             bindings.add(info);
          }
       }
-      
+
       //Need to copy
-      
+
       Map copy;
-      
+
       synchronized (replicatedData)
       {
          copy = copyReplicatedData(replicatedData);
       }
-      
+
       SharedState state = new SharedState(bindings, copy);
-      
+
       return StreamUtils.toBytes(state);
    }
-   
+
    private void processStateBytes(byte[] bytes) throws Exception
    {
       if (trace) { log.trace(this + " received state from group"); }
-      
+
       SharedState state = new SharedState();
-      
+
       StreamUtils.fromBytes(state, bytes);
-      
+
       if (trace) { log.trace(this + " received " + state.getBindings().size() + " bindings and map " + state.getReplicatedData()); }
-      
+
       nameMaps.clear();
-      
+
       conditionMap.clear();
-                 
+
       List bindings = state.getBindings();
-      
+
       Iterator iter = bindings.iterator();
-      
+
       while (iter.hasNext())
       {
          BindingInfo info = (BindingInfo)iter.next();
-         
+
          Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(),
                                               info.getFilterString(), info.isDurable(),info.isFailed());
-         
+
          if (binding.getNodeId() == this.currentNodeId)
          {
             //We deactivate if this is one of our own bindings - it can only
             //be one of our own durable bindings - and since state is retrieved before we are fully started
             //then the sub hasn't been deployed so must be deactivated
-            
-            binding.getQueue().deactivate();            
+
+            binding.getQueue().deactivate();
          }
-            
-         addBinding(binding);         
+
+         addBinding(binding);
       }
-      
+
       //Update the replicated data
-      
+
       synchronized (replicatedData)
       {
          replicatedData = copyReplicatedData(state.getReplicatedData());
       }
    }
-   
+
    private Map copyReplicatedData(Map toCopy)
    {
       Map copy = new HashMap();
-      
+
       Iterator iter = toCopy.entrySet().iterator();
-      
+
       while (iter.hasNext())
       {
          Map.Entry entry = (Map.Entry)iter.next();
-         
+
          Serializable key = (Serializable)entry.getKey();
-         
+
          Map replicants = (Map)entry.getValue();
-         
+
          Map m = new LinkedHashMap();
-         
+
          m.putAll(replicants);
-         
+
          copy.put(key, m);
       }
-      
+
       return copy;
    }
-   
 
+
    private byte[] writeRequest(ClusterRequest request) throws Exception
    {
       ByteArrayOutputStream baos = new ByteArrayOutputStream(2048);
-      
+
       DataOutputStream daos = new DataOutputStream(baos);
-      
+
       ClusterRequest.writeToStream(daos, request);
-            
+
       daos.flush();
-      
+
       return baos.toByteArray();
    }
-   
+
    private ClusterRequest readRequest(byte[] bytes) throws Exception
    {
       ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-      
+
       DataInputStream dais = new DataInputStream(bais);
-      
+
       ClusterRequest request = ClusterRequest.createFromStream(dais);
-      
+
       dais.close();
-      
-      return request;            
+
+      return request;
    }
-   
+
    private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
-   {  
+   {
       synchronized (replicatedData)
       {
          Map map = this.get(ADDRESS_INFO_KEY);
-         
+
          if (map == null)
          {
             throw new IllegalStateException("Cannot find address mapping");
          }
-         
+
          PostOfficeAddressInfo info = (PostOfficeAddressInfo)map.get(new Integer(nodeId));
-         
+
          if (info != null)
          {
             if (sync)
@@ -2227,11 +2268,11 @@
          {
             return null;
          }
-      }      
+      }
    }
-   
-   
 
+
+
    /*
     * A new node has joined the group
     */
@@ -2240,7 +2281,7 @@
       if (trace) { log.trace(this + ": " + address + " joined"); }
 
       log.info(this.currentNodeId + " Node with address: " + address + " joined");
-      
+
       //Currently does nothing
    }
 
@@ -2250,7 +2291,7 @@
    private void nodeLeft(Address address) throws Throwable
    {
       if (trace) { log.trace(this + ": " + address + " left"); }
-      
+
       log.info(this.currentNodeId + " Node with address: " + address + " left");
 
       Integer theNodeId = getNodeIdForSyncAddress(address);
@@ -2269,9 +2310,9 @@
 
       //Need to evaluate this before we regenerate the failover map
       boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
-      
+
       log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-      
+
       log.info("Crashed: " + crashed);
 
       //Remove any replicant data and non durable bindings for the node - again we need to do this
@@ -2289,17 +2330,17 @@
          failOver(theNodeId.intValue());
       }
    }
-         
 
+
    // Inner classes -------------------------------------------------------------------
-    
+
    /*
     * This class is used to manage state on the control channel
     */
    private class ControlMessageListener implements MessageListener
    {
       public byte[] getState()
-      {     
+      {
          try
          {
             lock.writeLock().acquire();
@@ -2320,24 +2361,24 @@
             IllegalStateException e2 = new IllegalStateException(e.getMessage());
             e2.setStackTrace(e.getStackTrace());
             throw e2;
-         }     
+         }
          finally
          {
             lock.writeLock().release();
          }
       }
-      
+
       public void receive(Message message)
-      {         
+      {
       }
-      
+
       public void setState(byte[] bytes)
-      { 
+      {
          if (bytes != null)
-         {            
+         {
             try
             {
-               lock.writeLock().acquire();         
+               lock.writeLock().acquire();
             }
             catch (InterruptedException e)
             {
@@ -2361,15 +2402,15 @@
                lock.writeLock().release();
             }
          }
-               
+
          synchronized (setStateLock)
          {
             stateSet = true;
             setStateLock.notify();
          }
-      }      
+      }
    }
-   
+
    /*
     * We use this class so we notice when members leave the group
     */
@@ -2384,7 +2425,7 @@
       {
          //NOOP
       }
-      
+
       public void viewAccepted(View newView)
       {
          //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
@@ -2393,7 +2434,7 @@
                   + DefaultClusteredPostOffice.this.getOfficeName());
 
          // JGroups will make sure this method is never called by more than one thread concurrently
-         
+
          View oldView = currentView;
          currentView = newView;
 
@@ -2411,10 +2452,10 @@
       }
 
       public byte[] getState()
-      {        
+      {
          //NOOP
          return null;
-      }     
+      }
    }
 
 
@@ -2425,37 +2466,37 @@
    private class DataReceiver implements Receiver
    {
       public void block()
-      {   
+      {
          //NOOP
       }
 
       public void suspect(Address address)
-      { 
+      {
          //NOOP
       }
 
       public void viewAccepted(View view)
-      { 
+      {
          //NOOP
       }
 
       public byte[] getState()
-      {         
+      {
          //NOOP
          return null;
       }
-      
+
       public void receive(Message message)
       {
          if (trace) { log.trace(currentNodeId + " received message " + message + " on async channel"); }
-         
+
          try
          {
             byte[] bytes = message.getBuffer();
-            
+
             ClusterRequest request = readRequest(bytes);
-            
-            request.execute(DefaultClusteredPostOffice.this);            
+
+            request.execute(DefaultClusteredPostOffice.this);
          }
          catch (Throwable e)
          {
@@ -2463,13 +2504,13 @@
             IllegalStateException e2 = new IllegalStateException(e.getMessage());
             e2.setStackTrace(e.getStackTrace());
             throw e2;
-         }         
+         }
       }
-      
+
       public void setState(byte[] bytes)
       {
-         //NOOP         
-      }      
+         //NOOP
+      }
    }
 
    /*
@@ -2481,11 +2522,11 @@
       {
          if (trace) { log.info(currentNodeId + " received message " + message + " on sync channel"); }
          try
-         {   
+         {
             byte[] bytes = message.getBuffer();
-            
+
             ClusterRequest request = readRequest(bytes);
-            
+
             return request.execute(DefaultClusteredPostOffice.this);
          }
          catch (Throwable e)
@@ -2494,14 +2535,14 @@
             IllegalStateException e2 = new IllegalStateException(e.getMessage());
             e2.setStackTrace(e.getStackTrace());
             throw e2;
-         }         
-      }      
+         }
+      }
    }
-   
+
    /*
     * We use this class to respond to node address mappings being added or removed from the cluster
     * and then recalculate the node->failover node mapping
-    * 
+    *
     */
    private class NodeAddressMapListener implements ReplicationListener
    {
@@ -2512,22 +2553,22 @@
          if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
          {
             log.info(currentNodeId + " got node address change");
-            
+
             try
             {
                //DEBUG only
                dumpNodeIdAddressMap(updatedReplicantMap);
             }
             catch (Exception ignore)
-            {               
+            {
             }
-            
+
             //A node-address mapping has been added/removed from global state-
             //We need to update the failover map
             generateFailoverMap(updatedReplicantMap);
          }
       }
-      
+
       private void generateFailoverMap(Map nodeAddressMap)
       {
          failoverMap = failoverMapper.generateMapping(new ArrayList(nodeAddressMap.keySet()));

Added: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -0,0 +1,24 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.plugin.postoffice.cluster;
+
+import java.util.Set;
+
+/**
+ * Group management interface.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public interface Peer
+{
+   /**
+    * Returns a set of nodeIDs (integers) representing the IDs of cluster's nodes.
+    */
+   Set getNodeIDView();
+}

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -0,0 +1,69 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+import javax.management.ObjectName;
+import java.util.Set;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class GroupManagementTest extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public GroupManagementTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testOneNodeCluster() throws Exception
+   {
+
+      ServerManagement.start("all", 0);
+
+      ObjectName on = new ObjectName("jboss.messaging:service=QueuePostOffice");
+      Set view = (Set)ServerManagement.getAttribute(on, "NodeIDView");
+
+      assertEquals(1, view.size());
+      assertTrue(view.contains(new Integer(0)));
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -179,9 +179,8 @@
       log.info("Server 1 ID: " + serverID1);
       log.info("Server 2 ID: " + serverID2);
 
-//      assertTrue(serverID0 != serverID1);
-//
-//      assertTrue(serverID1 != serverID2);
+      assertTrue(serverID0 != serverID1);
+      assertTrue(serverID1 != serverID2);
    }
 
    protected void drainQueues() throws Exception

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-10 22:06:23 UTC (rev 1754)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-10 22:16:12 UTC (rev 1755)
@@ -265,7 +265,7 @@
    {
       try
       {
-         log.info(" Server peer ID ........... " + serverPeerID + " clustered: " + clustered);
+         log.info(" Server peer ID ........... " + serverPeerID);
 
          log.debug("creating ServerPeer instance");
 




More information about the jboss-cvs-commits mailing list