[jboss-cvs] JBoss Messaging SVN: r1694 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/client/delegate messaging/core/plugin messaging/core/plugin/contract messaging/core/plugin/postoffice messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 3 17:41:34 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-03 17:41:30 -0500 (Sun, 03 Dec 2006)
New Revision: 1694

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java
Log:
minor renaming, formatting and logging changes

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -38,56 +38,76 @@
  * @version <tt>$Revision: 1.1 $</tt>
  *
  * $Id$
- *
  */
-public class ClusteredClientConnectionFactoryDelegate extends
-                ClientConnectionFactoryDelegate
+public class ClusteredClientConnectionFactoryDelegate extends ClientConnectionFactoryDelegate
 {
+   // Constants -----------------------------------------------------
+
    private static final long serialVersionUID = 8286850860206289277L;
-   
-   
-   /*
-    * If delegates[i] is the current delegate, then the failover delegate
-    * is given by delegates[failoverIndexes[i]]
-    */   
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   /**
+    * If delegates[i] is the current delegate, then the failover delegate is given by
+    * delegates[failoverIndexes[i]]
+    */
    private ClientConnectionFactoryDelegate[] delegates;
-   
+
    private int[] failoverIndexes;
-      
-   public ClusteredClientConnectionFactoryDelegate(int objectID,
-            String serverLocatorURI, Version serverVersion, boolean clientPing)
+
+   // Constructors --------------------------------------------------
+
+   public ClusteredClientConnectionFactoryDelegate(int objectID, String serverLocatorURI,
+                                                   Version serverVersion, boolean clientPing)
    {
       super(objectID, serverLocatorURI, serverVersion, clientPing);
    }
-   
+
+   // DelegateSupport overrides -------------------------------------
+
    public void init()
    {
       super.init();
-      
-      for (int i=0; i < delegates.length; i++)
+
+      for (int i = 0; i < delegates.length; i++)
       {
          delegates[i].init();
       }
-      
-      //We add this to the meta data so the failOver aspect can get access to it
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_DELEGATES,
+
+      // We add this to the meta data so the failOver aspect can get access to it
+      getMetaData().addMetaData(MetaDataConstants.JMS,
+                                MetaDataConstants.CF_DELEGATES,
                                 delegates, PayloadKey.TRANSIENT);
-      
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES,
+
+      getMetaData().addMetaData(MetaDataConstants.JMS,
+                                MetaDataConstants.CF_FAILOVER_INDEXES,
                                 failoverIndexes, PayloadKey.TRANSIENT);
    }
-   
-   //Only be used in testing
+
+   // Public --------------------------------------------------------
+
+
+   public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates,
+                                    int[] failoverIndexes)
+   {
+      this.delegates = delegates;
+      this.failoverIndexes = failoverIndexes;
+   }
+
+   // Only to be used in testing
    public ClientConnectionFactoryDelegate[] getDelegates()
    {
       return delegates;
    }
-   
-   public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates, int[] failoverIndexes)
-   {
-      this.delegates = delegates;
-      
-      this.failoverIndexes = failoverIndexes;
-   }
-   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -240,16 +240,16 @@
          FilterFactory ff = new SelectorFactory();
          
          FailoverMapper mapper = new DefaultFailoverMapper();
-            
+
          postOffice =  new DefaultClusteredPostOffice(ds, tm, sqlProperties, createTablesOnStartup,
-                                               nodeId, officeName, ms,
-                                               pm, tr, ff, pool, 
-                                               groupName,
-                                               syncChannelConfig, asyncChannelConfig,
-                                               stateTimeout, castTimeout,
-                                               pullPolicy, rf,
-                                               mapper,
-                                               statsSendPeriod);
+                                                      nodeId, officeName, ms,
+                                                      pm, tr, ff, pool,
+                                                      groupName,
+                                                      syncChannelConfig, asyncChannelConfig,
+                                                      stateTimeout, castTimeout,
+                                                      pullPolicy, rf,
+                                                      mapper,
+                                                      statsSendPeriod);
          
          postOffice.start();
          

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -45,7 +45,10 @@
    void putReplicant(Serializable key, Serializable replicant) throws Exception;
    
    boolean removeReplicant(Serializable key) throws Exception;
-   
+
+   /**
+    * @return an empty map if no replicants are found for 'key', but never null.
+    */
    Map getReplicants(Serializable key) throws Exception;
    
    void registerListener(ReplicationListener listener);
@@ -53,9 +56,10 @@
    void unregisterListener(ReplicationListener listener);
    
    /**
-    * Gets the current failover node id for the specified node id
-    * @param nodeId The node to get the failover node id for
-    * @return the failover node id
+    * Gets the current failover node id for the specified node id.
+    * @param nodeId The node to get the failover node id for.
+    * @return the failover node id. If there is no failover node (one-node cluster), the method
+    *         returns the original nodeID. 
     */
    int getFailoverNodeForNode(int nodeId);   
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -122,13 +122,13 @@
    
    public void start() throws Exception
    {
-      if (trace) { log.trace(this + " starting"); }
+      log.debug(this + " starting");
       
       super.start();
       
       loadBindings();
       
-      if (trace) { log.trace(this + " started"); }
+      log.debug(this + " started");
    }
 
    public void stop() throws Exception

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -137,13 +137,13 @@
    
    private Set leftSet;
    
-   private Element syncChannelConfigE;
+   private Element syncChannelConfigElement;
    
-   private Element asyncChannelConfigE;
+   private Element asyncChannelConfigElement;
    
-   private String syncChannelConfigS;
+   private String syncChannelConfig;
    
-   private String asyncChannelConfigS;
+   private String asyncChannelConfig;
    
    private long stateTimeout;
    
@@ -186,76 +186,87 @@
    /*
     * Constructor using Element for configuration
     */
-   public DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
-            boolean createTablesOnStartup,
-            int nodeId, String officeName, MessageStore ms,
-            PersistenceManager pm,
-            TransactionRepository tr,
-            FilterFactory filterFactory,
-            QueuedExecutorPool pool,                              
-            String groupName,
-            Element syncChannelConfig,
-            Element asyncChannelConfig,
-            long stateTimeout, long castTimeout,
-            MessagePullPolicy redistributionPolicy,
-            ClusterRouterFactory rf,
-            FailoverMapper failoverMapper,
-            long statsSendPeriod) throws Exception
-   {            
+   public DefaultClusteredPostOffice(DataSource ds,
+                                     TransactionManager tm,
+                                     Properties sqlProperties,
+                                     boolean createTablesOnStartup,
+                                     int nodeId,
+                                     String officeName,
+                                     MessageStore ms,
+                                     PersistenceManager pm,
+                                     TransactionRepository tr,
+                                     FilterFactory filterFactory,
+                                     QueuedExecutorPool pool,
+                                     String groupName,
+                                     Element syncChannelConfig,
+                                     Element asyncChannelConfig,
+                                     long stateTimeout, long castTimeout,
+                                     MessagePullPolicy redistributionPolicy,
+                                     ClusterRouterFactory rf,
+                                     FailoverMapper failoverMapper,
+                                     long statsSendPeriod)
+      throws Exception
+   {
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
            rf, failoverMapper, statsSendPeriod);
       
-      this.syncChannelConfigE = syncChannelConfig;      
-      this.asyncChannelConfigE = asyncChannelConfig;
+      this.syncChannelConfigElement = syncChannelConfig;
+      this.asyncChannelConfigElement = asyncChannelConfig;
    }
      
    /*
     * Constructor using String for configuration
     */
-   public DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
-                              boolean createTablesOnStartup,
-                              int nodeId, String officeName, MessageStore ms,
-                              PersistenceManager pm,
-                              TransactionRepository tr,
-                              FilterFactory filterFactory,
-                              QueuedExecutorPool pool,                              
-                              String groupName,
-                              String syncChannelConfig,
-                              String asyncChannelConfig,
-                              long stateTimeout, long castTimeout,
-                              MessagePullPolicy redistributionPolicy,                      
-                              ClusterRouterFactory rf,
-                              FailoverMapper failoverMapper,
-                              long statsSendPeriod) throws Exception
+   public DefaultClusteredPostOffice(DataSource ds,
+                                     TransactionManager tm,
+                                     Properties sqlProperties,
+                                     boolean createTablesOnStartup,
+                                     int nodeId,
+                                     String officeName,
+                                     MessageStore ms,
+                                     PersistenceManager pm,
+                                     TransactionRepository tr,
+                                     FilterFactory filterFactory,
+                                     QueuedExecutorPool pool,
+                                     String groupName,
+                                     String syncChannelConfig,
+                                     String asyncChannelConfig,
+                                     long stateTimeout, long castTimeout,
+                                     MessagePullPolicy redistributionPolicy,
+                                     ClusterRouterFactory rf,
+                                     FailoverMapper failoverMapper,
+                                     long statsSendPeriod) throws Exception
    {            
       this(ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms,
            pm, tr, filterFactory, pool, groupName, stateTimeout, castTimeout, redistributionPolicy,
            rf, failoverMapper, statsSendPeriod);
 
-      this.syncChannelConfigS = syncChannelConfig;      
-      this.asyncChannelConfigS = asyncChannelConfig;     
+      this.syncChannelConfig = syncChannelConfig;
+      this.asyncChannelConfig = asyncChannelConfig;
    }
-   
-   private DefaultClusteredPostOffice(DataSource ds, TransactionManager tm, Properties sqlProperties,
-                               boolean createTablesOnStartup,
-                               int nodeId, String officeName, MessageStore ms,
-                               PersistenceManager pm,                               
-                               TransactionRepository tr,
-                               FilterFactory filterFactory,
-                               QueuedExecutorPool pool,
-                               String groupName,
-                               long stateTimeout, long castTimeout,                             
-                               MessagePullPolicy redistributionPolicy,                               
-                               ClusterRouterFactory rf,
-                               FailoverMapper failoverMapper,
-                               long statsSendPeriod)
+
+   private DefaultClusteredPostOffice(DataSource ds,
+                                      TransactionManager tm,
+                                      Properties sqlProperties,
+                                      boolean createTablesOnStartup,
+                                      int nodeId,
+                                      String officeName,
+                                      MessageStore ms,
+                                      PersistenceManager pm,
+                                      TransactionRepository tr,
+                                      FilterFactory filterFactory,
+                                      QueuedExecutorPool pool,
+                                      String groupName,
+                                      long stateTimeout, long castTimeout,
+                                      MessagePullPolicy redistributionPolicy,
+                                      ClusterRouterFactory rf,
+                                      FailoverMapper failoverMapper,
+                                      long statsSendPeriod)
    {
-      super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr, filterFactory,
-             pool);
+      super (ds, tm, sqlProperties, createTablesOnStartup, nodeId, officeName, ms, pm, tr,
+             filterFactory, pool);
                    
-      this.pm = pm;
-      
       this.groupName = groupName;
       
       this.stateTimeout = stateTimeout;
@@ -287,18 +298,18 @@
          log.warn("Attempt to start() but " + this + " is already started");
       }
       
-      if (syncChannelConfigE != null)
+      if (syncChannelConfigElement != null)
       {        
-         this.syncChannel = new JChannel(syncChannelConfigE);
-         this.asyncChannel = new JChannel(asyncChannelConfigE); 
+         this.syncChannel = new JChannel(syncChannelConfigElement);
+         this.asyncChannel = new JChannel(asyncChannelConfigElement);
       }
       else
       {
-         this.syncChannel = new JChannel(syncChannelConfigS);
-         this.asyncChannel = new JChannel(asyncChannelConfigS); 
+         this.syncChannel = new JChannel(syncChannelConfig);
+         this.asyncChannel = new JChannel(asyncChannelConfig);
       }
       
-      //We don't want to receive local messages on any of the channels
+      // We don't want to receive local messages on any of the channels
       syncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
       
       asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
@@ -309,8 +320,10 @@
       
       this.controlMembershipListener = new ControlMembershipListener();
       
-      this.controlMessageDispatcher = new MessageDispatcher(syncChannel, controlMessageListener,
-                                                            controlMembershipListener, requestHandler, true);      
+      this.controlMessageDispatcher =
+         new MessageDispatcher(syncChannel, controlMessageListener,
+                               controlMembershipListener, requestHandler, true);
+
       this.dataReceiver = new DataReceiver();
       
       asyncChannel.setReceiver(dataReceiver);    
@@ -596,7 +609,7 @@
       
       if (failoverNode == null)
       {
-         throw new IllegalArgumentException("Cannot find failover node for node " + nodeId);
+         return nodeId;
       }
       
       return failoverNode.intValue();
@@ -612,7 +625,7 @@
       {
          Map m = (Map)replicatedData.get(key);
          
-         return m == null ? null : Collections.unmodifiableMap(m);
+         return m == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(m);
       }
       finally
       {
@@ -622,16 +635,16 @@
 
    public void putReplicant(Serializable key, Serializable replicant) throws Exception
    {
-      putReplicantFromCluster(this.nodeId, key, replicant);
+      putReplicantLocally(nodeId, key, replicant);
       
-      PutReplicantRequest request = new PutReplicantRequest(this.nodeId, key, replicant);
+      PutReplicantRequest request = new PutReplicantRequest(nodeId, key, replicant);
       
       syncSendRequest(request);
    }
    
    public boolean removeReplicant(Serializable key) throws Exception
    {
-      if (removeReplicantFromCluster(this.nodeId, key))
+      if (removeReplicantLocally(this.nodeId, key))
       {      
          RemoveReplicantRequest request = new RemoveReplicantRequest(this.nodeId, key);
          
@@ -679,8 +692,12 @@
          lock.writeLock().release();
       }      
    }
-   
-   public void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception
+
+   /**
+    * @param originatorNodeID - the ID of the node that initiated the modification.
+    */
+   public void putReplicantLocally(int originatorNodeID, Serializable key, Serializable replicant)
+      throws Exception
    {
       lock.writeLock().acquire();
       
@@ -697,15 +714,18 @@
          
          m.put(new Integer(nodeId), replicant); 
          
-         notifyListeners(key, m);         
+         notifyListeners(key, m);
       }
       finally
       {
          lock.writeLock().release();
       }
    }
-   
-   public boolean removeReplicantFromCluster(int nodeId, Serializable key) throws Exception
+
+   /**
+    * @param originatorNodeID - the ID of the node that initiated the modification.
+    */
+   public boolean removeReplicantLocally(int originatorNodeID, Serializable key) throws Exception
    {
       lock.writeLock().acquire();
       
@@ -730,7 +750,7 @@
             replicatedData.remove(key);
          }
          
-         notifyListeners(key, m);         
+         notifyListeners(key, m);
          
          return true;
       }
@@ -1658,7 +1678,7 @@
    
    protected void loadBindings() throws Exception
    {
-      if (trace) { log.trace(this.nodeId + " loading bindings"); }
+      log.debug(this.nodeId + " loading bindings");
       
       boolean isState = syncChannel.getState(null, stateTimeout);
       
@@ -1728,18 +1748,17 @@
    }
    
    /*
-    * Removes all non durable binding data, and any local replicant data for the specified
-    * node
+    * Removes all non durable binding data, and any local replicant data for the specified node.
     */
-   private void removeDataForNode(Integer parameterNodeId) throws Exception
+   private void removeDataForNode(Integer nodeID) throws Exception
    {
-      log.info("Node " + parameterNodeId + " requested to leave cluster");
+      log.info("Node " + nodeID + " requested to leave cluster");
       
       lock.writeLock().acquire();
 
       try
       {          
-         Map nameMap = (Map)nameMaps.get(parameterNodeId);
+         Map nameMap = (Map)nameMaps.get(nodeID);
 
          if (nameMap != null)
          {
@@ -1765,30 +1784,26 @@
             {
                Binding binding = (Binding)iter.next();
                
-               removeBinding(parameterNodeId.intValue(), binding.getQueue().getName());
+               removeBinding(nodeID.intValue(), binding.getQueue().getName());
             }
          }
          
-         //We need to remove any replicant data for the node
-         //this includes the node-address info
-         Iterator iter = replicatedData.entrySet().iterator();
-         
-         while (iter.hasNext())
+         // We need to remove any replicant data for the node. This includes the node-address info.
+         for(Iterator i = replicatedData.entrySet().iterator(); i.hasNext(); )
          {
-            Map.Entry entry = (Map.Entry)iter.next();
+            Map.Entry entry = (Map.Entry)i.next();
             
             String key = (String)entry.getKey();
-            
             Map replicants = (Map)entry.getValue();
             
-            replicants.remove(parameterNodeId);
+            replicants.remove(nodeID);
             
             if (replicants.isEmpty())
             {
-               iter.remove();
+               i.remove();
             }     
             
-            //Need to trigger listeners
+            // Need to trigger listeners
             notifyListeners(key, replicants);
          }                  
       }
@@ -1800,12 +1815,9 @@
    
    private void notifyListeners(Serializable key, Map replicants)
    {
-      Iterator iter = replicationListeners.iterator();
-      
-      while (iter.hasNext())
+      for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
       {
-         ReplicationListener listener = (ReplicationListener)iter.next();
-         
+         ReplicationListener listener = (ReplicationListener)i.next();
          listener.onReplicationChange(key, replicants);
       }
    } 
@@ -1815,17 +1827,15 @@
     */
    private void syncSendRequest(ClusterRequest request) throws Exception
    {
-      if (trace) { log.info(this.nodeId + " sending synch request to group, request: " + request); }
+      if (trace) { log.trace(this.nodeId + " sending synch request to group, request: " + request); }
 
-       System.out.println("***************Request Sent **************");
-
       byte[] bytes = writeRequest(request);
 
       Message message = new Message(null, null, bytes);
 
       controlMessageDispatcher.castMessage(null, message, GroupRequest.GET_ALL, castTimeout);
 
-      if (trace) { log.info(this.nodeId + " sent and executed ok"); }
+      if (trace) { log.trace(this.nodeId + " sent and executed ok"); }
    }
    
 
@@ -1835,7 +1845,7 @@
       lock.readLock().acquire();
       try
       { 
-         Map map = this.getReplicants(ADDRESS_INFO_KEY);
+         Map map = getReplicants(ADDRESS_INFO_KEY);
          
          if (map == null)
          {
@@ -2049,25 +2059,29 @@
    }
    
    /*
-    * Given a JGroups view, generate a map of node to failover node
-    * The mapping is determined by a pluggable policy
+    * Given a JGroups view, generate a map of node to failover node.
+    * The mapping is determined by a pluggable policy.
     */
    private void generateFailoverMap(View view) throws Exception
    {
       List nodes = new ArrayList();
       
-      Iterator iter = view.getMembers().iterator();
-      
-      while (iter.hasNext())
+      for (Iterator i = view.getMembers().iterator(); i.hasNext(); )
       {
-         Address address = (Address)iter.next();
+         Address address = (Address)i.next();
+
+         // Ignore own address, a node can't be its own failover node
+         if (syncChannel.getLocalAddress().equals(address))
+         {
+            continue;
+         }
+
+         // Convert to node id
+         // TODO this should be optimised - currently the implementation of the lookup
+         // is a bit tortuous
          
-         //Convert to node id
-         //TODO this should be optimised - currently the implementation of the lookup
-         //is a bit tortuous
+         Integer n = getNodeIdForSyncAddress(address);
          
-         Integer n = this.getNodeIdForSyncAddress(address);
-         
          if (n == null)
          {
             throw new IllegalStateException("Cannot find node id for address: " + address);
@@ -2078,23 +2092,75 @@
             
       List failoverNodes = failoverMapper.generateMapping(nodes);
       
-      //Now put this in the map of node -> failover node
+      // Now put this in the map of node -> failover node
       
       failoverMap.clear();
-      
-      iter = nodes.iterator();
+
+      Iterator iter = nodes.iterator();
       Iterator iter2 = failoverNodes.iterator();
-      
+
       while (iter.hasNext())
       {
          Integer node = (Integer)iter.next();
-         
+
          Integer failoverNode = (Integer)iter2.next();
-         
+
          failoverMap.put(node, failoverNode);
-      }            
+      }
    }
-   
+
+   /*
+    * A new node has joined the group. The address argument is currently not used.
+    */
+   private void nodeJoined(Address address) throws Exception
+   {
+      // We need to regenerate the failover map; it is rebuilt from currentView
+
+      generateFailoverMap(currentView);
+   }
+
+   /*
+    * A node has left the group: clean up after it and fail over for it if required.
+    */
+   private void nodeLeft(Address address) throws Throwable
+   {
+      Integer nodeId = getNodeIdForSyncAddress(address);
+      // A null id means the address is unknown - fail fast rather than NPE further down
+      if (nodeId == null)
+      {
+         throw new IllegalStateException("Cannot find node id for address " + address);
+      }
+
+      boolean crashed = !this.leaveMessageReceived(nodeId);
+
+      if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
+
+      // Cleanup any hanging transactions - we do this irrespective of whether we crashed
+      check(nodeId);
+
+      // Need to evaluate this before we regenerate the failover map
+      boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
+
+      // Now we recalculate the failover mapping - this needs to be done before removeDataForNode
+      // is called since that may cause connection factories to be rebound
+      generateFailoverMap(currentView);
+
+      // Remove any replicant data and non durable bindings for the node - again we need to do
+      // this irrespective of whether we crashed. This will notify any listeners, which will
+      // recalculate the connection factory delegates and failover delegates
+      removeDataForNode(nodeId);
+
+      if (crashed && isFailover)
+      {
+         // The node crashed and we are the failover node
+         // so let's perform failover
+
+         // TODO server side valve
+
+         failOver(nodeId.intValue());
+      }
+   }
+
    // Inner classes -------------------------------------------------------------------
     
    /*
@@ -2171,58 +2237,6 @@
    }
    
    /*
-    * A new node has joined the group
-    */
-   private void nodeJoined(Address address) throws Exception
-   {
-      //We need to regenerate the failover map
-      
-      generateFailoverMap(currentView);     
-   }
-   
-   /*
-    * A node has left the group
-    */
-   private void nodeLeft(Address address) throws Throwable
-   {
-      Integer nodeId = getNodeIdForSyncAddress(address);
-      
-      if (nodeId != null)
-      {
-         throw new IllegalStateException("Cannot find node id for address " + address);
-      }
-                  
-      boolean crashed = !this.leaveMessageReceived(nodeId);
-                        
-      if (trace) { log.trace("Node " + address + " id: " + nodeId +" has left the group, crashed = " + crashed); }
-            
-      //Cleanup any hanging transactions - we do this irrespective of whether we crashed      
-      check(nodeId);
-      
-      //Need to evaluate this before we regenerate the failover map
-      boolean isFailover = isFailoverNodeForNode(nodeId.intValue());
-      
-      //Now we recalculate the failover mapping - this needs to be done before removeDataForNode is called
-      //since that may cause connection factories to be rebound
-      generateFailoverMap(currentView);
-      
-      //Remove any replicant data and non durable bindings for the node - again we need to do this
-      //irrespective of whether we crashed
-      //This will notify any listeners which will recalculate the connection factory delegates and failover delegates
-      removeDataForNode(nodeId);
-      
-      if (crashed && isFailover)
-      {
-         //The node crashed and we are the failover node
-         //so let's perform failover
-         
-         //TODO server side valve
-         
-         failOver(nodeId.intValue());
-      }
-   }
-      
-   /*
     * We use this class so we notice when members leave the group
     */
    private class ControlMembershipListener implements MembershipListener
@@ -2241,7 +2255,7 @@
       {
          if (trace) { log.trace(nodeId + " Got new view, size=" + view.size()); }
 
-         //JGroups will make sure this method is never called by more than one thread concurrently
+         // JGroups will make sure this method is never called by more than one thread concurrently
          
          View oldView = currentView;
          
@@ -2359,9 +2373,7 @@
             
             ClusterRequest request = readRequest(bytes);
             
-            Object result = request.execute(DefaultClusteredPostOffice.this);
-            
-            return result;
+            return request.execute(DefaultClusteredPostOffice.this);
          }
          catch (Throwable e)
          {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PostOfficeInternal.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -52,9 +52,9 @@
  
    void handleNodeLeft(int nodeId) throws Exception;
    
-   void putReplicantFromCluster(int nodeId, Serializable key, Serializable replicant) throws Exception;
+   void putReplicantLocally(int nodeId, Serializable key, Serializable replicant) throws Exception;
    
-   boolean removeReplicantFromCluster(int nodeId, Serializable key) throws Exception;
+   boolean removeReplicantLocally(int nodeId, Serializable key) throws Exception;
    
    void routeFromCluster(Message message, String routingKey, Map queueNameNodeIdMap) throws Exception;
    

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PutReplicantRequest.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -40,54 +40,74 @@
  */
 class PutReplicantRequest extends ClusterRequest
 {
+   // Constants -----------------------------------------------------
+
    static final int TYPE = 12;
 
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
    private int nodeId;
-   
    private Serializable key;
-   
    private Serializable replicant;
-   
+
+   // Constructors --------------------------------------------------
+
    PutReplicantRequest()
-   {      
+   {
    }
-   
+
    PutReplicantRequest(int nodeId, Serializable key, Serializable replicant)
    {
-      this.nodeId = nodeId;  
-      
+      this.nodeId = nodeId;
+
       this.key = key;
-      
+
       this.replicant = replicant;
    }
-   
-   Object execute(PostOfficeInternal office) throws Exception
-   {
-      office.putReplicantFromCluster(nodeId, key, replicant);
-      
-      return null;
-   }
-   
-   byte getType()
-   {
-      return TYPE;
-   }
 
+   // Streamable implementation -------------------------------------
+
    public void read(DataInputStream in) throws Exception
    {
       nodeId = in.readInt();
-      
+
       key = (Serializable)StreamUtils.readObject(in, true);
-      
+
       replicant = (Serializable)StreamUtils.readObject(in, true);
    }
 
    public void write(DataOutputStream out) throws Exception
    {
-      out.writeInt(nodeId);   
-      
+      out.writeInt(nodeId);
+
       StreamUtils.writeObject(out, key, true, true);
-      
+
       StreamUtils.writeObject(out, replicant, true, true);
    }
+
+   // ClusterRequest overrides --------------------------------------
+
+   Object execute(PostOfficeInternal office) throws Exception
+   {
+      office.putReplicantLocally(nodeId, key, replicant);
+      return null;
+   }
+
+   byte getType()
+   {
+      return TYPE;
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java	2006-12-03 08:36:09 UTC (rev 1693)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoveReplicantRequest.java	2006-12-03 22:41:30 UTC (rev 1694)
@@ -48,7 +48,7 @@
    
    Object execute(PostOfficeInternal office) throws Exception
    {
-      office.removeReplicantFromCluster(nodeId, key);
+      office.removeReplicantLocally(nodeId, key);
       
       return null;
    }




More information about the jboss-cvs-commits mailing list