[jboss-cvs] JBoss Messaging SVN: r1705 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/tools/jmx

jboss-cvs-commits at lists.jboss.org
Tue Dec 5 09:06:46 EST 2006


Author: timfox
Date: 2006-12-05 09:06:32 -0500 (Tue, 05 Dec 2006)
New Revision: 1705

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
Log:
Various fixes to get things running



Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -70,8 +70,8 @@
    // Map<uniqueName<String> - ServerConnectionFactoryEndpoint>
    protected Map endpoints;
 
-   // Map<uniqueName<String> - ClusteredClientConnectionFactoryDelegate>
-   protected Map clusteredDelegates;
+   // Map<uniqueName<String> - ClientConnectionFactoryDelegate> (not just clustered delegates)
+   protected Map delegates;
    
    protected Replicator replicator;
    
@@ -81,7 +81,7 @@
    {
       this.serverPeer = serverPeer;
       endpoints = new HashMap();
-      clusteredDelegates = new HashMap();
+      delegates = new HashMap();
    }
    
    // ConnectionFactoryManager implementation -----------------------
@@ -98,6 +98,8 @@
                                                       boolean clustered)
       throws Exception
    {
+      log.info("Registering connection factory with name " + uniqueName + " bindings " + jndiBindings);
+      
       int id = serverPeer.getNextObjectID();
       Version version = serverPeer.getVersion();
 
@@ -107,42 +109,51 @@
                                              defaultTempQueueFullSize,
                                              defaultTempQueuePageSize,
                                              defaultTempQueueDownCacheSize);
-      endpoints.put(uniqueName, endpoint);
-
-      JMSDispatcher.instance.
-         registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
-
-      ClientConnectionFactoryDelegate localDelegate =
-         new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
-
-      if (!clustered)
+      endpoints.put(uniqueName, endpoint);   
+      
+      ClientConnectionFactoryDelegate delegate = null;
+      
+      boolean replicateChanges = false;
+      
+      if (clustered)
       {
-         rebindConnectionFactory(initialContext, jndiBindings, localDelegate);
-         return;
+         setupReplicator();
+         
+         if (replicator != null)
+         {                                             
+            //Replicator might still be null since we might be deploying a clustered cf in a non-clustered
+            //post office (which is ok)
+            delegate = new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+         }
       }
-
-      // We are clustered, we need to propagate the local delegate across the cluster.
-
-      setupReplicator();
-
-      if (replicator == null)
+      
+      if (delegate == null)
       {
-         return;
+         //Local
+         delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
       }
+      
+      delegates.put(uniqueName, delegate);
+      
+      //Now bind it in JNDI
+      rebindConnectionFactory(initialContext, jndiBindings, delegate);
+      
+      if (replicateChanges)
+      {
+         // We are clustered, we need to propagate the local delegate across the cluster.
 
-      // Create a "hollow" clustered delegate ...
+         // ... and then update it (and the clustered delegate on every node, while we're at it) by
+         // replicating the local delegate across the cluster. This way, the clustered delegates on each
+         // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+         // will rebind updated ConnectionFactories in JNDI.
+         // This will update the local node too
 
-      ClusteredClientConnectionFactoryDelegate clusteredDelegate =
-         new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
-
-      clusteredDelegates.put(uniqueName, clusteredDelegate);
-
-      // ... and then update it (and the clustered delegate on every node, while we're at it) by
-      // replicating the local delegate across the cluster. This way, the clustred delegates on each
-      // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
-      // will rebind updated ConnectionFactories in JNDI.
-
-      replicator.put(CF_PREFIX + uniqueName, localDelegate);
+         replicator.put(CF_PREFIX + uniqueName, delegate);
+      }
+            
+      //Registering with the dispatcher should always be the last thing, otherwise a client could use
+      //a partially initialised object
+      JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
    }
    
    public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
@@ -169,7 +180,7 @@
       }
       
       ClientConnectionFactoryDelegate delegate =
-         (ClientConnectionFactoryDelegate)clusteredDelegates.remove(uniqueName);
+         (ClientConnectionFactoryDelegate)delegates.remove(uniqueName);
       
       if (delegate == null)
       {
@@ -245,7 +256,7 @@
             String uniqueName = sKey.substring(CF_PREFIX.length());
             
             ClusteredClientConnectionFactoryDelegate clusteredDelegate =
-               (ClusteredClientConnectionFactoryDelegate)clusteredDelegates.get(uniqueName);
+               (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
             
             if (clusteredDelegate == null)
             {

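The reordering above is the heart of this file's change: create the endpoint, choose a delegate (clustered when a replicator is available, plain otherwise), bind it in JNDI, replicate if required, and only then register the target with the dispatcher so a client can never see a partially initialised object. Below is a minimal standalone sketch of that flow; the types and registries are hypothetical, simplified stand-ins for the real classes, and the sketch assumes replicateChanges is flipped to true when the clustered delegate is created (in the hunk above the flag is initialised to false and apparently never set, so the replicator.put branch looks unreachable as committed).

// Sketch only -- hypothetical, simplified stand-ins for the real classes.
import java.util.HashMap;
import java.util.Map;

public class RegistrationOrderSketch
{
   interface Delegate {}
   static class LocalDelegate implements Delegate {}
   static class ClusteredDelegate implements Delegate {}

   private Map delegates = new HashMap();      // uniqueName -> delegate
   private Map jndi = new HashMap();           // jndiName -> delegate (stand-in for JNDI)
   private Map dispatcher = new HashMap();     // id -> delegate (stand-in for JMSDispatcher)
   private boolean replicatorAvailable = true; // assumption: a replicator was found

   public synchronized void registerConnectionFactory(String uniqueName, String jndiName,
                                                      int id, boolean clustered)
   {
      Delegate delegate = null;
      boolean replicateChanges = false;

      if (clustered && replicatorAvailable)
      {
         delegate = new ClusteredDelegate();
         replicateChanges = true; // assumed fix: the committed hunk never sets this flag
      }

      if (delegate == null)
      {
         //Local (or a clustered cf deployed in a non-clustered post office)
         delegate = new LocalDelegate();
      }

      delegates.put(uniqueName, delegate);

      //Bind in JNDI before replicating...
      jndi.put(jndiName, delegate);

      if (replicateChanges)
      {
         //...then propagate the local delegate across the cluster (replicator.put)
      }

      //Register with the dispatcher LAST, so no client can reach a
      //partially initialised object
      dispatcher.put(new Integer(id), delegate);
   }
}
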
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -65,6 +65,7 @@
           
    private String officeName;
    
+   //This lock protects the condition and name maps
    protected ReadWriteLock lock;
    
    protected MessageStore ms;     
@@ -73,7 +74,7 @@
    
    protected TransactionRepository tr;
    
-   protected int nodeId;
+   protected int currentNodeId;
    
    //Map <node id, Map < queue name, binding > >
    protected Map nameMaps;
@@ -104,7 +105,7 @@
        
       conditionMap = new LinkedHashMap(); 
       
-      this.nodeId = nodeId;
+      this.currentNodeId = nodeId;
       
       this.officeName = officeName;
       
@@ -172,7 +173,7 @@
       try
       {         
          //We currently only allow one binding per name per node
-         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+         Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
          
          Binding binding = null;
          
@@ -186,7 +187,7 @@
             throw new IllegalArgumentException("Binding already exists for name " + queue.getName());
          }
                  
-         binding = new DefaultBinding(nodeId, condition, queue, false);
+         binding = new DefaultBinding(currentNodeId, condition, queue, false);
          
          addBinding(binding);
                
@@ -217,13 +218,13 @@
 
       try
       {         
-         Binding binding = removeBinding(nodeId,queueName);
+         Binding binding = removeBinding(currentNodeId,queueName);
       
          if (binding.getQueue().isRecoverable())
          {
             //Need to remove from db too
             
-            deleteBinding(nodeId, binding.getQueue().getName());
+            deleteBinding(currentNodeId, binding.getQueue().getName());
          }
          
          binding.getQueue().removeAllReferences();         
@@ -270,7 +271,7 @@
     */
    protected Binding internalGetBindingForQueueName(String queueName)
    {
-      Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+      Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
 
       Binding binding = null;
 
@@ -339,7 +340,7 @@
                Binding binding = (Binding)iter.next();
                
                //Sanity check
-               if (binding.getNodeId() != this.nodeId)
+               if (binding.getNodeId() != this.currentNodeId)
                {
                   throw new IllegalStateException("Local post office has foreign bindings!");
                }
@@ -407,7 +408,7 @@
             {
                Binding binding = (Binding)iter.next();
                
-               if (!localOnly || (binding.getNodeId() == this.nodeId))
+               if (!localOnly || (binding.getNodeId() == this.currentNodeId))
                {
                   list.add(binding);
                }
@@ -499,7 +500,7 @@
     protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
     {
         Queue queue;
-        if (nodeId == this.nodeId)
+        if (nodeId == this.currentNodeId)
         {
            QueuedExecutor executor = (QueuedExecutor)pool.get();
 
@@ -532,7 +533,7 @@
          String filterString = binding.getQueue().getFilter() == null ? null : binding.getQueue().getFilter().getFilterString();
                   
          ps.setString(1, this.officeName);
-         ps.setInt(2, this.nodeId);
+         ps.setInt(2, this.currentNodeId);
          ps.setString(3, binding.getQueue().getName());
          ps.setString(4, binding.getCondition());         
          if (filterString != null)
@@ -564,7 +565,7 @@
    
    protected boolean deleteBinding(int parameterNodeId, String queueName) throws Exception
    {
-      if (parameterNodeId<0) parameterNodeId=this.nodeId;
+      if (parameterNodeId<0) parameterNodeId=this.currentNodeId;
       Connection conn = null;
       PreparedStatement ps  = null;
       TransactionWrapper wrap = new TransactionWrapper();

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -21,7 +21,6 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -40,8 +39,10 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
+
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -74,6 +75,8 @@
 import org.jgroups.blocks.RequestHandler;
 import org.w3c.dom.Element;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
 /**
  * 
  * A DefaultClusteredPostOffice
@@ -99,6 +102,7 @@
    private boolean failAfterCommit;
    
    private boolean failHandleResult;
+   //End of failure testing attributes
      
    private boolean trace = log.isTraceEnabled();
 
@@ -152,27 +156,11 @@
    private Map failedBindings;
 
    private StatsSender statsSender;
+   
+   private ReplicationListener nodeAddressMapListener;
 
    private boolean started;
      
-   public DefaultClusteredPostOffice()
-   {        
-      init();
-   }
-   
-   private void init()
-   {
-      holdingArea = new HashMap();
-      
-      replicatedData = new HashMap();      
-      
-      replicationListeners = new HashSet();
-      
-      failoverMap = new LinkedHashMap();
-      
-      leftSet = new HashSet();
-   }
-   
    /*
     * Constructor using Element for configuration
     */
@@ -275,7 +263,15 @@
       
       statsSender = new StatsSender(this, statsSendPeriod);
       
-      init();
+      holdingArea = new HashMap();
+      
+      replicatedData = new HashMap();      
+      
+      replicationListeners = new HashSet();
+      
+      failoverMap = new LinkedHashMap();
+      
+      leftSet = new HashSet();
    }
 
    // MessagingComponent overrides
@@ -309,10 +305,16 @@
       MessageListener cml = new ControlMessageListener();
       MembershipListener ml = new ControlMembershipListener();
       RequestHandler rh = new PostOfficeRequestHandler();
+      
+      //Register as a listener for nodeid-address mapping events
+      nodeAddressMapListener = new NodeAddressMapListener();
+      
+      registerListener(nodeAddressMapListener);
 
       this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
 
       Receiver r = new DataReceiver();
+      
       asyncChannel.setReceiver(r);
 
       syncChannel.connect(groupName);
@@ -320,7 +322,7 @@
       asyncChannel.connect(groupName);
       
       super.start();
-
+            
       Address syncAddress = syncChannel.getLocalAddress();
       
       Address asyncAddress = asyncChannel.getLocalAddress();
@@ -329,7 +331,7 @@
       
       put(ADDRESS_INFO_KEY, info);
 
-      verifyMembership(null,this.currentView);
+      //verifyMembership(null, this.currentView);
 
       statsSender.start();
       
@@ -346,8 +348,11 @@
       }
 
       syncSendRequest(new LeaveClusterRequest(this.getNodeId()));
+      
       super.stop(sendNotification);
       
+      unregisterListener(nodeAddressMapListener);
+      
       statsSender.stop();
          
       syncChannel.close();
@@ -365,10 +370,10 @@
    {
       if (trace)
       {
-         log.trace(this.nodeId + " binding clustered queue: " + queue + " with condition: " + condition);
+         log.trace(this.currentNodeId + " binding clustered queue: " + queue + " with condition: " + condition);
       }
         
-      if (queue.getNodeId() != this.nodeId)
+      if (queue.getNodeId() != this.currentNodeId)
       {
           log.warn("queue.getNodeId is not this node");
          //throw new IllegalArgumentException("Queue node id does not match office node id");
@@ -386,7 +391,7 @@
       throws Exception
    {
       BindRequest request =
-         new BindRequest(this.nodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
+         new BindRequest(this.currentNodeId, queue.getName(), condition, queue.getFilter() == null ? null : queue.getFilter().getFilterString(),
                          binding.getQueue().getChannelID(), queue.isRecoverable(), binding.isFailed());
 
       syncSendRequest(request);
@@ -396,12 +401,12 @@
    {
       if (trace)
       {
-         log.trace(this.nodeId + " unbind clustered queue: " + queueName);
+         log.trace(this.currentNodeId + " unbind clustered queue: " + queueName);
       }
       
       Binding binding = (Binding)super.unbindQueue(queueName);
       
-      UnbindRequest request = new UnbindRequest(this.nodeId, queueName);
+      UnbindRequest request = new UnbindRequest(this.currentNodeId, queueName);
       
       syncSendRequest(request);
       
@@ -412,7 +417,7 @@
    {
       if (trace)
       {
-         log.trace(this.nodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
+         log.trace(this.currentNodeId + " Routing " + ref + " with condition " + condition + " and transaction " + tx);
       }
       
       if (ref == null)
@@ -451,7 +456,7 @@
                   startInternalTx = true;
                   if (trace)
                   {
-                     log.trace(this.nodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
+                     log.trace(this.currentNodeId + " Starting internal transaction since more than one durable sub or remote durable subs");
                   }
                }
             }                        
@@ -485,7 +490,7 @@
                   
                   if (trace)
                   {
-                     log.trace(this.nodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
+                     log.trace(this.currentNodeId + " Routing message to queue or stub:" + queue.getName() + " on node " +
                                queue.getNodeId() +" local:" + queue.isLocal());
                   }
                   
@@ -527,14 +532,14 @@
                {
                   if (numberRemote == 1)
                   {
-                     if (trace) { log.trace(this.nodeId + " unicasting message to " + lastNodeId); }
+                     if (trace) { log.trace(this.currentNodeId + " unicasting message to " + lastNodeId); }
                      
                      //Unicast - only one node is interested in the message                                        
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), null), lastNodeId);
                   }
                   else
                   {
-                     if (trace) { log.trace(this.nodeId + " multicasting message to group"); }
+                     if (trace) { log.trace(this.currentNodeId + " multicasting message to group"); }
                      
                      //Multicast - more than one node is interested
                      asyncSendRequest(new MessageRequest(condition, ref.getMessage(), queueNameNodeIdMap));
@@ -546,7 +551,7 @@
                   
                   if (callback == null)
                   {
-                     callback = new CastMessagesCallback(nodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
+                     callback = new CastMessagesCallback(currentNodeId, tx.getId(), DefaultClusteredPostOffice.this, failBeforeCommit, failAfterCommit);
                      
                      //This callback MUST be executed first
                      
@@ -593,48 +598,45 @@
    
    public int getFailoverNodeID(int nodeId)
    {
-      Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
-      
-      if (failoverNode == null)
-      {
-         return nodeId;
+      synchronized (failoverMap)
+      {         
+         Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
+         
+         if (failoverNode == null)
+         {
+            return nodeId;
+         }
+         
+         return failoverNode.intValue();
       }
-      
-      return failoverNode.intValue();
    }
 
    // Replicator implementation --------------------------------------------------------------------------
    
    public Map get(Serializable key) throws Exception
    {
-      lock.readLock().acquire();
-      
-      try
-      {
+      synchronized (replicatedData)
+      {         
          Map m = (Map)replicatedData.get(key);
-         
+            
          return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
       }
-      finally
-      {
-         lock.readLock().release();
-      }
    }
 
    public void put(Serializable key, Serializable replicant) throws Exception
    {
-      putReplicantLocally(nodeId, key, replicant);
+      putReplicantLocally(currentNodeId, key, replicant);
       
-      PutReplicantRequest request = new PutReplicantRequest(nodeId, key, replicant);
+      PutReplicantRequest request = new PutReplicantRequest(currentNodeId, key, replicant);
       
       syncSendRequest(request);
    }
    
    public boolean remove(Serializable key) throws Exception
    {
-      if (removeReplicantLocally(this.nodeId, key))
+      if (removeReplicantLocally(this.currentNodeId, key))
       {      
-         RemoveReplicantRequest request = new RemoveReplicantRequest(this.nodeId, key);
+         RemoveReplicantRequest request = new RemoveReplicantRequest(this.currentNodeId, key);
          
          syncSendRequest(request);
          
@@ -648,20 +650,26 @@
    
    public void registerListener(ReplicationListener listener)
    {
-      if (replicationListeners.contains(listener))
+      synchronized (replicationListeners)
       {
-         throw new IllegalArgumentException("Listener " + listener + " is already registered");
+         if (replicationListeners.contains(listener))
+         {
+            throw new IllegalArgumentException("Listener " + listener + " is already registered");
+         }
+         replicationListeners.add(listener);
       }
-      replicationListeners.add(listener);
    }
    
    public void unregisterListener(ReplicationListener listener)
    {
-      boolean removed = replicationListeners.remove(listener);
-      
-      if (!removed)
+      synchronized (replicationListeners)
       {
-         throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
+         boolean removed = replicationListeners.remove(listener);
+         
+         if (!removed)
+         {
+            throw new IllegalArgumentException("Cannot find listener " + listener + " to remove");
+         }
       }
    }
     
@@ -669,16 +677,10 @@
    
    public void handleNodeLeft(int nodeId) throws Exception
    {
-      lock.writeLock().acquire();
-      
-      try
+      synchronized (leftSet)
       {
          leftSet.add(new Integer(nodeId));
       }
-      finally
-      {
-         lock.writeLock().release();
-      }      
    }
 
    /**
@@ -687,9 +689,7 @@
    public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
       throws Exception
    {
-      lock.writeLock().acquire();
-      
-      try
+      synchronized (replicatedData)
       {         
          Map m = (Map)replicatedData.get(key);
          
@@ -704,10 +704,6 @@
          
          notifyListeners(key, m);
       }
-      finally
-      {
-         lock.writeLock().release();
-      }
    }
 
    /**
@@ -715,10 +711,10 @@
     */
    public boolean removeReplicantLocally(int originatorNodeID, Serializable key) throws Exception
    {
-      lock.writeLock().acquire();
-      
-      try
+      synchronized (replicatedData)
       {
+         log.info(this.currentNodeId + " removing key " + key + " from node " + originatorNodeID);
+         
          Map m = (Map)replicatedData.get(key);
          
          if (m == null)
@@ -736,16 +732,11 @@
          if (m.isEmpty())
          {
             replicatedData.remove(key);
-         }
-         
+         } 
          notifyListeners(key, m);
-         
-         return true;
+      
+         return true; 
       }
-      finally
-      {
-         lock.writeLock().release();
-      }
    }
      
    /*
@@ -759,7 +750,7 @@
     
       if (trace)
       {
-         log.info(this.nodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
+         log.info(this.currentNodeId + " adding binding from node: " + nodeId + " queue: " + queueName + " with condition: " + condition);
       }
             
       try
@@ -783,7 +774,7 @@
          
          if (binding != null && failed)
          {
-            throw new IllegalArgumentException(this.nodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
+            throw new IllegalArgumentException(this.currentNodeId + " Binding already exists for node Id " + nodeId + " queue name " + queueName);
          }
             
          binding = this.createBinding(nodeId, condition, queueName, channelID, filterString, durable, failed);
@@ -805,7 +796,7 @@
       
       if (trace)
       {
-         log.trace(this.nodeId + " removing binding from node: " + nodeId + " queue: " + queueName);        
+         log.trace(this.currentNodeId + " removing binding from node: " + nodeId + " queue: " + queueName);        
       }
       
       try
@@ -829,7 +820,7 @@
    {
       if (trace)
       {
-         log.trace(this.nodeId + " routing from cluster, message: " + message + " routing key " +
+         log.trace(this.currentNodeId + " routing from cluster, message: " + message + " routing key " +
                   routingKey + " map " + queueNameNodeIdMap);
       }
             
@@ -860,7 +851,7 @@
             {
                Binding binding = (Binding)iter.next();
                                                      
-               if (binding.getNodeId() == this.nodeId)
+               if (binding.getNodeId() == this.currentNodeId)
                {  
                   boolean handle = true;
                   
@@ -873,7 +864,7 @@
                      
                      if (in != null)
                      {
-                        handle = in.intValue() == nodeId;
+                        handle = in.intValue() == currentNodeId;
                      }
                   }
                   
@@ -887,7 +878,7 @@
                      
                      if (trace)
                      {
-                        log.trace(this.nodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
+                        log.trace(this.currentNodeId + " queue " + queue.getName() + " handled reference from cluster " + del);
                      }
                   }
                }
@@ -909,7 +900,7 @@
     */
    public void asyncSendRequest(ClusterRequest request) throws Exception
    {     
-      if (trace) { log.trace(this.nodeId + " sending asynch request to group, request: " + request); }
+      if (trace) { log.trace(this.currentNodeId + " sending asynch request to group, request: " + request); }
       
       byte[] bytes = writeRequest(request);
               
@@ -921,11 +912,11 @@
     */
    public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
    {               
-      if (trace) { log.trace(this.nodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
+      if (trace) { log.trace(this.currentNodeId + " sending asynch request to single node, request: " + request + " node " + nodeId); }
       
       Address address = this.getAddressForNodeId(nodeId, false);
       
-      if (trace) { log.trace(this.nodeId + " sending to address " + address); }
+      if (trace) { log.trace(this.currentNodeId + " sending to address " + address); }
       
       if (address == null)
       {
@@ -948,13 +939,13 @@
       {
          holdingArea.put(id, tx);
          
-         if (trace) { log.trace(this.nodeId + " added transaction " + tx + " to holding area with id " + id); }
+         if (trace) { log.trace(this.currentNodeId + " added transaction " + tx + " to holding area with id " + id); }
       } 
    }
    
    public void commitTransaction(TransactionId id) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " committing transaction " + id ); }
+      if (trace) { log.trace(this.currentNodeId + " committing transaction " + id ); }
       
       ClusterTransaction tx = null;
         
@@ -970,12 +961,12 @@
       
       tx.commit(this);
       
-      if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
+      if (trace) { log.trace(this.currentNodeId + " committed transaction " + id ); }
    }
    
    public void rollbackTransaction(TransactionId id) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " rolling back transaction " + id ); }
+      if (trace) { log.trace(this.currentNodeId + " rolling back transaction " + id ); }
       
       ClusterTransaction tx = null;
         
@@ -991,7 +982,7 @@
       
       tx.rollback(this);
       
-      if (trace) { log.trace(this.nodeId + " committed transaction " + id ); }
+      if (trace) { log.trace(this.currentNodeId + " rolled back transaction " + id ); }
    }
    
    /**
@@ -999,7 +990,7 @@
     */
    public void check(Integer nodeId) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " checking for any stranded transactions for node " + nodeId); }
+      if (trace) { log.trace(this.currentNodeId + " checking for any stranded transactions for node " + nodeId); }
       
       synchronized (holdingArea)
       {
@@ -1021,7 +1012,7 @@
                 
                boolean commit = tx.check(this);
                
-               if (trace) { log.trace(this.nodeId + " transaction " + tx + " will be committed?: " + commit); }
+               if (trace) { log.trace(this.currentNodeId + " transaction " + tx + " will be committed?: " + commit); }
                
                if (commit)
                {
@@ -1034,7 +1025,7 @@
                
                toRemove.add(id);
                
-               if (trace) { log.trace(this.nodeId + " resolved " + tx); }
+               if (trace) { log.trace(this.currentNodeId + " resolved " + tx); }
             }
          }
          
@@ -1049,7 +1040,7 @@
             holdingArea.remove(id);
          }
       }
-      if (trace) { log.trace(this.nodeId + " check complete"); }
+      if (trace) { log.trace(this.currentNodeId + " check complete"); }
    }
    
    public void sendQueueStats() throws Exception
@@ -1065,7 +1056,7 @@
       
       try
       {         
-         Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+         Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
          
          if (nameMap != null)
          {            
@@ -1090,7 +1081,7 @@
 
                      statsList.add(stats);
                      
-                     if (trace) { log.trace(this.nodeId + " adding stat for send " + stats); }
+                     if (trace) { log.trace(this.currentNodeId + " adding stat for send " + stats); }
                   } 
                }
             }
@@ -1103,11 +1094,11 @@
       
       if (statsList != null)
       {
-         ClusterRequest req = new QueueStatsRequest(nodeId, statsList);
+         ClusterRequest req = new QueueStatsRequest(currentNodeId, statsList);
          
          asyncSendRequest(req);
          
-         if (trace) { log.trace(this.nodeId + " Sent stats"); }
+         if (trace) { log.trace(this.currentNodeId + " Sent stats"); }
       }
    }
    
@@ -1115,11 +1106,11 @@
    {
       lock.readLock().acquire();
 
-      if (trace) { log.trace(this.nodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
+      if (trace) { log.trace(this.currentNodeId + " updating queue stats from node " + nodeId + " stats size: " + statsList.size()); }
       
       try
       {      
-         if (nodeId == this.nodeId)
+         if (nodeId == this.currentNodeId)
          {
             //Sanity check
             throw new IllegalStateException("Received stats from node with id that matches this nodes id. You may have started two or more nodes with the same node id!");
@@ -1130,7 +1121,7 @@
          if (nameMap == null)
          {
             //This is ok, the node might have left
-            if (trace) { log.trace(this.nodeId + " cannot find node in name map, i guess the node might have left?"); }
+            if (trace) { log.trace(this.currentNodeId + " cannot find node in name map, I guess the node might have left?"); }
          }
          else
          {     
@@ -1145,7 +1136,7 @@
                if (bb == null)
                {
                   //I guess this is possible if the queue was unbound
-                  if (trace) { log.trace(this.nodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
+                  if (trace) { log.trace(this.currentNodeId + " cannot find binding for queue " + st.getQueueName() + " it could have been unbound"); }
                }
                else
                {                  
@@ -1153,7 +1144,7 @@
                   
                   stub.setStats(st);
                   
-                  if (trace) { log.trace(this.nodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
+                  if (trace) { log.trace(this.currentNodeId + " setting stats: " + st + " on remote stub " + stub.getName()); }
                   
                   ClusterRouter router = (ClusterRouter)routerMap.get(st.getQueueName());
                   
@@ -1166,7 +1157,7 @@
                      //TODO - the call to getQueues is too slow since it creates a new list and adds the local queue!!!
                      RemoteQueueStub toQueue = (RemoteQueueStub)messagePullPolicy.chooseQueue(router.getQueues());
                      
-                     if (trace) { log.trace(this.nodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
+                     if (trace) { log.trace(this.currentNodeId + " recalculated pull queue for queue " + st.getQueueName() + " to be " + toQueue); }
                                     
                      localQueue.setPullQueue(toQueue);
                      
@@ -1177,7 +1168,7 @@
                         
                         localQueue.deliver(false);
                                                                     
-                        if (trace) { log.trace(this.nodeId + " triggered delivery for " + localQueue.getName()); }
+                        if (trace) { log.trace(this.currentNodeId + " triggered delivery for " + localQueue.getName()); }
                      }
                   } 
                }
@@ -1199,7 +1190,7 @@
    public void handleMessagePullResult(int remoteNodeId, long holdingTxId,
                                        String queueName, org.jboss.messaging.core.Message message) throws Throwable
    {
-      if (trace) { log.trace(this.nodeId + " handling pull result " + message + " for " + queueName); }
+      if (trace) { log.trace(this.currentNodeId + " handling pull result " + message + " for " + queueName); }
                
       Binding binding = getBindingForQueueName(queueName);
       
@@ -1246,22 +1237,22 @@
          if (message.isReliable())
          {
             //Only reliable messages will be in holding area
-            this.asyncSendRequest(new RollbackPullRequest(this.nodeId, holdingTxId), remoteNodeId);
+            this.asyncSendRequest(new RollbackPullRequest(this.currentNodeId, holdingTxId), remoteNodeId);
             
-            if (trace) { log.trace(this.nodeId + " send rollback pull request"); }
+            if (trace) { log.trace(this.currentNodeId + " sent rollback pull request"); }
          }
       }      
    }
    
    public int getNodeId()
    {
-      return nodeId;
+      return currentNodeId;
    }
 
    public String toString()
    {
       StringBuffer sb = new StringBuffer("ClusteredPostOffice[");
-      sb.append(nodeId).append(":").append(getOfficeName()).append(":");
+      sb.append(currentNodeId).append(":").append(getOfficeName()).append(":");
 
       if (syncChannel == null)
       {
@@ -1324,7 +1315,7 @@
       {
          if (oldView != null)
          {
-            for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+            for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
             {
                Address address = (Address)i.next();
                if (!newView.containsMember(address))
@@ -1334,7 +1325,7 @@
             }
          }
 
-         for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+         for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
          {
             Address address = (Address)i.next();
             if (oldView == null || !oldView.containsMember(address))
@@ -1457,7 +1448,7 @@
             }
            
             //Create a new binding
-            Binding newBinding = this.createBinding(this.nodeId, binding.getCondition(),
+            Binding newBinding = this.createBinding(this.currentNodeId, binding.getCondition(),
                                                     stub.getName(), stub.getChannelID(),
                                                     stub.getFilter(), stub.isRecoverable(), failed);
 
@@ -1497,7 +1488,7 @@
       {
          //First look in the failed map
          //Failed bindings are stored in the failed map by channel id
-         Map channelMap = (Map)failedBindings.get(new Integer(nodeId));
+         Map channelMap = (Map)failedBindings.get(new Integer(currentNodeId));
          Binding binding = null;
          if (channelMap != null)
          {
@@ -1507,7 +1498,7 @@
          if (binding == null)
          {
             //Not found in the failed map - look in the name map
-            Map nameMap = (Map)nameMaps.get(new Integer(nodeId));
+            Map nameMap = (Map)nameMaps.get(new Integer(currentNodeId));
 
             if (nameMap != null)
             {
@@ -1684,7 +1675,7 @@
 
       if (bindings == null)
       {
-         bindings = new DefaultClusteredBindings(nodeId);
+         bindings = new DefaultClusteredBindings(currentNodeId);
 
          conditionMap.put(condition, bindings);
       }
@@ -1769,7 +1760,7 @@
       {
          //The state will be set in due course via the MessageListener - we must wait until this happens
          
-         if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
+         if (trace) { log.trace(this.currentNodeId + " Not first member of group - so waiting for state to arrive..."); }
 
       synchronized (setStateLock)
       {
@@ -1780,14 +1771,14 @@
          }
       }
 
-         if (trace) { log.trace(this.nodeId + " Received state"); }
+         if (trace) { log.trace(this.currentNodeId + " Received state"); }
       }
    }
 
    protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
    {
       Queue queue;
-      if (nodeId == this.nodeId)
+      if (nodeId == this.currentNodeId)
       {
          QueuedExecutor executor = (QueuedExecutor)pool.get();
          
@@ -1806,16 +1797,10 @@
    
    private boolean leaveMessageReceived(Integer nodeId) throws Exception
    {
-      lock.writeLock().acquire();
-      
-      try
+      synchronized (leftSet)
       {
          return leftSet.remove(nodeId);
       }
-      finally
-      {
-         lock.writeLock().release();
-      }    
    }
    
    /*
@@ -1857,8 +1842,15 @@
                
                removeBinding(nodeID.intValue(), binding.getQueue().getName());
             }
-         }
+         }         
+      }
+      finally
+      {
+         lock.writeLock().release();
+      }
          
+      synchronized (replicatedData)
+      {         
          // We need to remove any replicant data for the node. This includes the node-address info.
          for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
          {
@@ -1876,12 +1868,8 @@
             
             // Need to trigger listeners
             notifyListeners(key, replicants);
-         }                  
+         }         
       }
-      finally
-      {
-         lock.writeLock().release();
-      }
    }
 
    /**
@@ -1890,10 +1878,14 @@
     */
    private void notifyListeners(Serializable key, Map updatedReplicantMap)
    {
-      for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
-      {
-         ReplicationListener listener = (ReplicationListener)i.next();
-         listener.onReplicationChange(key, updatedReplicantMap);
+      synchronized (replicationListeners)
+      {         
+         for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
+         {
+            ReplicationListener listener = (ReplicationListener)i.next();
+            
+            listener.onReplicationChange(key, updatedReplicantMap);
+         }
       }
    } 
    
@@ -1913,13 +1905,33 @@
       if (trace) { log.trace(this + " request sent OK"); }
    }
    
+   //DEBUG ONLY - remove this
+   private void dumpNodeIdAddressMap(Map map) throws Exception   
+   {
+      log.info("** DUMPING NODE ADDRESS MAPPING");
+      
+      Iterator iter = map.entrySet().iterator();
+      
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+         
+         Integer theNodeId = (Integer)entry.getKey();
+         
+         PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
+         
+         log.info("node id: " + theNodeId +" --------->(async:sync) " + info.getAsyncChannelAddress() + ":" + info.getSyncChannelAddress());
+      }
+      
+      log.info("** END DUMP");      
+   }
+   
 
    //TODO - this is a bit tortuous - needs optimising
    private Integer getNodeIdForSyncAddress(Address address) throws Exception
    {
-      lock.readLock().acquire();
-      try
-      { 
+      synchronized (replicatedData)
+      {
          Map map = get(ADDRESS_INFO_KEY);
          
          if (map == null)
@@ -1927,43 +1939,54 @@
             throw new IllegalStateException("Cannot find node id -> address mapping");
          }
          
+         this.dumpNodeIdAddressMap(map);
+         
          Iterator iter = map.entrySet().iterator();
          
-         Integer nodeId = null;
+         log.info("iterating, looking for " + address);
+         
+         Integer theNodeId = null;
          while (iter.hasNext())
          {
             Map.Entry entry = (Map.Entry)iter.next();
             
             PostOfficeAddressInfo info = (PostOfficeAddressInfo)entry.getValue();
             
+            log.info("info synch channel address: " + info.getSyncChannelAddress());
+            
             if (info.getSyncChannelAddress().equals(address))
             {
-               nodeId = (Integer)entry.getKey();
+               log.info("equal");
+               theNodeId = (Integer)entry.getKey();
+               break;
             }
+            else
+            {
+               log.info("Not equal");
+            }
          }
-         return nodeId;
+         return theNodeId;
       }
-      finally
-      {
-         lock.readLock().release();
-      }
    }
    
    private boolean knowAboutNodeId(int nodeId)
    {
       //The nodeid->Address info mapping is stored in the replicated data
       
-      Map nodeIdAddressMapping = (Map)replicatedData.get(ADDRESS_INFO_KEY);
-      
-      if (nodeIdAddressMapping == null)
-      {
-         return false;
-      }
-      else
-      {
-         Object obj = nodeIdAddressMapping.get(new Integer(nodeId));
+      synchronized (replicatedData)
+      {         
+         Map nodeIdAddressMapping = (Map)replicatedData.get(ADDRESS_INFO_KEY);
          
-         return obj != null;
+         if (nodeIdAddressMapping == null)
+         {
+            return false;
+         }
+         else
+         {
+            Object obj = nodeIdAddressMapping.get(new Integer(nodeId));
+            
+            return obj != null;
+         }
       }
    }
 
@@ -1973,7 +1996,7 @@
     */
    private boolean isFailoverNodeForNode(int nodeId)
    {
-      return this.nodeId == getFailoverNodeID(nodeId);
+      return this.currentNodeId == getFailoverNodeID(nodeId);
    }
       
    private byte[] getStateAsBytes() throws Exception
@@ -2004,8 +2027,17 @@
          }
       }
       
-      SharedState state = new SharedState(bindings, replicatedData);
+      //Need to copy
       
+      Map copy;
+      
+      synchronized (replicatedData)
+      {
+         copy = copyReplicatedData(replicatedData);
+      }
+      
+      SharedState state = new SharedState(bindings, copy);
+      
       return StreamUtils.toBytes(state);
    }
    
@@ -2034,7 +2066,7 @@
          Binding binding = this.createBinding(info.getNodeId(), info.getCondition(), info.getQueueName(), info.getChannelId(),
                                               info.getFilterString(), info.isDurable(),info.isFailed());
          
-         if (binding.getNodeId() == this.nodeId)
+         if (binding.getNodeId() == this.currentNodeId)
          {
             //We deactivate if this is one of our own bindings - it can only
             //be one of our own durable bindings - and since state is retrieved before we are fully started
@@ -2046,11 +2078,20 @@
          addBinding(binding);         
       }
       
-      //Copy the map
-      this.replicatedData.clear();
+      //Update the replicated data
       
-      iter = state.getReplicatedData().entrySet().iterator();
+      synchronized (replicatedData)
+      {
+         replicatedData = copyReplicatedData(state.getReplicatedData());
+      }
+   }
+   
+   private Map copyReplicatedData(Map toCopy)
+   {
+      Map copy = new HashMap();
       
+      Iterator iter = toCopy.entrySet().iterator();
+      
       while (iter.hasNext())
       {
          Map.Entry entry = (Map.Entry)iter.next();
@@ -2063,8 +2104,10 @@
          
          m.putAll(replicants);
          
-         replicatedData.put(key, m);
+         copy.put(key, m);
       }
+      
+      return copy;
    }
    
 
@@ -2096,9 +2139,7 @@
    
    private Address getAddressForNodeId(int nodeId, boolean sync) throws Exception
    {  
-      lock.readLock().acquire();
-
-      try
+      synchronized (replicatedData)
       {
          Map map = this.get(ADDRESS_INFO_KEY);
          
@@ -2124,64 +2165,11 @@
          {
             return null;
          }
-      }
-      finally
-      {
-         lock.readLock().release();      
-      }
+      }      
    }
    
-   /*
-    * Given a JGroups view, generate a map of node to failover node.
-    * The mapping is determined by a pluggable policy.
-    */
-   private void generateFailoverMap(View view) throws Exception
-   {
-      List nodes = new ArrayList();
-      
-      for (Iterator i = view.getMembers().iterator(); i.hasNext(); )
-      {
-         Address address = (Address)i.next();
+   
 
-         // Ignore own address, a node can't be its own failover node
-         if (syncChannel.getLocalAddress().equals(address))
-         {
-            continue;
-         }
-
-         // Convert to node id
-         // TODO this should be optimised - currently the implementation of the lookup
-         // is a bit tortuous
-         
-         Integer n = getNodeIdForSyncAddress(address);
-         
-         if (n == null)
-         {
-            throw new IllegalStateException("Cannot find node id for address: " + address);
-         }
-         
-         nodes.add(n);                  
-      }
-            
-      List failoverNodes = failoverMapper.generateMapping(nodes);
-      
-      // Now put this in the map of node -> failover node
-      
-      failoverMap.clear();
-
-      Iterator iter = nodes.iterator();
-      Iterator iter2 = failoverNodes.iterator();
-
-      while (iter.hasNext())
-      {
-         Integer node = (Integer)iter.next();
-
-         Integer failoverNode = (Integer)iter2.next();
-
-         failoverMap.put(node, failoverNode);
-      }
-   }
-
    /*
     * A new node has joined the group
     */
@@ -2189,8 +2177,10 @@
    {
       if (trace) { log.trace(this + ": " + address + " joined"); }
 
+      log.info(this.currentNodeId + " Node with address: " + address + " joined");
+      
       // We need to regenerate the failover map
-      generateFailoverMap(currentView);
+      //generateFailoverMap(currentView);
    }
 
    /*
@@ -2199,32 +2189,34 @@
    private void nodeLeft(Address address) throws Throwable
    {
       if (trace) { log.trace(this + ": " + address + " left"); }
+      
+      log.info(this.currentNodeId + " Node with address: " + address + " left");
 
-      Integer nodeId = getNodeIdForSyncAddress(address);
+      Integer theNodeId = getNodeIdForSyncAddress(address);
 
-      if (nodeId != null)
+      if (theNodeId == null)
       {
-         throw new IllegalStateException("Cannot find node id for address " + address);
+         throw new IllegalStateException(this.currentNodeId + " Cannot find node id for address " + address);
       }
 
-      boolean crashed = !this.leaveMessageReceived(nodeId);
+      boolean crashed = !this.leaveMessageReceived(theNodeId);
 
-      if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+      if (trace) { log.trace("Node " + address + " id: " + theNodeId +" has left the group, crashed = " + crashed); }
 
       //Cleanup any hanging transactions - we do this irrespective of whether we crashed
-      check(nodeId);
+      check(theNodeId);
 
       //Need to evaluate this before we regenerate the failover map
-      boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+      boolean isFailover = isFailoverNodeForNode(theNodeId.intValue());
 
       //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
       //since that may cause connection factories to be rebound
-      generateFailoverMap(currentView);
+      //generateFailoverMap(currentView);
 
       //Remove any replicant data and non durable bindings for the node - again we need to do this
       //irrespective of whether we crashed
       //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
-      removeDataForNode(nodeId);
+      removeDataForNode(theNodeId);
 
       if (crashed && isFailover)
       {
@@ -2233,7 +2225,7 @@
 
          //TODO server side valve
 
-         failOver(nodeId.intValue());
+         failOver(theNodeId.intValue());
       }
    }
 
@@ -2337,8 +2329,8 @@
          // + DefaultClusteredPostOffice.this.getOfficeName()); }
          //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
          //TODO:     can't we do the same since this is pretty useful?
-         log.info(DefaultClusteredPostOffice.this  + " got new view: " + newView + " postOffice:"
-            + DefaultClusteredPostOffice.this.getOfficeName());
+         log.info(currentNodeId  + " got new view: " + newView + " postOffice:"
+                  + DefaultClusteredPostOffice.this.getOfficeName());
 
          // JGroups will make sure this method is never called by more than one thread concurrently
          
@@ -2347,11 +2339,16 @@
 
          // Since now membership is sent over state transfer, we have to wait stateSet to be set.
          // If this logic is not good enough, we will have to place Address in a separate data structure
-         // as it used to be (NodeInfos)
-         if (stateSet)
-         {
+         // as it used to be (NodeInfos) - Clebert
+         
+         
+         // FIXME - I don't understand why this check with flag (stateSet) is necessary
+         //The state will always be set before any view changes occur - JGroups guarantees this -
+         //Maybe I am wrong - Tim
+//         if (stateSet)
+//         {
             verifyMembership(oldView, newView);
-         }
+        // }
       }
 
       public byte[] getState()
@@ -2391,7 +2388,7 @@
       
       public void receive(Message message)
       {
-         if (trace) { log.trace(nodeId + " received message " + message + " on async channel"); }
+         if (trace) { log.trace(currentNodeId + " received message " + message + " on async channel"); }
          
          try
          {
@@ -2423,7 +2420,7 @@
    {
       public Object handle(Message message)
       {
-         if (trace) { log.info(nodeId + " received message " + message + " on sync channel"); }
+         if (trace) { log.info(currentNodeId + " received message " + message + " on sync channel"); }
          try
          {   
             byte[] bytes = message.getBuffer();
@@ -2441,4 +2438,66 @@
          }         
       }      
    }
+   
+   /*
+    * We use this class to respond to node address mappings being added or removed from the cluster
+    * and then recalculate the node->failover node mapping
+    * 
+    */
+   private class NodeAddressMapListener implements ReplicationListener
+   {
+
+      public void onReplicationChange(Serializable key, Map updatedReplicantMap)
+      {
+         if (key instanceof String && ((String)key).equals(ADDRESS_INFO_KEY))
+         {
+            log.info(currentNodeId + " got node address change");
+            
+            try
+            {
+               //DEBUG only
+               dumpNodeIdAddressMap(updatedReplicantMap);
+            }
+            catch (Exception ignore)
+            {               
+            }
+            
+            //A node-address mapping has been added/removed from global state-
+            //We need to update the failover map
+            generateFailoverMap(updatedReplicantMap);
+         }
+      }
+      
+      private void generateFailoverMap(Map nodeAddressMap)
+      {
+         List nodes = new ArrayList(nodeAddressMap.keySet());
+         
+         log.info("generating failover map");
+         
+         log.info("I have " + nodes.size() + " nodes");
+                                
+         List failoverNodes = failoverMapper.generateMapping(nodes);
+         
+         log.info("I generated " + failoverNodes.size() +" failover nodes");
+         
+         // Now put this in the map of node -> failover node
+         
+         synchronized (failoverMap)
+         {            
+            failoverMap.clear();
+   
+            Iterator iter = nodes.iterator();
+            Iterator iter2 = failoverNodes.iterator();
+   
+            while (iter.hasNext())
+            {
+               Integer node = (Integer)iter.next();
+   
+               Integer failoverNode = (Integer)iter2.next();
+   
+               failoverMap.put(node, failoverNode);
+            }
+         }
+      }      
+   }
 }
\ No newline at end of file

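Beyond the nodeId -> currentNodeId rename, the main change in this class replaces the single ReadWriteLock (now documented as guarding only the condition and name maps) with per-structure synchronized blocks on replicatedData, replicationListeners, failoverMap and leftSet, and takes defensive copies of the replicated data during state transfer. A minimal sketch of that copy-under-lock pattern follows, using hypothetical method names (snapshot/install) in place of getStateAsBytes/processStateBytes:

// Sketch only -- the copy-under-synchronized pattern, with hypothetical names.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class ReplicatedDataSketch
{
   // Map<Serializable key -> Map<Integer nodeId -> Serializable replicant>>
   private Map replicatedData = new HashMap();

   // Deep-copies the outer and inner maps so the snapshot can be streamed
   // or installed without holding the lock during serialisation.
   private Map copyReplicatedData(Map toCopy)
   {
      Map copy = new HashMap();
      Iterator iter = toCopy.entrySet().iterator();
      while (iter.hasNext())
      {
         Map.Entry entry = (Map.Entry)iter.next();
         Map replicants = (Map)entry.getValue();
         Map m = new HashMap();
         m.putAll(replicants);
         copy.put(entry.getKey(), m);
      }
      return copy;
   }

   public Map snapshot()
   {
      synchronized (replicatedData) // hold the lock only for the copy
      {
         return copyReplicatedData(replicatedData);
      }
   }

   public void install(Map incoming)
   {
      synchronized (replicatedData) // replace wholesale with a private copy
      {
         replicatedData = copyReplicatedData(incoming);
      }
   }
}
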
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultFailoverMapper.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.jboss.logging.Logger;
 import org.jboss.messaging.core.plugin.contract.FailoverMapper;
 
 /**
@@ -40,22 +41,25 @@
  */
 public class DefaultFailoverMapper implements FailoverMapper
 {
-
+   private static final Logger log = Logger.getLogger(DefaultFailoverMapper.class);
+   
    public List generateMapping(List nodes)
    {
-      List failoverNodes = new ArrayList(nodes.size());
+      int s = nodes.size();
       
+      log.info("Generating failover mapping, node size=" + s);
+            
+      List failoverNodes = new ArrayList(s);
+      
       if (!(nodes instanceof ArrayList))
       {
          //So we can ensure fast index based access
          nodes = new ArrayList(nodes);
       }
-      
-      int s = nodes.size();
-      
+            
       for (int i = 0; i < s; i++)
       {
-         int j = i++;
+         int j = i + 1;
          
          if (j == s)
          {
@@ -67,6 +71,8 @@
          failoverNodes.add(failoverNode);
       }
       
+      log.info("Returning " + failoverNodes.size() + " nodes");
+      
       return failoverNodes;
    }
 

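The substantive fix here is the loop index: the old int j = i++ assigned j the current value of i and then incremented i a second time, so every other node was skipped and each processed node was paired with itself, yielding a failover list of the wrong length. With int j = i + 1 (wrapping to 0 at the end of the list), the mapper produces the intended ring. A tiny self-contained illustration of that ring, assuming the elided context line reads the failover node with nodes.get(j):

// Sketch only -- the ring mapping the corrected loop produces.
import java.util.ArrayList;
import java.util.List;

public class FailoverRingSketch
{
   // For nodes [1, 2, 3] this returns [2, 3, 1]:
   // node 1 fails over to 2, 2 to 3, and the last node wraps to the first.
   public static List generateMapping(List nodes)
   {
      int s = nodes.size();
      List failoverNodes = new ArrayList(s);
      for (int i = 0; i < s; i++)
      {
         int j = i + 1; // the fix: "int j = i++" self-mapped and skipped nodes
         if (j == s)
         {
            j = 0;      // wrap the ring
         }
         failoverNodes.add(nodes.get(j));
      }
      return failoverNodes;
   }

   public static void main(String[] args)
   {
      List nodes = new ArrayList();
      nodes.add(new Integer(1));
      nodes.add(new Integer(2));
      nodes.add(new Integer(3));
      System.out.println(generateMapping(nodes)); // prints [2, 3, 1]
   }
}
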
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LeaveClusterRequest.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -36,6 +36,7 @@
    Object execute(PostOfficeInternal office) throws Throwable
    {
       office.handleNodeLeft(nodeId);
+      
       return null;
    }
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -85,6 +85,60 @@
       super.tearDown();
    }
    
+   public final void testSimpleJoinLeave() throws Throwable
+   {
+      ClusteredPostOffice office1 = null;
+      
+      ClusteredPostOffice office2 = null;
+      
+      //ClusteredPostOffice office3 = null;
+      
+      try
+      {         
+         log.info("Starting office 1");
+         office1 = createClusteredPostOffice(1, "testgroup");
+         
+         log.info("starting office 2");
+         office2 = createClusteredPostOffice(2, "testgroup");
+                  
+         //office3 = createClusteredPostOffice(3, "testgroup");
+         
+         Thread.sleep(2000);
+         
+         office1.stop();
+         office1 = null;
+         
+         office2.stop();
+         office2 = null;
+         
+//         office3.stop();
+//         office3 = null;
+      }
+      finally
+      {
+         if (office1 != null)
+         {
+            office1.stop();
+         }
+         
+         if (office2 != null)
+         {
+            office2.stop();
+         }
+         
+//         if (office3 != null)
+//         {
+//            office3.stop();
+//         }
+         
+         if (checkNoBindingData())
+         {
+            fail("data still in database");
+         }
+      }
+         
+   }
+   
    public final void testClusteredBindUnbind() throws Throwable
    {
       ClusteredPostOffice office1 = null;

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-05 03:10:30 UTC (rev 1704)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/tools/jmx/ServiceContainer.java	2006-12-05 14:06:32 UTC (rev 1705)
@@ -56,6 +56,7 @@
 import org.jboss.test.messaging.tools.ServerManagement;
 import org.jboss.test.messaging.tools.jboss.MBeanConfigurationElement;
 import org.jboss.test.messaging.tools.jboss.ServiceDeploymentDescriptor;
+import org.jboss.test.messaging.tools.jndi.Constants;
 import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
 import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactoryBuilder;
 import org.jboss.tm.TransactionManagerLocator;
@@ -290,6 +291,8 @@
          Hashtable t = InVMInitialContextFactory.getJNDIEnvironment(serverIndex);
          System.setProperty("java.naming.factory.initial",
                             (String)t.get("java.naming.factory.initial"));
+         System.setProperty(Constants.SERVER_INDEX_PROPERTY_NAME,
+                            Integer.toString(serverIndex));
 
          initialContext = new InitialContext();
 

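The ServiceContainer change publishes the server index as a system property before the InitialContext is built, so the in-VM initial context factory can tell which test server the context should talk to. A minimal sketch of the pattern; the property name below is a hypothetical stand-in for Constants.SERVER_INDEX_PROPERTY_NAME:

// Sketch only -- selecting an in-VM JNDI "server" via a system property.
import java.util.Hashtable;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class InVMContextSketch
{
   // Hypothetical property name; the real one lives in
   // org.jboss.test.messaging.tools.jndi.Constants.SERVER_INDEX_PROPERTY_NAME.
   private static final String SERVER_INDEX_PROPERTY = "test.server.index";

   public static InitialContext contextForServer(int serverIndex, Hashtable jndiEnvironment)
      throws NamingException
   {
      // The factory class is read from the per-server JNDI environment...
      System.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                         (String)jndiEnvironment.get(Context.INITIAL_CONTEXT_FACTORY));

      // ...and the index is published so the in-VM factory knows which server to target.
      System.setProperty(SERVER_INDEX_PROPERTY, Integer.toString(serverIndex));

      return new InitialContext();
   }
}
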


