[jboss-cvs] JBoss Messaging SVN: r1764 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/messaging/core/plugin src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering tests/src/org/jboss/test/messaging/jms/crash tests/src/org/jboss/test/messaging/tools tests/src/org/jboss/test/messaging/tools/jmx tests/src/org/jboss/test/messaging/tools/jmx/rmi

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Dec 11 11:56:16 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-11 11:56:06 -0500 (Mon, 11 Dec 2006)
New Revision: 1764

Added:
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java
Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java
Log:
added a cluster event notification mechanism, various other tests and tweaks

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -22,6 +22,10 @@
 package org.jboss.messaging.core.plugin;
 
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
+import javax.management.NotificationFilter;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
 import javax.transaction.TransactionManager;
 import org.jboss.jms.selector.SelectorFactory;
 import org.jboss.jms.server.JMSConditionFactory;
@@ -103,6 +107,27 @@
       return postOffice.getNodeIDView();
    }
 
+   // NotificationBroadcaster implementation ------------------------
+
+   public void addNotificationListener(NotificationListener listener,
+                                       NotificationFilter filter,
+                                       Object object) throws IllegalArgumentException
+   {
+      postOffice.addNotificationListener(listener, filter, object);
+   }
+
+   public void removeNotificationListener(NotificationListener listener)
+      throws ListenerNotFoundException
+   {
+      postOffice.removeNotificationListener(listener);
+   }
+
+   public MBeanNotificationInfo[] getNotificationInfo()
+   {
+      return postOffice.getNotificationInfo();
+   }
+
+
    // MBean attributes ----------------------------------------------
 
    public synchronized ObjectName getServerPeer()

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ClusteredPostOffice.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -40,6 +40,8 @@
  */
 public interface ClusteredPostOffice extends PostOffice, Peer
 {
+   public static final String VIEW_CHANGED_NOTIFICATION = "VIEW_CHANGED";
+
    /**
     * Bind a queue to the post office under a specific condition
     * such that it is available across the cluster

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -44,6 +44,12 @@
 import javax.jms.TextMessage;
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationListener;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.ListenerNotFoundException;
+import javax.management.Notification;
 
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
@@ -82,7 +88,7 @@
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
- * 
+ *
  * A DefaultClusteredPostOffice
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -96,6 +102,8 @@
 public class DefaultClusteredPostOffice extends DefaultPostOffice
    implements ClusteredPostOffice, PostOfficeInternal, Replicator
 {
+   // Constants -----------------------------------------------------
+
    private static final Logger log = Logger.getLogger(DefaultClusteredPostOffice.class);
 
    // Key for looking up node id -> address info mapping from replicated data
@@ -104,22 +112,32 @@
    // Key for looking up node id -> failed over for node id mapping from replicated data
    public static final String FAILED_OVER_FOR_KEY = "failed_over_for";
 
-   //Used for failure testing
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Used for failure testing
+
    private boolean failBeforeCommit;
-
    private boolean failAfterCommit;
-
    private boolean failHandleResult;
-   //End of failure testing attributes
 
+   // End of failure testing attributes
+
    private boolean trace = log.isTraceEnabled();
 
+   private String groupName;
+
+   private boolean started;
+
+   private Element syncChannelConfigElement;
+   private String syncChannelConfig;
    private Channel syncChannel;
 
+   private Element asyncChannelConfigElement;
+   private String asyncChannelConfig;
    private Channel asyncChannel;
 
-   private String groupName;
-
    private MessageDispatcher controlMessageDispatcher;
 
    private Object setStateLock = new Object();
@@ -139,14 +157,6 @@
 
    private Set leftSet;
 
-   private Element syncChannelConfigElement;
-
-   private Element asyncChannelConfigElement;
-
-   private String syncChannelConfig;
-
-   private String asyncChannelConfig;
-
    private long stateTimeout;
 
    private long castTimeout;
@@ -167,10 +177,13 @@
 
    private ReplicationListener nodeAddressMapListener;
 
-   private boolean started;
-   
-   private QueuedExecutor viewExecutor;         
+   private NotificationBroadcasterSupport nbSupport;
 
+   private QueuedExecutor viewExecutor;
+
+
+   // Constructors --------------------------------------------------
+
    /*
     * Constructor using Element for configuration
     */
@@ -197,8 +210,8 @@
       throws Exception
    {
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, failoverMapper, statsSendPeriod);
+           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
+           redistributionPolicy, rf, failoverMapper, statsSendPeriod);
 
       this.syncChannelConfigElement = syncChannelConfig;
       this.asyncChannelConfigElement = asyncChannelConfig;
@@ -229,8 +242,8 @@
                                      long statsSendPeriod) throws Exception
    {
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
-           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
-           rf, failoverMapper, statsSendPeriod);
+           pm, tr, filterFactory, conditionFactory, pool, groupName, stateTimeout, castTimeout,
+           redistributionPolicy, rf, failoverMapper, statsSendPeriod);
 
       this.syncChannelConfig = syncChannelConfig;
       this.asyncChannelConfig = asyncChannelConfig;
@@ -285,12 +298,13 @@
       failoverMap = new LinkedHashMap();
 
       leftSet = new HashSet();
-      
+
+      nbSupport = new NotificationBroadcasterSupport();
+
       viewExecutor = new QueuedExecutor();
    }
 
-   // MessagingComponent overrides
-   // --------------------------------------------------------------
+   // MessagingComponent overrides ----------------------------------
 
    public synchronized void start() throws Exception
    {
@@ -379,6 +393,26 @@
       }
    }
 
+   // NotificationBroadcaster implementation ------------------------
+
+   public void addNotificationListener(NotificationListener listener,
+                                       NotificationFilter filter,
+                                       Object object) throws IllegalArgumentException
+   {
+      nbSupport.addNotificationListener(listener, filter, object);
+   }
+
+   public void removeNotificationListener(NotificationListener listener)
+      throws ListenerNotFoundException
+   {
+      nbSupport.removeNotificationListener(listener);
+   }
+
+   public MBeanNotificationInfo[] getNotificationInfo()
+   {
+      return new MBeanNotificationInfo[0];
+   }
+
    // Peer implementation -------------------------------------------
 
    public Set getNodeIDView()
@@ -424,10 +458,7 @@
 
    public Binding bindClusteredQueue(Condition condition, LocalClusteredQueue queue) throws Exception
    {
-      if (trace)
-      {
-         log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition);
-      }
+      if (trace) { log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition); }
 
       if (queue.getNodeId() != this.currentNodeId)
       {
@@ -443,22 +474,9 @@
       return binding;
    }
 
-   private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
-      throws Exception
-   {
-      BindRequest request =
-         new BindRequest(this.currentNodeId, queue.getName(), condition.toText(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
-                         binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
-
-      syncSendRequest(request);
-   }
-
    public Binding unbindClusteredQueue(String queueName) throws Throwable
    {
-      if (trace)
-      {
-         log.trace(this.currentNodeId + " unbind clustered queue: " + queueName);
-      }
+      if (trace) { log.trace(this.currentNodeId + " unbind clustered queue: " + queueName); }
 
       Binding binding = (Binding)super.unbindQueue(queueName);
 
@@ -469,274 +487,140 @@
       return binding;
    }
 
-   public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
+   public Collection listAllBindingsForCondition(Condition condition) throws Exception
    {
-      if (trace)
-      {
-         log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
-      }
+      return listBindingsForConditionInternal(condition, false);
+   }
 
-      //debug
-      try
-      {
-         TextMessage tm = (TextMessage)ref.getMessage();
-
-         log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-      }
-
-      if (ref == null)
-      {
-         throw new IllegalArgumentException("Message reference is null");
-      }
-
-      if (condition == null)
-      {
-         throw new IllegalArgumentException("Condition is null");
-      }
-
-      boolean routed = false;
-
+   public Binding getBindingforChannelId(long channelId) throws Exception
+   {
       lock.readLock().acquire();
 
       try
       {
-         ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
+         //First look in the failed map
+         //Failed bindings are stored in the failed map by channel id
+         Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
+         Binding binding = null;
+         if (channelMap != null)
+         {
+            binding = (Binding)channelMap.get(new Long(channelId));
+         }
 
-         boolean startInternalTx = false;
-
-         int lastNodeId = -1;
-
-         if (cb != null)
+         if (binding == null)
          {
-            if (tx == null && ref.isReliable())
-            {
-               if (!(cb.getDurableCount() == 0 || (cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1)))
-               {
-                  // When routing a persistent message without a transaction then we may need to start an
-                  // internal transaction in order to route it.
-                  // This is so we can guarantee the message is delivered to all or none of the subscriptions.
-                  // We need to do this if there is anything other than
-                  // No durable subs or exactly one local durable sub
-                  startInternalTx = true;
-                  if (trace)
-                  {
-                     log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
-                  }
-               }
-            }
+            //Not found in the failed map - look in the name map
+            Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
 
-            if (startInternalTx)
+            if (nameMap != null)
             {
-               tx = tr.createTransaction();
-            }
-
-            int numberRemote = 0;
-
-            Map queueNameNodeIdMap = null;
-
-            long lastChannelId = -1;
-
-            Collection routers = cb.getRouters();
-
-            Iterator iter = routers.iterator();
-
-            while (iter.hasNext())
-            {
-               ClusterRouter router = (ClusterRouter)iter.next();
-
-               Delivery del = router.handle(null, ref, tx);
-
-               if (del != null && del.isSelectorAccepted())
+               for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
                {
-                  routed = true;
-
-                  ClusteredQueue queue = (ClusteredQueue)del.getObserver();
-
-                  if (trace)
+                  Binding itemBinding = (Binding)iterbindings.next();
+                  if (itemBinding.getQueue().getChannelID() == channelId)
                   {
-                     log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
-                               queue.getNodeId() + " local:" + queue.isLocal());
-
+                     binding = itemBinding;
+                     break;
                   }
-
-                  log.info(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
-                           queue.getNodeId() + " local:" + queue.isLocal());
-
-                  if (router.numberOfReceivers() > 1)
-                  {
-                     //We have now chosen which one will receive the message so we need to add this
-                     //information to a map which will get sent when casting - so the the queue
-                     //on the receiving node knows whether to receive the message
-                     if (queueNameNodeIdMap == null)
-                     {
-                        queueNameNodeIdMap = new HashMap();
-                     }
-
-                     queueNameNodeIdMap.put(queue.getName(), new Integer(queue.getNodeId()));
-                  }
-
-                  if (!queue.isLocal())
-                  {
-                     //We need to send the message remotely
-                     numberRemote++;
-
-                     lastNodeId = queue.getNodeId();
-
-                     lastChannelId = queue.getChannelID();
-                  }
                }
             }
-
-            //Now we've sent the message to any local queues, we might also need
-            //to send the message to the other office instances on the cluster if there are
-            //queues on those nodes that need to receive the message
-
-            //TODO - there is an innefficiency here, numberRemote does not take into account that more than one
-            //of the number remote may be on the same node, so we could end up multicasting
-            //when unicast would do
-            if (numberRemote > 0)
+            else
             {
-               if (tx == null)
-               {
-                  if (numberRemote == 1)
-                  {
-                     if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
-
-                     //Unicast - only one node is interested in the message
-                     asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), null), lastNodeId);
-                  }
-                  else
-                  {
-                     if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
-
-                     //Multicast - more than one node is interested
-                     asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), queueNameNodeIdMap));
-                  }
-               }
-               else
-               {
-                  CastMessagesCallback callback = (CastMessagesCallback)tx.getCallback(this);
-
-                  if (callback == null)
-                  {
-                     callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
-
-                     //This callback MUST be executed first
-
-                     //Execution order is as follows:
-                     //Before commit:
-                     //1. Cast messages across network - get added to holding area (if persistent) on receiving
-                     //nodes
-                     //2. Persist messages in persistent store
-                     //After commit
-                     //1. Cast commit message across network
-                     tx.addFirstCallback(callback, this);
-                  }
-
-                  callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap,
-                                      numberRemote == 1 ? lastNodeId : -1,
-                                      lastChannelId);
-               }
+               log.info("nameMap is null");
             }
-
-            if (startInternalTx)
-            {
-               tx.commit();
-               if (trace) { log.trace("Committed internal transaction"); }
-            }
          }
+         log.info("Returned " + binding);
+         return binding;
       }
       finally
       {
          lock.readLock().release();
       }
-
-      return routed;
    }
 
-   public boolean isLocal()
-   {
-      return false;
-   }
+   // PostOfficeInternal implementation -----------------------------
 
-   public Collection listAllBindingsForCondition(Condition condition) throws Exception
+   /*
+    * Called when another node adds a binding
+    */
+   public void addBindingFromCluster(int nodeId, String queueName, String conditionText,
+                                     String filterString, long channelID, boolean durable,
+                                     boolean failed)
+      throws Exception
    {
-      return listBindingsForConditionInternal(condition, false);
-   }
+      lock.writeLock().acquire();
 
-   public FailoverMapper getFailoverMapper()
-   {
-      return failoverMapper;
-   }
+      if (trace)
+      {
+         log.info(this.currentNodeId + " adding binding from node: " + nodeId +
+                  " queue: " + queueName + " with condition: " + conditionText);
+      }
 
-   // Replicator implementation --------------------------------------------------------------------------
+      Condition condition = conditionFactory.createCondition(conditionText);
 
-   public Map get(Serializable key) throws Exception
-   {
-      synchronized (replicatedData)
+      try
       {
-         Map m = (Map)replicatedData.get(key);
+         //Sanity
 
-         return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
-      }
-   }
+         if (!knowAboutNodeId(nodeId))
+         {
+            throw new IllegalStateException("Don't know about node id: " + nodeId);
+         }
 
-   public void put(Serializable key, Serializable replicant) throws Exception
-   {
-      putReplicantLocally(currentNodeId, key, replicant);
+         // We currently only allow one binding per name per node
+         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
 
-      PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
+         Binding binding = null;
 
-      syncSendRequest(request);
-   }
+         if (nameMap != null)
+         {
+            binding = (Binding)nameMap.get(queueName);
+         }
 
-   public boolean remove(Serializable key) throws Exception
-   {
-      if (removeReplicantLocally(this.currentNodeId, key))
-      {
-         RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
+         if (binding != null && failed)
+         {
+            throw new IllegalArgumentException(this.currentNodeId +
+                     " Binding already exists for node Id " + nodeId + " queue name " + queueName);
+         }
 
-         syncSendRequest(request);
+         binding = this.createBinding(nodeId, condition, queueName, channelID, filterString,
+                                      durable, failed);
 
-         return true;
+         addBinding(binding);
       }
-      else
+      finally
       {
-         return false;
+         lock.writeLock().release();
       }
+
+      log.info("****** binding added");
    }
 
-   public void registerListener(ReplicationListener listener)
+   /*
+    * Called when another node removes a binding
+    */
+   public void removeBindingFromCluster(int nodeId, String queueName) throws Exception
    {
-      synchronized (replicationListeners)
+      lock.writeLock().acquire();
+
+      if (trace) { log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName); }
+
+      try
       {
-         if (replicationListeners.contains(listener))
+         // Sanity
+         if (!knowAboutNodeId(nodeId))
          {
-            throw new IllegalArgumentException("Listener " + listener + " is already registered");
+            throw new IllegalStateException("Don't know about node id: " + nodeId);
          }
-         replicationListeners.add(listener);
+
+         removeBinding(nodeId, queueName);
       }
-   }
-
-   public void unregisterListener(ReplicationListener listener)
-   {
-      synchronized (replicationListeners)
+      finally
       {
-         boolean removed = replicationListeners.remove(listener);
-
-         if (!removed)
-         {
-            throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
-         }
+         lock.writeLock().release();
       }
    }
 
-   // PostOfficeInternal implementation ------------------------------------------------------------------
-
    public void handleNodeLeft(int nodeId) throws Exception
    {
       synchronized (leftSet)
@@ -752,7 +636,7 @@
       throws Exception
    {
       log.info("##########putReplicantLocally received, before lock");
-      
+
       synchronized (replicatedData)
       {
          log.info("putReplicantLocally received, after lock");
@@ -769,7 +653,7 @@
 
          notifyListeners(key, m, true, originatorNodeID);
       }
-      
+
       log.info("putReplicantLocally, completed");
    }
 
@@ -806,88 +690,6 @@
       }
    }
 
-   /*
-    * Called when another node adds a binding
-    */
-   public void addBindingFromCluster(int nodeId, String queueName, String conditionText,
-                                     String filterString, long channelID, boolean durable, boolean failed)
-      throws Exception
-   {
-      lock.writeLock().acquire();
-
-      if (trace)
-      {
-         log.info(this.currentNodeId + " adding binding from node: " + nodeId +
-                  " queue: " + queueName + " with condition: " + conditionText);
-      }
-
-      Condition condition = conditionFactory.createCondition(conditionText);
-            
-      try
-      {
-         //Sanity
-
-         if (!knowAboutNodeId(nodeId))
-         {
-            throw new IllegalStateException("Don't know about node id: " + nodeId);
-         }
-
-         // We currently only allow one binding per name per node
-         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
-         Binding binding = null;
-
-         if (nameMap != null)
-         {
-            binding = (Binding)nameMap.get(queueName);
-         }
-
-         if (binding != null && failed)
-         {
-            throw new IllegalArgumentException(this.currentNodeId +
-                     " Binding already exists for node Id " + nodeId + " queue name " + queueName);
-         }
-
-         binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
-
-         addBinding(binding);
-      }
-      finally
-      {
-         lock.writeLock().release();
-      }
-      
-      log.info("****** binding added");
-   }
-
-   /*
-    * Called when another node removes a binding
-    */
-   public void removeBindingFromCluster(int nodeId, String queueName) throws Exception
-   {
-      lock.writeLock().acquire();
-
-      if (trace)
-      {
-         log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);
-      }
-
-      try
-      {
-         // Sanity
-         if (!knowAboutNodeId(nodeId))
-         {
-            throw new IllegalStateException("Don't know about node id: " + nodeId);
-         }
-
-         removeBinding(nodeId, queueName);
-      }
-      finally
-      {
-         lock.writeLock().release();
-      }
-   }
-
    public void routeFromCluster(org.jboss.messaging.core.Message message, String routingKeyText,
                                 Map queueNameNodeIdMap) throws Exception
    {
@@ -901,9 +703,9 @@
                routingKeyText + " map " + queueNameNodeIdMap);
 
       Condition routingKey = conditionFactory.createCondition(routingKeyText);
-            
-      lock.readLock().acquire();  
 
+      lock.readLock().acquire();
+
       // Need to reference the message
       MessageReference ref = null;
       try
@@ -1063,62 +865,83 @@
       if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
    }
 
-   /**
-    * Check for any transactions that need to be committed or rolled back
-    */
-   public void check(Integer nodeId) throws Throwable
+   public void updateQueueStats(int nodeId, List statsList) throws Exception
    {
-      if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
+      lock.readLock().acquire();
 
-      synchronized (holdingArea)
+      if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
+
+      try
       {
-         Iterator iter = holdingArea.entrySet().iterator();
+         if (nodeId == this.currentNodeId)
+         {
+            //Sanity check
+            throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
+         }
 
-         List toRemove = new ArrayList();
+         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
 
-         while (iter.hasNext())
+         if (nameMap == null)
          {
-            Map.Entry entry = (Map.Entry)iter.next();
+            //This is ok, the node might have left
+            if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
+         }
+         else
+         {
+            Iterator iter = statsList.iterator();
 
-            TransactionId id = (TransactionId)entry.getKey();
-
-            if (id.getNodeId() == nodeId.intValue())
+            while (iter.hasNext())
             {
-               ClusterTransaction tx = (ClusterTransaction)entry.getValue();
+               QueueStats st = (QueueStats)iter.next();
 
-               if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+               Binding bb = (Binding)nameMap.get(st.getQueueName());
 
-               boolean commit = tx.check(this);
-
-               if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
-
-               if (commit)
+               if (bb == null)
                {
-                  tx.commit(this);
+                  //I guess this is possible if the queue was unbound
+                  if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
                }
                else
                {
-                  tx.rollback(this);
-               }
+                  RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
 
-               toRemove.add(id);
+                  stub.setStats(st);
 
-               if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
-            }
-         }
+                  if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
 
-         //Remove the transactions from the holding area
+                  ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
 
-         iter = toRemove.iterator();
+                  //Maybe the local queue now wants to pull message(s) from the remote queue given that the
+                  //stats for the remote queue have changed
+                  LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
 
-         while (iter.hasNext())
-         {
-            TransactionId id = (TransactionId)iter.next();
+                  if (localQueue!=null)
+                  {
+                     //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
+                     RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
 
-            holdingArea.remove(id);
+                     if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+
+                     localQueue.setPullQueue(toQueue);
+
+                     if (toQueue != null && localQueue.getRefCount() == 0)
+                     {
+                        //We now trigger delivery - this may cause a pull event
+                        //We only do this if there are no refs in the local queue
+
+                        localQueue.deliver(false);
+
+                        if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
+                     }
+                  }
+               }
+            }
          }
       }
-      if (trace) { log.trace(this.currentNodeId + " check complete"); }
+      finally
+      {
+         lock.readLock().release();
+      }
    }
 
    public void sendQueueStats() throws Exception
@@ -1180,91 +1003,11 @@
       }
    }
 
-   public void updateQueueStats(int nodeId, List statsList) throws Exception
-   {
-      lock.readLock().acquire();
-
-      if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
-
-      try
-      {
-         if (nodeId == this.currentNodeId)
-         {
-            //Sanity check
-            throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
-         }
-
-         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
-
-         if (nameMap == null)
-         {
-            //This is ok, the node might have left
-            if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, i guess the node might have left?"); }
-         }
-         else
-         {
-            Iterator iter = statsList.iterator();
-
-            while (iter.hasNext())
-            {
-               QueueStats st = (QueueStats)iter.next();
-
-               Binding bb = (Binding)nameMap.get(st.getQueueName());
-
-               if (bb == null)
-               {
-                  //I guess this is possible if the queue was unbound
-                  if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
-               }
-               else
-               {
-                  RemoteQueueStub stub = (RemoteQueueStub)bb.getQueue();
-
-                  stub.setStats(st);
-
-                  if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
-
-                  ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
-
-                  //Maybe the local queue now wants to pull message(s) from the remote queue given that the
-                  //stats for the remote queue have changed
-                  LocalClusteredQueue localQueue = (LocalClusteredQueue)router.getLocalQueue();
-
-                  if (localQueue!=null)
-                  {
-                     //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
-                     RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
-
-                     if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
-
-                     localQueue.setPullQueue(toQueue);
-
-                     if (toQueue != null && localQueue.getRefCount() == 0)
-                     {
-                        //We now trigger delivery - this may cause a pull event
-                        //We only do this if there are no refs in the local queue
-
-                        localQueue.deliver(false);
-
-                        if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
-                     }
-                  }
-               }
-            }
-         }
-      }
-      finally
-      {
-         lock.readLock().release();
-      }
-   }
-
    public boolean referenceExistsInStorage(long channelID, long messageID) throws Exception
    {
       return pm.referenceExists(channelID, messageID);
    }
 
-
    public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
                                        String queueName, org.jboss.messaging.core.Message message) throws Throwable
    {
@@ -1322,320 +1065,348 @@
       }
    }
 
-   public int getNodeId()
+   // Replicator implementation -------------------------------------
+
+   public void put(Serializable key, Serializable replicant) throws Exception
    {
-      return currentNodeId;
+      putReplicantLocally(currentNodeId, key, replicant);
+
+      PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
+
+      syncSendRequest(request);
    }
 
-   public String toString()
+   public Map get(Serializable key) throws Exception
    {
-      StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
-      sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
+      synchronized (replicatedData)
+      {
+         Map m = (Map)replicatedData.get(key);
 
-      if (syncChannel == null)
-      {
-         sb.append("UNINITIALIZED");
+         return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
       }
-      else
-      {
-         Address addr = syncChannel.getLocalAddress();
-         if (addr == null)
-         {
-            sb.append("UNCONNECTED");
-         }
-         else
-         {
-            sb.append(addr);
-         }
-      }
-
-      sb.append("]");
-      return sb.toString();
    }
 
-   // Public ------------------------------------------------------------------------------------------
-
-   //MUST ONLY be used for testing
-   public int getNumberOfNodesInCluster()
+   public boolean remove(Serializable key) throws Exception
    {
-      if (currentView != null)
+      if (removeReplicantLocally(this.currentNodeId, key))
       {
-         return currentView.size();
+         RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
+
+         syncSendRequest(request);
+
+         return true;
       }
       else
       {
-         return 0;
+         return false;
       }
    }
 
-   //MUST ONLY be used for testing
-   public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
+   public void registerListener(ReplicationListener listener)
    {
-      this.failBeforeCommit = beforeCommit;
-      this.failAfterCommit = afterCommit;
-      this.failHandleResult = handleResult;
-   }
-
-   //MUST ONLY be used for testing
-   public Collection getHoldingTransactions()
-   {
-      return holdingArea.values();
-   }
-
-
-   /**
-    *  Verifies changes on the View deciding if a node joined or left the cluster
-    *
-    * */
-   private void verifyMembership(View oldView, View newView) throws Throwable
-   {
-      if (oldView != null)
+      synchronized (replicationListeners)
       {
-         for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+         if (replicationListeners.contains(listener))
          {
-            Address address = (Address)i.next();
-            if (!newView.containsMember(address))
-            {
-               nodeLeft(address);
-            }
+            throw new IllegalArgumentException("Listener " + listener + " is already registered");
          }
+         replicationListeners.add(listener);
       }
+   }
 
-      for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+   public void unregisterListener(ReplicationListener listener)
+   {
+      synchronized (replicationListeners)
       {
-         Address address = (Address)i.next();
-         if (oldView == null || !oldView.containsMember(address))
+         boolean removed = replicationListeners.remove(listener);
+
+         if (!removed)
          {
-            nodeJoined(address);
+            throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
          }
       }
    }
 
-   /**
-    * This method fails over all the queues from node <failedNodeId> onto this node
-    * It is triggered when a JGroups view change occurs due to a member leaving and
-    * it's determined the member didn't leave cleanly
-    *
-    * @param failedNodeId
-    * @throws Exception
-    */
-   private void failOver(int failedNodeId) throws Exception
+   public FailoverMapper getFailoverMapper()
    {
-      //Need to lock
-      lock.writeLock().acquire();
+      return failoverMapper;
+   }
 
+   // Public --------------------------------------------------------
+
+   public boolean route(MessageReference ref, Condition condition, Transaction tx) throws Exception
+   {
+      if (trace) { log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx); }
+
+      //debug
       try
       {
-         log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
+         TextMessage tm = (TextMessage)ref.getMessage();
 
-         /*
-         We make sure a FailoverStatus object is put in the replicated data for the node
-         The real failover node will always add this in.
-         This means that each node knows which node has really started the failover for another node, and
-         which node did failover for other nodes in the past
-         We cannot rely on the failoverMap for this, since that will regenerated once failover is done,
-         because of the change in membership.
-         And clients may failover after that and need to know if they have the correct node.
-         Since this is the first thing we do after detecting failover, it should be very quick that
-         all nodes know, however there is still a chance that a client tries to failover before
-         the information is replicated.
-         */
+         log.info(this.currentNodeId + " *********** Routing ref: " + tm.getText() + " with condition " + condition + " and transaction " + tx);
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
 
-         Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
+      if (ref == null)
+      {
+         throw new IllegalArgumentException("Message reference is null");
+      }
 
-         FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
+      if (condition == null)
+      {
+         throw new IllegalArgumentException("Condition is null");
+      }
 
-         if (status == null)
-         {
-            status = new FailoverStatus();
-         }
+      boolean routed = false;
 
-         status.startFailingOverForNode(failedNodeId);
+      lock.readLock().acquire();
 
-         log.info("Putting state that failover is starting");
-         
-         put(FAILED_OVER_FOR_KEY, status);
+      try
+      {
+         ClusteredBindings cb = (ClusteredBindings)conditionMap.get(condition);
 
-         log.info("Put state that failover is starting");
+         boolean startInternalTx = false;
 
-         //Get the map of queues for the failed node
+         int lastNodeId = -1;
 
-         Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
-         if (subMaps==null || subMaps.size()==0)
+         if (cb != null)
          {
-            log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
-            return;
-         }
+            if (tx == null && ref.isReliable())
+            {
+               if (!(cb.getDurableCount() == 0 || (cb.getDurableCount() == 1 && cb.getLocalDurableCount() == 1)))
+               {
+                  // When routing a persistent message without a transaction then we may need to
+                  // start an internal transaction in order to route it. This is so we can guarantee
+                  // the message is delivered to all or none of the subscriptions. We need to do
+                  // this if there is anything other than no durable subs or exactly one local
+                  // durable sub.
 
-         //Compile a list of the queue names to remove
-         //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
-         //node leave was detected, so if there are any non durable bindings left here then
-         //this is an error
+                  startInternalTx = true;
 
-         //We iterate through twice to avoid ConcurrentModificationException
-         ArrayList namesToRemove = new ArrayList();
-         for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
-         {
-            Map.Entry entry = (Map.Entry)iterNames.next();
-
-            Binding binding = (Binding )entry.getValue();
-
-            //Sanity check
-            if (!binding.getQueue().isRecoverable())
-            {
-               throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
+                  if (trace) { log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs"); }
+               }
             }
 
-            //Sanity check
-            if (!binding.getQueue().isClustered())
+            if (startInternalTx)
             {
-               throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+               tx = tr.createTransaction();
             }
 
-            ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+            int numberRemote = 0;
 
-            //Sanity check
-            if (queue.isLocal())
+            Map queueNameNodeIdMap = null;
+
+            long lastChannelId = -1;
+
+            Collection routers = cb.getRouters();
+
+            Iterator iter = routers.iterator();
+
+            while (iter.hasNext())
             {
-               throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
-            }
-            namesToRemove.add(entry);
-         }
+               ClusterRouter router = (ClusterRouter)iter.next();
 
-         log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+               Delivery del = router.handle(null, ref, tx);
 
-         for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
-         {
-            Map.Entry entry = (Map.Entry)iterNames.next();
+               if (del != null && del.isSelectorAccepted())
+               {
+                  routed = true;
 
-            Binding binding = (Binding)entry.getValue();
+                  ClusteredQueue queue = (ClusteredQueue)del.getObserver();
 
-            RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+                  if (trace) { log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " + queue.getNodeId() + " local:" + queue.isLocal()); }
 
-            String queueName = (String)entry.getKey();
+                  log.info(this.currentNodeId + " Routing message to queue or stub:" +
+                           queue.getName() + " on node " + queue.getNodeId() + " local:" +
+                           queue.isLocal());
 
-            //First the binding is removed from the in memory condition and name maps
-            this.removeBinding(failedNodeId, queueName);
+                  if (router.numberOfReceivers() > 1)
+                  {
+                     //We have now chosen which one will receive the message so we need to add this
+                     //information to a map which will get sent when casting - so the queue
+                     //on the receiving node knows whether to receive the message
+                     if (queueNameNodeIdMap == null)
+                     {
+                        queueNameNodeIdMap = new HashMap();
+                     }
 
-            //Then deleted from the database
-            this.deleteBinding(failedNodeId, queueName);
+                     queueNameNodeIdMap.put(queue.getName(), new Integer(queue.getNodeId()));
+                  }
 
-            log.info("deleted binding for " + queueName);
-            
-            //Note we do not need to send an unbind request across the cluster - this is because
-            //when the node crashes a view change will hit the other nodes and that will cause
-            //all binding data for that node to be removed anyway
+                  if (!queue.isLocal())
+                  {
+                     //We need to send the message remotely
+                     numberRemote++;
 
-            //If there is already a queue registered with the same name, then we set a flag "failed" on the
-            //binding and then the queue will go into a special list of failed bindings
-            //otherwise we treat at as a normal queue
-            //This is because we cannot deal with more than one queue with the same name
-            //Any new consumers will always only connect to queues in the main name map
-            //This may mean that queues in the failed map have messages stranded in them if consumers
-            //disconnect (since no more can reconnect)
-            //However we message redistribution activated other queues will be able to consume from them.
-            //TODO allow message redistribution for queues in the failed list
-            boolean failed = this.internalGetBindingForQueueName(queueName) != null;
+                     lastNodeId = queue.getNodeId();
 
-            if (!failed)
-            {
-               log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
+                     lastChannelId = queue.getChannelID();
+                  }
+               }
             }
-            else
-            {
-               log.info("There is already a queue with that name so adding to failed map");
-            }
 
-            //Create a new binding
-            Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
-                                                    stub.getName(), stub.getChannelID(),
-                                                    stub.getFilter(), stub.isRecoverable(), failed);
+            //Now we've sent the message to any local queues, we might also need
+            //to send the message to the other office instances on the cluster if there are
+            //queues on those nodes that need to receive the message
 
-            log.info("Created new binding");
+            //TODO - there is an inefficiency here, numberRemote does not take into account that more than one
+            //of the number remote may be on the same node, so we could end up multicasting
+            //when unicast would do
+            if (numberRemote > 0)
+            {
+               if (tx == null)
+               {
+                  if (numberRemote == 1)
+                  {
+                     if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
 
-            //Insert it into the database
-            insertBinding(newBinding);
+                     //Unicast - only one node is interested in the message
+                     asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), null), lastNodeId);
+                  }
+                  else
+                  {
+                     if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
 
-            LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+                     //Multicast - more than one node is interested
+                     asyncSendRequest(new MessageRequest(condition.toText(), ref.getMessage(), queueNameNodeIdMap));
+                  }
+               }
+               else
+               {
+                  CastMessagesCallback callback = (CastMessagesCallback)tx.getCallback(this);
 
-            clusteredQueue.deactivate();
-            clusteredQueue.load();
-            clusteredQueue.activate();
+                  if (callback == null)
+                  {
+                     callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
 
-            log.info("Loaded queue");
+                     //This callback MUST be executed first
 
-            //Add the new binding in memory
-            addBinding(newBinding);
+                     //Execution order is as follows:
+                     //Before commit:
+                     //1. Cast messages across network - get added to holding area (if persistent) on receiving
+                     //nodes
+                     //2. Persist messages in persistent store
+                     //After commit
+                     //1. Cast commit message across network
+                     tx.addFirstCallback(callback, this);
+                  }
 
-            //Send a bind request so other nodes add it too
-            sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+                  callback.addMessage(condition, ref.getMessage(), queueNameNodeIdMap,
+                                      numberRemote == 1 ? lastNodeId : -1,
+                                      lastChannelId);
+               }
+            }
 
-            //FIXME there is a problem in the above code.
-            //If the server crashes between deleting the binding from the database
-            //and creating the new binding in the database, then the binding will be completely
-            //lost from the database when the server is resurrected.
-            //To remedy, both db operations need to be done in the same JBDC tx
+            if (startInternalTx)
+            {
+               tx.commit();
+               if (trace) { log.trace("Committed internal transaction"); }
+            }
          }
-
-         log.info("Server side fail over is now complete");
-
-         //TODO - should this be in a finally? I'm not sure
-         status.finishFailingOver();
-
-         log.info("Putting state that failover has completed");
-         put(FAILED_OVER_FOR_KEY, status);
-         log.info("Put state that failover has completed");
       }
       finally
       {
-         lock.writeLock().release();
+         lock.readLock().release();
       }
+
+      return routed;
    }
 
-   public Binding getBindingforChannelId(long channelId) throws Exception
+   public boolean isLocal()
    {
-      lock.readLock().acquire();
+      return false;
+   }
 
-      try
+   /**
+    * Check for any transactions that need to be committed or rolled back
+    */
+   public void check(Integer nodeId) throws Throwable
+   {
+      if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
+
+      synchronized (holdingArea)
       {
-         //First look in the failed map
-         //Failed bindings are stored in the failed map by channel id
-         Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
-         Binding binding = null;
-         if (channelMap != null)
-         {
-            binding = (Binding)channelMap.get(new Long(channelId));
-         }
+         Iterator iter = holdingArea.entrySet().iterator();
 
-         if (binding == null)
+         List toRemove = new ArrayList();
+
+         while (iter.hasNext())
          {
-            //Not found in the failed map - look in the name map
-            Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
+            Map.Entry entry = (Map.Entry)iter.next();
 
-            if (nameMap != null)
+            TransactionId id = (TransactionId)entry.getKey();
+
+            if (id.getNodeId() == nodeId.intValue())
             {
-               for (Iterator iterbindings = nameMap.values().iterator(); iterbindings.hasNext();)
+               ClusterTransaction tx = (ClusterTransaction)entry.getValue();
+
+               if (trace) { log.trace("Found transaction " + tx + " in holding area"); }
+
+               boolean commit = tx.check(this);
+
+               if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
+
+               if (commit)
                {
-                  Binding itemBinding = (Binding)iterbindings.next();
-                  if (itemBinding.getQueue().getChannelID() == channelId)
-                  {
-                     binding = itemBinding;
-                     break;
-                  }
+                  tx.commit(this);
                }
+               else
+               {
+                  tx.rollback(this);
+               }
+
+               toRemove.add(id);
+
+               if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
             }
-            else
-            {
-               log.info("nameMap is null");
-            }
          }
-         log.info("Returned " + binding);
-         return binding;
+
+         //Remove the transactions from the holding area
+
+         iter = toRemove.iterator();
+
+         while (iter.hasNext())
+         {
+            TransactionId id = (TransactionId)iter.next();
+
+            holdingArea.remove(id);
+         }
       }
-      finally
+      if (trace) { log.trace(this.currentNodeId + " check complete"); }
+   }
+
+   public int getNodeId()
+   {
+      return currentNodeId;
+   }
+
+   public String toString()
+   {
+      StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
+      sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
+
+      if (syncChannel == null)
       {
-         lock.readLock().release();
+         sb.append("UNINITIALIZED");
       }
+      else
+      {
+         Address addr = syncChannel.getLocalAddress();
+         if (addr == null)
+         {
+            sb.append("UNCONNECTED");
+         }
+         else
+         {
+            sb.append(addr);
+         }
+      }
+
+      sb.append("]");
+      return sb.toString();
    }
 
    public String printBindingInformation()
@@ -1747,10 +1518,27 @@
       return buffer.toString();
    }
 
+   /**
+    * MUST ONLY be used for testing!
+    */
+   public void setFail(boolean beforeCommit, boolean afterCommit, boolean handleResult)
+   {
+      this.failBeforeCommit = beforeCommit;
+      this.failAfterCommit = afterCommit;
+      this.failHandleResult = handleResult;
+   }
 
+   /**
+    * MUST ONLY be used for testing!
+    */
+   public Collection getHoldingTransactions()
+   {
+      return holdingArea.values();
+   }
 
-   // Protected ---------------------------------------------------------------------------------------
+   // Package protected ---------------------------------------------
 
+   // Protected -----------------------------------------------------
 
    protected void addToNameMap(Binding binding)
    {
@@ -1764,21 +1552,6 @@
       }
    }
 
-   private void addIntoFailedMaps(Binding binding)
-   {
-      Map channelMap = (Map)failedBindings.get(new Integer(binding.getNodeId()));
-
-      if (channelMap == null)
-      {
-         channelMap = new LinkedHashMap();
-
-         failedBindings.put(new Integer(binding.getNodeId()), channelMap);
-      }
-
-      channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
-   }
-
-
    protected void addToConditionMap(Binding binding)
    {
       Condition condition = binding.getCondition();
@@ -1905,8 +1678,18 @@
       return new DefaultBinding(nodeId, condition, queue, failed);
    }
 
-   // Private ------------------------------------------------------------------------------------------
+   // Private -------------------------------------------------------
 
+   private void sendBindRequest(Condition condition, LocalClusteredQueue queue, Binding binding)
+      throws Exception
+   {
+      BindRequest request =
+         new BindRequest(this.currentNodeId, queue.getName(), condition.toText(), queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+                         binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
+
+      syncSendRequest(request);
+   }
+
    private boolean leaveMessageReceived(Integer nodeId) throws Exception
    {
       synchronized (leftSet)
@@ -2168,7 +1951,7 @@
          BindingInfo info = (BindingInfo)iter.next();
 
          Condition condition = conditionFactory.createCondition(info.getConditionText());
-         
+
          Binding binding = this.createBinding(info.getNodeId(), condition, info.getQueueName(), info.getChannelId(),
                                               info.getFilterString(), info.isDurable(),info.isFailed());
 
@@ -2273,9 +2056,6 @@
          }
       }
    }
-
-
-
    /*
     * A new node has joined the group
     */
@@ -2312,58 +2092,311 @@
       check(theNodeId);
 
       synchronized (failoverMap)
-      {         
+      {
          //Need to evaluate this before we regenerate the failover map
-         Integer failoverNode = (Integer)failoverMap.get(theNodeId);                  
-    
+         Integer failoverNode = (Integer)failoverMap.get(theNodeId);
+
          if (failoverNode == null)
          {
             throw new IllegalStateException("Cannot find failover node for node " + theNodeId);
          }
-         
+
          //debug dump failover map
-         
+
          Iterator iter = failoverMap.entrySet().iterator();
-         
+
          log.info("Dumping failover map");
          while (iter.hasNext())
          {
             Map.Entry entry = (Map.Entry)iter.next();
-            
+
             Integer nodeId = (Integer)entry.getKey();
-            
+
             Integer failoverNodeId = (Integer)entry.getValue();
-            
+
             log.info("node->failover node: " + nodeId + " --> " + failoverNodeId);
          }
          log.info("end dump");
-         
+
          //end debug
-         
+
          boolean isFailover = failoverNode.intValue() == this.currentNodeId;
-            
+
          log.info("Am I failover node for node " + theNodeId + "? " + isFailover);
-   
+
          log.info("Crashed: " + crashed);
-   
+
          //Remove any replicant data and non durable bindings for the node - again we need to do this
          //irrespective of whether we crashed
          //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
          removeDataForNode(theNodeId);
-   
+
          if (crashed && isFailover)
          {
             //The node crashed and we are the failover node
             //so let's perform failover
-   
+
             //TODO server side valve
-   
+
             failOver(theNodeId.intValue());
          }
       }
    }
 
+   /**
+    *  Verifies changes on the View deciding if a node joined or left the cluster.
+    */
+   private void verifyMembership(View oldView, View newView) throws Throwable
+   {
+      if (oldView != null)
+      {
+         for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+         {
+            Address address = (Address)i.next();
+            if (!newView.containsMember(address))
+            {
+               nodeLeft(address);
+            }
+         }
+      }
 
+      for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+      {
+         Address address = (Address)i.next();
+         if (oldView == null || !oldView.containsMember(address))
+         {
+            nodeJoined(address);
+         }
+      }
+   }
+
+   /**
+    * This method fails over all the queues from node <failedNodeId> onto this node
+    * It is triggered when a JGroups view change occurs due to a member leaving and
+    * it's determined the member didn't leave cleanly
+    *
+    * @param failedNodeId
+    * @throws Exception
+    */
+   private void failOver(int failedNodeId) throws Exception
+   {
+      //Need to lock
+      lock.writeLock().acquire();
+
+      try
+      {
+         log.info(this.currentNodeId + " is performing failover for node " + failedNodeId);
+
+         /*
+         We make sure a FailoverStatus object is put in the replicated data for the node
+         The real failover node will always add this in.
+         This means that each node knows which node has really started the failover for another node, and
+         which node did failover for other nodes in the past
+         We cannot rely on the failoverMap for this, since that will be regenerated once failover is done,
+         because of the change in membership.
+         And clients may failover after that and need to know if they have the correct node.
+         Since this is the first thing we do after detecting failover, it should be very quick that
+         all nodes know, however there is still a chance that a client tries to failover before
+         the information is replicated.
+         */
+
+         Map replicants = (Map)get(FAILED_OVER_FOR_KEY);
+
+         FailoverStatus status = (FailoverStatus)replicants.get(new Integer(currentNodeId));
+
+         if (status == null)
+         {
+            status = new FailoverStatus();
+         }
+
+         status.startFailingOverForNode(failedNodeId);
+
+         log.info("Putting state that failover is starting");
+
+         put(FAILED_OVER_FOR_KEY, status);
+
+         log.info("Put state that failover is starting");
+
+         //Get the map of queues for the failed node
+
+         Map subMaps = (Map)nameMaps.get(new Integer(failedNodeId));
+         if (subMaps==null || subMaps.size()==0)
+         {
+            log.warn("Couldn't find any binding to failOver from serverId=" +failedNodeId);
+            return;
+         }
+
+         //Compile a list of the queue names to remove
+         //Note that any non durable bindings will already have been removed (in removeDataForNode()) when the
+         //node leave was detected, so if there are any non durable bindings left here then
+         //this is an error
+
+         //We iterate through twice to avoid ConcurrentModificationException
+         ArrayList namesToRemove = new ArrayList();
+         for (Iterator iterNames = subMaps.entrySet().iterator(); iterNames.hasNext();)
+         {
+            Map.Entry entry = (Map.Entry)iterNames.next();
+
+            Binding binding = (Binding )entry.getValue();
+
+            //Sanity check
+            if (!binding.getQueue().isRecoverable())
+            {
+               throw new IllegalStateException("Find non recoverable queue in map, these should have been removed!");
+            }
+
+            //Sanity check
+            if (!binding.getQueue().isClustered())
+            {
+               throw new IllegalStateException("Queue is not clustered!: " + binding.getQueue().getName());
+            }
+
+            ClusteredQueue queue = (ClusteredQueue) binding.getQueue();
+
+            //Sanity check
+            if (queue.isLocal())
+            {
+               throw new IllegalStateException("Queue is local!: " + binding.getQueue().getName());
+            }
+            namesToRemove.add(entry);
+         }
+
+         log.info("Deleting " + namesToRemove.size() + " bindings from old node");
+
+         for (Iterator iterNames = namesToRemove.iterator(); iterNames.hasNext();)
+         {
+            Map.Entry entry = (Map.Entry)iterNames.next();
+
+            Binding binding = (Binding)entry.getValue();
+
+            RemoteQueueStub stub = (RemoteQueueStub)binding.getQueue();
+
+            String queueName = (String)entry.getKey();
+
+            //First the binding is removed from the in memory condition and name maps
+            this.removeBinding(failedNodeId, queueName);
+
+            //Then deleted from the database
+            this.deleteBinding(failedNodeId, queueName);
+
+            log.info("deleted binding for " + queueName);
+
+            //Note we do not need to send an unbind request across the cluster - this is because
+            //when the node crashes a view change will hit the other nodes and that will cause
+            //all binding data for that node to be removed anyway
+
+            //If there is already a queue registered with the same name, then we set a flag "failed" on the
+            //binding and then the queue will go into a special list of failed bindings
+            //otherwise we treat it as a normal queue
+            //This is because we cannot deal with more than one queue with the same name
+            //Any new consumers will always only connect to queues in the main name map
+            //This may mean that queues in the failed map have messages stranded in them if consumers
+            //disconnect (since no more can reconnect)
+            //However, with message redistribution activated, other queues will be able to consume from them.
+            //TODO allow message redistribution for queues in the failed list
+            boolean failed = this.internalGetBindingForQueueName(queueName) != null;
+
+            if (!failed)
+            {
+               log.info("The current node didn't have a queue " + queueName + " so it's assuming the queue as a regular queue");
+            }
+            else
+            {
+               log.info("There is already a queue with that name so adding to failed map");
+            }
+
+            //Create a new binding
+            Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
+                                                    stub.getName(), stub.getChannelID(),
+                                                    stub.getFilter(), stub.isRecoverable(), failed);
+
+            log.info("Created new binding");
+
+            //Insert it into the database
+            insertBinding(newBinding);
+
+            LocalClusteredQueue clusteredQueue = (LocalClusteredQueue )newBinding.getQueue();
+
+            clusteredQueue.deactivate();
+            clusteredQueue.load();
+            clusteredQueue.activate();
+
+            log.info("Loaded queue");
+
+            //Add the new binding in memory
+            addBinding(newBinding);
+
+            //Send a bind request so other nodes add it too
+            sendBindRequest(binding.getCondition(), clusteredQueue,newBinding);
+
+            //FIXME there is a problem in the above code.
+            //If the server crashes between deleting the binding from the database
+            //and creating the new binding in the database, then the binding will be completely
+            //lost from the database when the server is resurrected.
+            //To remedy, both db operations need to be done in the same JDBC tx
+         }
+
+         log.info("Server side fail over is now complete");
+
+         //TODO - should this be in a finally? I'm not sure
+         status.finishFailingOver();
+
+         log.info("Putting state that failover has completed");
+         put(FAILED_OVER_FOR_KEY, status);
+         log.info("Put state that failover has completed");
+      }
+      finally
+      {
+         lock.writeLock().release();
+      }
+   }
+
+   private void addIntoFailedMaps(Binding binding)
+   {
+      Map channelMap = (Map)failedBindings.get(new Integer(binding.getNodeId()));
+
+      if (channelMap == null)
+      {
+         channelMap = new LinkedHashMap();
+
+         failedBindings.put(new Integer(binding.getNodeId()), channelMap);
+      }
+
+      channelMap.put(new Long(binding.getQueue().getChannelID()), binding);
+   }
+
+   private void sendJMXNotification(String notificationType)
+   {
+      Notification n = new Notification(notificationType, "", 0l);
+      nbSupport.sendNotification(n);
+   }
+
+   private void handleViewAccepted(View newView)
+   {
+      //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+      //TODO:     can't we do the same since this is pretty useful?
+      log.info(currentNodeId  + " got new view: " + newView + " postOffice:"
+               + DefaultClusteredPostOffice.this.getOfficeName());
+
+      // JGroups will make sure this method is never called by more than one thread concurrently
+
+      View oldView = currentView;
+      currentView = newView;
+
+      try
+      {
+         verifyMembership(oldView, newView);
+         sendJMXNotification(VIEW_CHANGED_NOTIFICATION);
+      }
+      catch (Throwable e)
+      {
+         log.error("Caught Exception in MembershipListener", e);
+         IllegalStateException e2 = new IllegalStateException(e.getMessage());
+         e2.setStackTrace(e.getStackTrace());
+         throw e2;
+      }
+   }
+
    // Inner classes -------------------------------------------------------------------
 
    /*
@@ -2462,10 +2495,11 @@
       {
          try
          {
-            //We queue up changes and execute them asynchronously.
-            //This is because JGroups will not let us do stuff like send synch messages
-            //using the same thread that delivered the view change and this is what we need to
-            //do in failover, for example.
+            // We queue up changes and execute them asynchronously.
+            // This is because JGroups will not let us do stuff like send synch messages using the
+            // same thread that delivered the view change and this is what we need to do in
+            // failover, for example.
+
             viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
          }
          catch (InterruptedException e)
@@ -2480,41 +2514,16 @@
          return null;
       }
    }
-   
-   private void handleViewAccepted(View newView)
-   {
-      //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
-      //TODO:     can't we do the same since this is pretty useful?
-      log.info(currentNodeId  + " got new view: " + newView + " postOffice:"
-               + DefaultClusteredPostOffice.this.getOfficeName());
 
-      // JGroups will make sure this method is never called by more than one thread concurrently
-
-      View oldView = currentView;
-      currentView = newView;
-
-      try
-      {
-         verifyMembership(oldView, newView);
-      }
-      catch (Throwable e)
-      {
-         log.error("Caught Exception in MembershipListener", e);
-         IllegalStateException e2 = new IllegalStateException(e.getMessage());
-         e2.setStackTrace(e.getStackTrace());
-         throw e2;
-      }
-   }
-
    private class HandleViewAcceptedRunnable implements Runnable
    {
       private View newView;
-      
+
       HandleViewAcceptedRunnable(View newView)
       {
          this.newView = newView;
       }
-      
+
       public void run()
       {
          handleViewAccepted(newView);

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/Peer.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -6,8 +6,10 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import javax.management.NotificationBroadcaster;
 import java.util.Set;
 
+
 /**
  * Group management interface.
  *
@@ -15,10 +17,12 @@
  * @version <tt>$Revision$</tt>
  * $Id$
  */
-public interface Peer
+public interface Peer extends NotificationBroadcaster
 {
    /**
     * Returns a set of nodeIDs (integers) representing the IDs of cluster's nodes.
     */
    Set getNodeIDView();
+
+
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/GroupManagementTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -8,9 +8,15 @@
 
 import org.jboss.test.messaging.MessagingTestCase;
 import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
 
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import javax.management.ObjectName;
 import java.util.Set;
 
+import EDU.oswego.cs.dl.util.concurrent.Slot;
+
 /**
  * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @version <tt>$Revision$</tt>
@@ -51,6 +57,44 @@
       }
    }
 
+   public void testJoinNotification() throws Exception
+   {
+      ViewChangeNotificationListener listener = new ViewChangeNotificationListener();
+      ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+
+      try
+      {
+         ServerManagement.start("all", 0);
+
+         log.info("Server 0 started");
+
+         ServerManagement.addNotificationListener(0, postOfficeObjectName, listener);
+
+         log.info("NotificationListener added to server 0");
+
+         ServerManagement.start("all", 1);
+
+         log.info("Blocking to receive notification ...");
+
+         if (!listener.viewChanged(120000))
+         {
+            fail("Did not receive view change!");
+         }
+
+         Set view = ServerManagement.getServer(1).getNodeIDView();
+
+         assertEquals(2, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+      }
+      finally
+      {
+         ServerManagement.removeNotificationListener(0, postOfficeObjectName, listener);
+         ServerManagement.stop(1);
+         ServerManagement.stop(0);
+      }
+   }
+
    public void testTwoNodesCluster() throws Exception
    {
       try
@@ -146,11 +190,196 @@
 
    public void testCleanLeave() throws Exception
    {
+      try
+      {
+         // Start with a 3 node cluster
 
+         ServerManagement.start("all", 0);
+         ServerManagement.start("all", 1);
+         ServerManagement.start("all", 2);
+
+         Set view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(3, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+         assertTrue(view.contains(new Integer(2)));
+
+         // Make node 0 to "cleanly" leave the cluster
+
+         ServerManagement.stop(0);
+
+         view = ServerManagement.getServer(1).getNodeIDView();
+
+         assertEquals(2, view.size());
+         assertTrue(view.contains(new Integer(1)));
+         assertTrue(view.contains(new Integer(2)));
+
+         // Make node 2 to "cleanly" leave the cluster
+
+         ServerManagement.stop(2);
+
+         view = ServerManagement.getServer(1).getNodeIDView();
+
+         assertEquals(1, view.size());
+         assertTrue(view.contains(new Integer(1)));
+
+         // Reuse the "hollow" RMI server 0 to start another cluster node
+
+         ServerManagement.start("all", 0);
+
+         view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(2, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+
+
+         // Reuse the "hollow" RMI server 2 to start another cluster node
+
+         ServerManagement.start("all", 2);
+
+         view = ServerManagement.getServer(2).getNodeIDView();
+
+         assertEquals(3, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+         assertTrue(view.contains(new Integer(2)));
+
+      }
+      finally
+      {
+         ServerManagement.stop(2);
+         ServerManagement.stop(1);
+         ServerManagement.stop(0);
+      }
    }
 
+   public void testDirtyLeaveOneNode() throws Exception
+   {
+      ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+      ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
 
+      try
+      {
+         // Start with a 2 node cluster
 
+         ServerManagement.start("all", 0);
+         ServerManagement.start("all", 1);
+
+         Set view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(2, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+
+         ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+
+         // Make node 1 leave the cluster "dirty", by killing the VM running it.
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED 1");
+         log.info("########");
+
+         // Wait for the membership change notification on node 0 (the listener was registered there)
+
+         if (!viewChange.viewChanged(120000))
+         {
+            fail("Did not receive view change after killing server 1!");
+         }
+
+         view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(1, view.size());
+         assertTrue(view.contains(new Integer(0)));
+      }
+      finally
+      {
+         ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+
+         ServerManagement.stop(1);
+         ServerManagement.stop(0);
+      }
+   }
+
+   public void testDirtyLeaveTwoNodes() throws Exception
+   {
+      ViewChangeNotificationListener viewChange = new ViewChangeNotificationListener();
+      ObjectName postOfficeObjectName = new ObjectName("jboss.messaging:service=PostOffice");
+
+      try
+      {
+         // Start with a 3 node cluster
+
+         ServerManagement.start("all", 0);
+         ServerManagement.start("all", 1);
+         ServerManagement.start("all", 2);
+
+         Set view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(3, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+         assertTrue(view.contains(new Integer(2)));
+
+         ServerManagement.addNotificationListener(0, postOfficeObjectName, viewChange);
+
+         // Make node 2 to "dirty" leave the cluster, by killing the VM running it.
+
+         ServerManagement.kill(2);
+
+         log.info("########");
+         log.info("######## KILLED 2");
+         log.info("########");
+
+         // Wait for membership change notification
+
+         if (!viewChange.viewChanged(120000))
+         {
+            fail("Did not receive view change after killing server 2!");
+         }
+
+         view = ServerManagement.getServer(1).getNodeIDView();
+
+         assertEquals(2, view.size());
+         assertTrue(view.contains(new Integer(0)));
+         assertTrue(view.contains(new Integer(1)));
+
+         // Make node 1 to "dirty" leave the cluster, by killing the VM running it.
+
+         ServerManagement.kill(1);
+
+         log.info("########");
+         log.info("######## KILLED 1");
+         log.info("########");
+
+         // Wait for membership change notification
+
+         if (!viewChange.viewChanged(120000))
+         {
+            fail("Did not receive view change after killing server 1!");
+         }
+
+         view = ServerManagement.getServer(0).getNodeIDView();
+
+         assertEquals(1, view.size());
+         assertTrue(view.contains(new Integer(0)));
+
+      }
+      finally
+      {
+         ServerManagement.removeNotificationListener(0, postOfficeObjectName, viewChange);
+
+         ServerManagement.stop(2);
+         ServerManagement.stop(1);
+         ServerManagement.stop(0);
+      }
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -169,4 +398,45 @@
 
    // Inner classes -------------------------------------------------
 
+   private class ViewChangeNotificationListener implements NotificationListener
+   {
+      private Slot slot;
+
+      ViewChangeNotificationListener()
+      {
+         slot = new Slot();
+      }
+
+      public void handleNotification(Notification notification, Object object)
+      {
+
+         if (!ClusteredPostOffice.VIEW_CHANGED_NOTIFICATION.equals(notification.getType()))
+         {
+            // ignore it
+            return;
+         }
+
+         log.info("received VIEW_CHANGED notification");
+
+         try
+         {
+            slot.put(Boolean.TRUE);
+         }
+         catch(InterruptedException e)
+         {
+            log.error(e);
+         }
+      }
+
+      public boolean viewChanged(long timeout) throws InterruptedException
+      {
+         Boolean result = (Boolean)slot.poll(timeout);
+         if (result == null)
+         {
+            return false;
+         }
+         return result.booleanValue();
+      }
+   }
+
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -38,7 +38,6 @@
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.message.MessageProxy;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -503,8 +502,8 @@
          
          log.info("************ KILLING (CRASHING) SERVER 1");
          
-         ServerManagement.getServer(1).destroy();
-         
+         ServerManagement.getServer(1).kill();
+
          log.info("killed server, now waiting");
          
          Thread.sleep(5000);
@@ -837,28 +836,28 @@
    
    // Private -------------------------------------------------------
    
-   private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
-   {
-      MessageProxy message = (MessageProxy) consumer.receive(3000);
-      TextMessage txtMessage = (TextMessage) message;
-      if (message != null)
-      {
-         log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
-      } else
-      {
-         log.info(text + ": Message received was null");
-      }
-      if (shouldAssert)
-      {
-         if (shouldBeNull)
-         {
-            assertNull(message);
-         } else
-         {
-            assertNotNull(message);
-         }
-      }
-   }
+//   private void receiveMessage(String text, MessageConsumer consumer, boolean shouldAssert, boolean shouldBeNull) throws Exception
+//   {
+//      MessageProxy message = (MessageProxy) consumer.receive(3000);
+//      TextMessage txtMessage = (TextMessage) message;
+//      if (message != null)
+//      {
+//         log.info(text + ": messageID from messageReceived=" + message.getMessage().getMessageID() + " message = " + message + " content=" + txtMessage.getText());
+//      } else
+//      {
+//         log.info(text + ": Message received was null");
+//      }
+//      if (shouldAssert)
+//      {
+//         if (shouldBeNull)
+//         {
+//            assertNull(message);
+//         } else
+//         {
+//            assertNotNull(message);
+//         }
+//      }
+//   }
    
    // Inner classes -------------------------------------------------
    

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/SimpleClusteringTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -103,7 +103,7 @@
    
 //   public void testKill() throws Exception
 //   {
-//      ServerManagement.getServer(0).destroy();
+//      ServerManagement.getServer(0).kill();
 //   }
 
    public void testDistributedTopic() throws Exception

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/CallbackFailureTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -116,7 +116,7 @@
       
       String remotingSessionId = (String)remoteServer.executeCommand(command);
       
-      remoteServer.destroy();
+      remoteServer.kill();
         
       //we have removed the exception listener so the server side resouces shouldn't be cleared up
       

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashLargeLeaseTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -115,7 +115,7 @@
       
       // Now we should have a client connection from the remote server to the local server
       
-      remoteServer.destroy();
+      remoteServer.kill();
       log.trace("killed remote server");
         
       // Wait for connection resources to be cleared up

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashNegativeLeaseTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -117,7 +117,7 @@
       
       // Now we should have a client connection from the remote server to the local server
       
-      remoteServer.destroy();
+      remoteServer.kill();
       log.trace("killed remote server");
         
       // Wait for connection resources to be cleared up

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -119,7 +119,7 @@
       
       // Now we should have a client connection from the remote server to the local server
       
-      remoteServer.destroy();
+      remoteServer.kill();
       log.trace("killed remote server");
         
       // Wait for connection resources to be cleared up

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashTwoConnectionsTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -121,7 +121,7 @@
       log.info("we have = " + ((SimpleConnectionManager)cm).getClients().size() + " clients registered on SimpleconnectionManager");
       
       // Now we should have a client connection from the remote server to the local server
-      remoteServer.destroy();
+      remoteServer.kill();
       log.info("killed remote server");
         
       // Wait for connection resources to be cleared up

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/crash/ClientCrashZeroLeaseTest.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -117,7 +117,7 @@
       
       // Now we should have a client connection from the remote server to the local server
       
-      remoteServer.destroy();
+      remoteServer.kill();
       log.trace("killed remote server");
         
       // Wait for connection resources to be cleared up

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -24,7 +24,13 @@
 import java.rmi.Naming;
 import java.util.Hashtable;
 import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+import java.util.Iterator;
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
+import javax.management.Notification;
 import javax.transaction.UserTransaction;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.server.DestinationManager;
@@ -35,6 +41,7 @@
 import org.jboss.test.messaging.tools.jmx.rmi.LocalTestServer;
 import org.jboss.test.messaging.tools.jmx.rmi.RMITestServer;
 import org.jboss.test.messaging.tools.jmx.rmi.Server;
+import org.jboss.test.messaging.tools.jmx.rmi.NotificationListenerID;
 import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
 import org.jboss.test.messaging.tools.jndi.RemoteInitialContextFactory;
 
@@ -73,6 +80,9 @@
 
    private static Server[] servers = new Server[MAX_SERVER_COUNT];
 
+   // Map<NotificationListener, NotificationListenerPoller> - pollers created for remote servers
+   private static Map notificationListenerPollers = new HashMap();
+
    public static boolean isLocal()
    {
       return !"true".equals(System.getProperty("remote"));
@@ -171,17 +181,45 @@
 
    public static synchronized void stop(int index) throws Exception
    {
-      insureStarted(index);
+      if (servers[index] == null)
+      {
+         log.warn("Server " + index + " has not been created, so it cannot be stopped");
+         return;
+      }
+
+      if (!servers[index].isStarted())
+      {
+         log.warn("Server " + index + " either has not been started, or it is stopped already");
+         return;
+      }
+
       servers[index].stop();
    }
 
+   /**
+    * TODO - this method should be removed, so it is not confused with kill(index)
+    * @deprecated
+    */
    public static synchronized void destroy() throws Exception
    {
       stop();
+      servers[0].kill();
+      servers[0] = null;
+   }
 
-      servers[0].destroy();
+   /**
+    * Abruptly kills the VM running the specified server.
+    */
+   public static synchronized void kill(int index) throws Exception
+   {
+      if (servers[index] == null)
+      {
+         log.warn("Server " + index + " has not been created, so it cannot be killed");
+         return;
+      }
 
-      servers[0] = null;
+      servers[index].kill();
+      servers[index] = null;
    }
 
    public static void disconnect() throws Exception
@@ -224,6 +262,59 @@
       return servers[0].invoke(on, operationName, params, signature);
    }
 
+   public static void addNotificationListener(int serverIndex, ObjectName on,
+                                              NotificationListener listener) throws Exception
+   {
+      insureStarted(serverIndex);
+
+      if (isLocal())
+      {
+         // add the listener directly to the server
+         servers[serverIndex].addNotificationListener(on, listener);
+      }
+      else
+      {
+         // the server is remote: register a listener ID on it and poll for notifications
+         NotificationListenerPoller p =
+            new NotificationListenerPoller((Server)servers[serverIndex], on, listener);
+
+         synchronized(notificationListenerPollers)
+         {
+            notificationListenerPollers.put(listener, p);
+         }
+
+         new Thread(p, "Poller for " + Integer.toHexString(p.hashCode())).start();
+      }
+   }
+
+   public static void removeNotificationListener(int serverIndex, ObjectName on,
+                                                 NotificationListener listener) throws Exception
+   {
+      insureStarted(serverIndex);
+
+      if (isLocal())
+      {
+         // remove the listener directly
+         servers[serverIndex].removeNotificationListener(on, listener);
+      }
+      else
+      {
+         // is remote
+
+         NotificationListenerPoller p = null;
+         synchronized(notificationListenerPollers)
+         {
+            p = (NotificationListenerPoller)notificationListenerPollers.remove(listener);
+         }
+
+         if (p != null)
+         {
+            // stop the polling thread
+            p.stop();
+         }
+      }
+   }
+
    public static Set query(ObjectName pattern) throws Exception
    {
       insureStarted();
@@ -645,7 +736,9 @@
          {
             log.info("trying to connect to the remote RMI server " + index + 
                      (attempt == 1 ? "" : ", attempt " + attempt));
+
             s = (Server)Naming.lookup(name);
+
             log.info("connected to the remote server");
          }
          catch(Exception e)
@@ -805,4 +898,63 @@
 //         }
 //      }
 //   }
+
+   private static long listenerIDCounter = 0;
+
+   static class NotificationListenerPoller implements Runnable
+   {
+      public static final int POLL_INTERVAL = 500;
+
+      private long id;
+      private Server server;
+      private NotificationListener listener;
+      private volatile boolean running;
+
+      private synchronized static long generateID()
+      {
+         return listenerIDCounter++;
+      }
+
+      NotificationListenerPoller(Server server, ObjectName on, NotificationListener listener)
+         throws Exception
+      {
+         id = generateID();
+         this.server = server;
+
+         server.addNotificationListener(on, new NotificationListenerID(id));
+
+         this.listener = listener;
+         this.running = true;
+      }
+
+      public void run()
+      {
+         while(running)
+         {
+            try
+            {
+               List notifications = server.pollNotificationListener(id);
+
+               for(Iterator i = notifications.iterator(); i.hasNext(); )
+               {
+                  Notification n = (Notification)i.next();
+                  listener.handleNotification(n, null);
+               }
+
+               Thread.sleep(POLL_INTERVAL);
+            }
+            catch(Exception e)
+            {
+               log.error(e);
+               stop();
+            }
+         }
+      }
+
+      public void stop()
+      {
+         running = false;
+      }
+   }
+
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -45,6 +45,7 @@
 import javax.management.MBeanServer;
 import javax.management.MBeanServerFactory;
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NameNotFoundException;
@@ -704,6 +705,18 @@
       return mbeanServer.getAttribute(on, name);
    }
 
+   public void addNotificationListener(ObjectName on, NotificationListener listener)
+      throws Exception
+   {
+      mbeanServer.addNotificationListener(on, listener, null, null);
+   }
+
+   public void removeNotificationListener(ObjectName on, NotificationListener listener)
+      throws Exception
+   {
+      mbeanServer.removeNotificationListener(on, listener);
+   }
+
    public void bindDefaultJMSProvider() throws Exception
    {
       JNDIProviderAdapter pa = new JNDIProviderAdapter();

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/LocalTestServer.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -30,6 +30,7 @@
 import javax.jms.Queue;
 import javax.jms.Topic;
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
 import javax.transaction.UserTransaction;
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.ServerPeer;
@@ -170,9 +171,9 @@
       }
    }
 
-   public synchronized void destroy() throws Exception
+   public synchronized void kill() throws Exception
    {
-      stop();
+      throw new IllegalStateException("Cannot KILL a local server. Consider using stop() instead.");
    }
 
    public ObjectName deploy(String mbeanConfiguration) throws Exception
@@ -203,6 +204,18 @@
       return sc.invoke(on, operationName, params, signature);
    }
 
+   public void addNotificationListener(ObjectName on, NotificationListener listener)
+      throws Exception
+   {
+      sc.addNotificationListener(on, listener);
+   }
+
+   public void removeNotificationListener(ObjectName on, NotificationListener listener)
+      throws Exception
+   {
+      sc.removeNotificationListener(on, listener);
+   }
+
    public Set query(ObjectName pattern) throws Exception
    {
       return sc.query(pattern);
@@ -808,6 +821,12 @@
       return (Set)sc.getAttribute(postOfficeObjectName, "NodeIDView");
    }
 
+   public List pollNotificationListener(long listenerID) throws Exception
+   {
+      throw new IllegalStateException("Poll doesn't make sense on a local server. " +
+                                      "Register listeners directly instead.");
+   }
+
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/NotificationListenerID.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -0,0 +1,58 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.tools.jmx.rmi;
+
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import java.io.Serializable;
+
+/**
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class NotificationListenerID implements Serializable, NotificationListener
+{
+   // Constants -----------------------------------------------------
+
+   private static final long serialVersionUID = -39839086486546L;
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private long id;
+
+   // Constructors --------------------------------------------------
+
+   public NotificationListenerID(long id)
+   {
+      this.id = id;
+   }
+
+   // NotificationListener implementation ---------------------------
+
+   public void handleNotification(Notification notification, Object object)
+   {
+      throw new IllegalStateException("Do not use this method directly!");
+   }
+
+   // Public --------------------------------------------------------
+
+   public long getID()
+   {
+      return id;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Added: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/ProxyNotificationListener.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -0,0 +1,67 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.tools.jmx.rmi;
+
+import javax.management.NotificationListener;
+import javax.management.Notification;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Stores notifications until they're transferred to the remote client.
+ *
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision$</tt>
+ * $Id$
+ */
+public class ProxyNotificationListener implements NotificationListener
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private List notifications;
+
+   // Constructors --------------------------------------------------
+
+   ProxyNotificationListener()
+   {
+      notifications = new ArrayList();
+   }
+
+   // NotificationListener implementation ---------------------------
+
+   public synchronized void handleNotification(Notification notification, Object object)
+   {
+      notifications.add(notification);
+   }
+
+   // Public --------------------------------------------------------
+
+   public synchronized List drain()
+   {
+      if (notifications.size() == 0)
+      {
+         return Collections.EMPTY_LIST;
+      }
+
+      List old = notifications;
+      notifications = new ArrayList();
+      return old;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/RMITestServer.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -25,7 +25,12 @@
 import java.rmi.registry.Registry;
 import java.rmi.server.UnicastRemoteObject;
 import java.util.Set;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collections;
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
 import javax.transaction.UserTransaction;
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.ServerPeer;
@@ -46,24 +51,20 @@
  */
 public class RMITestServer extends UnicastRemoteObject implements Server
 {
-   private static final long serialVersionUID = -368445344011004778L;
+   // Constants -----------------------------------------------------
 
+   public static final String RMI_SERVER_PREFIX = "messaging_rmi_server_";
+   public static final String NAMING_SERVER_PREFIX = "naming_rmi_server_";
+
    public static final int DEFAULT_REGISTRY_PORT = 22555;
    public static final int DEFAULT_SERVER_INDEX = 0;
    public static final String DEFAULT_SERVER_HOST = "localhost";
 
+   private static final long serialVersionUID = -368445344011004778L;
    private static final Logger log = Logger.getLogger(RMITestServer.class);
 
-   protected RemoteTestServer server;
+   // Static --------------------------------------------------------
 
-   private RMINamingDelegate namingDelegate;
-
-
-   public static final String RMI_SERVER_PREFIX = "messaging_rmi_server_";
-   public static final String NAMING_SERVER_PREFIX = "naming_rmi_server_";
-
-   private static Registry registry;
-
    public static void main(String[] args) throws Exception
    {
       log.debug("initializing RMI runtime");
@@ -92,6 +93,7 @@
       // let RMI know the bind address
       System.setProperty("java.rmi.server.hostname", host);
 
+      Registry registry;
 
       // try to bind first
       try
@@ -115,123 +117,150 @@
       log.info("RMI server " + serverIndex + " bound");
    }
 
-   public class VMKiller implements Runnable
-   {
-      public void run()
-      {
-         log.info("shutting down the VM");
+   // Attributes ----------------------------------------------------
 
-         try
-         {
-            Thread.sleep(250);
-         }
-         catch(Exception e)
-         {
-            log.warn("interrupted while sleeping", e);
-         }
+   protected RemoteTestServer server;
+   private RMINamingDelegate namingDelegate;
+   // Map<Long-ProxyNotificationListener>
+   private Map proxyListeners;
 
-         System.exit(0);
-      }
-   }
+   // Constructors --------------------------------------------------
 
    public RMITestServer(int index) throws Exception
    {
       namingDelegate = new RMINamingDelegate(index);
       server = new RemoteTestServer(index);
+      proxyListeners = new HashMap();
    }
 
-   public void configureSecurityForDestination(String destName, String config) throws Exception
+   // Server implementation -----------------------------------------
+
+   public void start(String containerConfig) throws Exception
    {
-      server.configureSecurityForDestination(destName, config);
+      server.start(containerConfig);
    }
 
-   public ObjectName deploy(String mbeanConfiguration) throws Exception
+   public void stop() throws Exception
    {
-      return server.deploy(mbeanConfiguration);
+      server.stop();
+      namingDelegate.reset();
    }
 
-   public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
+   public synchronized void kill() throws Exception
    {
-      server.deployQueue(name, jndiName, clustered);
+      // Kills the server without doing any graceful shutdown. For graceful shutdown use stop().
+      new Thread(new VMKiller(), "VM Killer").start();
    }
 
-   public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
+   public ObjectName deploy(String mbeanConfiguration) throws Exception
    {
-      server.deployTopic(name, jndiName, clustered);
+      return server.deploy(mbeanConfiguration);
    }
-   
-   public void deployQueue(String name,
-                           String jndiName,
-                           int fullSize,
-                           int pageSize,
-                           int downCacheSize,
-                           boolean clustered) throws Exception
+
+   public void undeploy(ObjectName on) throws Exception
    {
-      server.deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+      server.undeploy(on);
    }
 
-   public void createQueue(String name, String jndiName) throws Exception
+   public Object getAttribute(ObjectName on, String attribute) throws Exception
    {
-      server.createQueue(name, jndiName);
+      return server.getAttribute(on, attribute);
    }
 
-   public void deployTopic(String name,
-                           String jndiName,
-                           int fullSize,
-                           int pageSize,
-                           int downCacheSize,
-                           boolean clustered) throws Exception
+   public void setAttribute(ObjectName on, String name, String valueAsString) throws Exception
    {
-      server.deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+      server.setAttribute(on, name, valueAsString);
    }
 
-   public void createTopic(String name, String jndiName) throws Exception
+   public Object invoke(ObjectName on, String operationName, Object[] params, String[] signature)
+      throws Exception
    {
-      server.createTopic(name, jndiName);
+      return server.invoke(on, operationName, params, signature);
    }
 
-   public void deployConnectionFactory(String objectName, String[] jndiBindings)
+   public void addNotificationListener(ObjectName on, NotificationListener listener)
       throws Exception
    {
-      server.deployConnectionFactory(objectName, jndiBindings);
+      if (!(listener instanceof NotificationListenerID))
+      {
+         throw new IllegalArgumentException("A RMITestServer can only handle NotificationListenerIDs!");
+      }
+
+      long id = ((NotificationListenerID)listener).getID();
+
+      ProxyNotificationListener pl = new ProxyNotificationListener();
+
+      synchronized(proxyListeners)
+      {
+         proxyListeners.put(new Long(id), pl);
+      }
+
+      server.addNotificationListener(on, pl);
    }
-   
-   public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize)
+
+   public void removeNotificationListener(ObjectName on, NotificationListener listener)
       throws Exception
    {
-      server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
+
+      if (!(listener instanceof NotificationListenerID))
+      {
+         throw new IllegalArgumentException("A RMITestServer can only handle NotificationListenerIDs!");
+      }
+
+      long id = ((NotificationListenerID)listener).getID();
+
+      ProxyNotificationListener pl = null;
+
+      synchronized(proxyListeners)
+      {
+         pl = (ProxyNotificationListener)proxyListeners.remove(new Long(id));
+      }
+
+      server.removeNotificationListener(on, pl);
    }
 
-   public void deployConnectionFactory(String objectName,
-                                       String[] jndiBindings,
-                                       int prefetchSize,
-                                       int defaultTempQueueFullSize,
-                                       int defaultTempQueuePageSize,
-                                       int defaultTempQueueDownCacheSize) throws Exception
+   public Set query(ObjectName pattern) throws Exception
    {
-      server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
-               defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
+      return server.query(pattern);
    }
 
-   public void undeployConnectionFactory(ObjectName objectName) throws Exception
+   public String getDatabaseType()
    {
-      server.undeployConnectionFactory(objectName);
+      return server.getDatabaseType();
    }
 
-   public synchronized void destroy() throws Exception
+   public void log(int level, String text) throws Exception
    {
-      //Kill the server without doing any graceful shutdown
-      
-      //For graceful shutdown use stop()
-      
-      new Thread(new VMKiller(), "VM Killer").start();
+      server.log(level, text);
    }
 
-   public Object getAttribute(ObjectName on, String attribute) throws Exception
+   public void startServerPeer(int serverPeerID, String defaultQueueJNDIContext,
+                               String defaultTopicJNDIContext, boolean clustered) throws Exception
    {
-      return server.getAttribute(on, attribute);
+      server.
+         startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
    }
 
+   public void stopServerPeer() throws Exception
+   {
+      server.stopServerPeer();
+   }
+
+   public boolean isServerPeerStarted() throws Exception
+   {
+      return server.isServerPeerStarted();
+   }
+
+   public ObjectName getServerPeerObjectName() throws Exception
+   {
+      return server.getServerPeerObjectName();
+   }
+
+   public boolean isStarted() throws Exception
+   {
+      return server.isStarted();
+   }
+
    public Set getConnectorSubsystems() throws Exception
    {
       return server.getConnectorSubsystems();
@@ -248,9 +277,9 @@
       server.removeServerInvocationHandler(subsystem);
    }
 
-   public String getDefaultSecurityConfig() throws Exception
+   public MessageStore getMessageStore() throws Exception
    {
-      return server.getDefaultSecurityConfig();
+      return server.getMessageStore();
    }
 
    public DestinationManager getDestinationManager() throws Exception
@@ -258,106 +287,122 @@
       return server.getDestinationManager();
    }
 
-   public MessageStore getMessageStore() throws Exception
+   public PersistenceManager getPersistenceManager() throws Exception
    {
-      return server.getMessageStore();
+      return server.getPersistenceManager();
    }
 
-   public PersistenceManager getPersistenceManager() throws Exception
+   public PostOffice getQueuePostOffice() throws Exception
    {
-      return server.getPersistenceManager();
+      return server.getQueuePostOffice();
    }
-   
-   public ObjectName getServerPeerObjectName() throws Exception
+
+   public PostOffice getTopicPostOffice() throws Exception
    {
-      return server.getServerPeerObjectName();
+      return server.getTopicPostOffice();
    }
 
-   public Object invoke(ObjectName on, String operationName, Object[] params, String[] signature) throws Exception
+   public ServerPeer getServerPeer() throws Exception
    {
-      return server.invoke(on, operationName, params, signature);
+      return server.getServerPeer();
    }
 
-   public boolean isServerPeerStarted() throws Exception
+   public void deployTopic(String name, String jndiName, boolean clustered) throws Exception
    {
-      return server.isServerPeerStarted();
+      server.deployTopic(name, jndiName, clustered);
    }
 
-   public boolean isStarted() throws Exception
+   public void deployTopic(String name,
+                           String jndiName,
+                           int fullSize,
+                           int pageSize,
+                           int downCacheSize,
+                           boolean clustered) throws Exception
    {
-      return server.isStarted();
+      server.deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
    }
 
-   public void log(int level, String text) throws Exception
+   public void createTopic(String name, String jndiName) throws Exception
    {
-      server.log(level, text);
+      server.createTopic(name, jndiName);
    }
 
-   public Set query(ObjectName pattern) throws Exception
+   public void deployQueue(String name, String jndiName, boolean clustered) throws Exception
    {
-      return server.query(pattern);
+      server.deployQueue(name, jndiName, clustered);
    }
 
-   public String getDatabaseType()
+   public void deployQueue(String name,
+                           String jndiName,
+                           int fullSize,
+                           int pageSize,
+                           int downCacheSize,
+                           boolean clustered) throws Exception
    {
-      return server.getDatabaseType();
+      server.deployQueue(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
    }
 
-   public void setAttribute(ObjectName on, String name, String valueAsString) throws Exception
+   public void createQueue(String name, String jndiName) throws Exception
    {
-      server.setAttribute(on, name, valueAsString);
+      server.createQueue(name, jndiName);
    }
 
-   public void setDefaultSecurityConfig(String config) throws Exception
+   public void undeployDestination(boolean isQueue, String name) throws Exception
    {
-      server.setDefaultSecurityConfig(config);
+      server.undeployDestination(isQueue, name);
    }
 
-   public void start(String containerConfig) throws Exception
+   public boolean destroyDestination(boolean isQueue, String name) throws Exception
    {
-      server.start(containerConfig);
+      return server.destroyDestination(isQueue, name);
    }
 
-   public void startServerPeer(int serverPeerID, String defaultQueueJNDIContext,
-                               String defaultTopicJNDIContext, boolean clustered) throws Exception
+   public void deployConnectionFactory(String objectName, String[] jndiBindings)
+      throws Exception
    {
-      server.startServerPeer(serverPeerID, defaultQueueJNDIContext, defaultTopicJNDIContext, clustered);
+      server.deployConnectionFactory(objectName, jndiBindings);
    }
 
-   public void stop() throws Exception
+   public void deployConnectionFactory(String objectName, String[] jndiBindings, int prefetchSize)
+      throws Exception
    {
-      server.stop();
-      namingDelegate.reset();
+      server.deployConnectionFactory(objectName, jndiBindings, prefetchSize);
    }
 
-   public void stopServerPeer() throws Exception
+   public void deployConnectionFactory(String objectName,
+                                       String[] jndiBindings,
+                                       int prefetchSize,
+                                       int defaultTempQueueFullSize,
+                                       int defaultTempQueuePageSize,
+                                       int defaultTempQueueDownCacheSize) throws Exception
    {
-      server.stopServerPeer();
+      server.deployConnectionFactory(objectName, jndiBindings, prefetchSize,
+               defaultTempQueueFullSize, defaultTempQueuePageSize, defaultTempQueueDownCacheSize);
    }
 
-   public void undeploy(ObjectName on) throws Exception
+   public void undeployConnectionFactory(ObjectName objectName) throws Exception
    {
-      server.undeploy(on);
+      server.undeployConnectionFactory(objectName);
    }
 
-   public void undeployDestination(boolean isQueue, String name) throws Exception
+   public void configureSecurityForDestination(String destName, String config) throws Exception
    {
-      server.undeployDestination(isQueue, name);
+      server.configureSecurityForDestination(destName, config);
    }
 
-   public boolean destroyDestination(boolean isQueue, String name) throws Exception
+   public void setDefaultSecurityConfig(String config) throws Exception
    {
-      return server.destroyDestination(isQueue, name);
+      server.setDefaultSecurityConfig(config);
    }
 
-   public Object executeCommand(Command command) throws Exception
+   public String getDefaultSecurityConfig() throws Exception
    {
-      return server.executeCommand(command);
+      return server.getDefaultSecurityConfig();
    }
 
-   public ServerPeer getServerPeer() throws Exception
+   public Object executeCommand(Command command) throws Exception
    {
-      return server.getServerPeer();
+      return server.executeCommand(command);
    }
 
    public UserTransaction getUserTransaction() throws Exception
@@ -370,9 +415,54 @@
       return server.getNodeIDView();
    }
 
+   public List pollNotificationListener(long listenerID) throws Exception
+   {
+      ProxyNotificationListener pl = null;
+
+      synchronized(proxyListeners)
+      {
+         pl = (ProxyNotificationListener)proxyListeners.get(new Long(listenerID));
+      }
+
+      if (pl == null)
+      {
+         return Collections.EMPTY_LIST;
+      }
+
+      return pl.drain();
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
    private RMINamingDelegate getNamingDelegate()
    {
       return namingDelegate;
    }
 
+   // Inner classes -------------------------------------------------
+
+   public class VMKiller implements Runnable
+   {
+      public void run()
+      {
+         log.info("shutting down the VM");
+
+         try
+         {
+            Thread.sleep(250);
+         }
+         catch(Exception e)
+         {
+            log.warn("interrupted while sleeping", e);
+         }
+
+         System.exit(0);
+      }
+   }
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/Server.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -23,7 +23,9 @@
 
 import java.rmi.Remote;
 import java.util.Set;
+import java.util.List;
 import javax.management.ObjectName;
+import javax.management.NotificationListener;
 import javax.transaction.UserTransaction;
 import org.jboss.jms.server.DestinationManager;
 import org.jboss.jms.server.ServerPeer;
@@ -43,21 +45,36 @@
 public interface Server extends Remote
 {
    void start(String containerConfig) throws Exception;
+
    void stop() throws Exception;
-   void destroy() throws Exception;
 
    /**
+    * For a remote server, it "abruptly" kills the VM running the server. For a local server
+    * it just stops the server.
+    */
+   void kill() throws Exception;
+
+   /**
     * Deploys and registers a service based on the MBean service descriptor element, specified as
     * a String. Supports XMBeans. The implementing class and the ObjectName are inferred from the
     * mbean element. If there are configuration attributes specified in the deployment descriptor,
     * they are applied to the service instance.
     */
    ObjectName deploy(String mbeanConfiguration) throws Exception;
+
    void undeploy(ObjectName on) throws Exception;
+
    Object getAttribute(ObjectName on, String attribute) throws Exception;
+
    void setAttribute(ObjectName on, String name, String valueAsString) throws Exception;
+
    Object invoke(ObjectName on, String operationName, Object[] params, String[] signature)
       throws Exception;
+
+   void addNotificationListener(ObjectName on, NotificationListener listener) throws Exception;
+
+   void removeNotificationListener(ObjectName on, NotificationListener listener) throws Exception;
+
    /**
     * Returns a set of ObjectNames corresponding to installed services.
     */
@@ -218,4 +235,9 @@
     */
    Set getNodeIDView() throws Exception;
 
+   /**
+    * @return List<Notification>
+    */
+   List pollNotificationListener(long listenerID) throws Exception;
+
 }

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java	2006-12-11 15:27:26 UTC (rev 1763)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/rmi/StopRMIServer.java	2006-12-11 16:56:06 UTC (rev 1764)
@@ -81,7 +81,7 @@
 
       // We should shut down cleanly - not kill the process like we are currently doing
       
-      server.destroy();
+      server.kill();
 
       // The last RMI server will take with it the registry too
 




More information about the jboss-cvs-commits mailing list