[jboss-cvs] JBoss Messaging SVN: r1695 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/server/connectionfactory messaging/core/plugin/contract messaging/core/plugin/postoffice/cluster

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Dec 3 21:26:24 EST 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-12-03 21:26:21 -0500 (Sun, 03 Dec 2006)
New Revision: 1695

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Fixed broken clustered ConnectionFactory initialization. http://jira.jboss.org/jira/browse/JBMESSAGING-667

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-04 02:26:21 UTC (rev 1695)
@@ -34,6 +34,7 @@
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
 import org.jboss.jms.server.ConnectionFactoryManager;
 import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.Version;
 import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
 import org.jboss.jms.server.remoting.JMSDispatcher;
@@ -65,8 +66,12 @@
    
    protected Context initialContext;
    protected ServerPeer serverPeer;
+
+   // Map<uniqueName<String> - ServerConnectionFactoryEndpoint>
    protected Map endpoints;
-   protected Map factories;
+
+   // Map<uniqueName<String> - ClusteredClientConnectionFactoryDelegate>
+   protected Map clusteredDelegates;
    
    protected Replicator replicator;
    
@@ -76,7 +81,7 @@
    {
       this.serverPeer = serverPeer;
       endpoints = new HashMap();
-      factories = new HashMap();
+      clusteredDelegates = new HashMap();
    }
    
    // ConnectionFactoryManager implementation -----------------------
@@ -94,6 +99,7 @@
       throws Exception
    {
       int id = serverPeer.getNextObjectID();
+      Version version = serverPeer.getVersion();
 
       ServerConnectionFactoryEndpoint endpoint =
          new ServerConnectionFactoryEndpoint(id, serverPeer, clientID,
@@ -101,61 +107,43 @@
                                              defaultTempQueueFullSize,
                                              defaultTempQueuePageSize,
                                              defaultTempQueueDownCacheSize);
+      endpoints.put(uniqueName, endpoint);
 
-      ClientConnectionFactoryDelegate delegate =
-         new ClientConnectionFactoryDelegate(id, locatorURI, serverPeer.getVersion(), clientPing);
-            
-      if (clustered)
+      JMSDispatcher.instance.
+         registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
+
+      ClientConnectionFactoryDelegate localDelegate =
+         new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+
+      if (!clustered)
       {
-         setupReplicator();
-         
-         //We may have no clustered post office deployed, in which case a clustered connection factory
-         //behaves like a standard connection factory
-         if (replicator != null)
-         {                        
-            // First we must replicate the delegate across the cluster so that all the
-            // ConnectionFactoryJNDIMapper instances on different nodes have access to the list of
-            // local connection factories so they can be added to the clustered connection factory
-   
-            replicator.putReplicant(CF_PREFIX + uniqueName, delegate);
-            
-            // Get the list of delegates
-            
-            Map replicants = replicator.getReplicants(CF_PREFIX + uniqueName);
-            
-            //Create a clustered delegate which contains the array of the local delegates
-            //and failover indexes
-            
-            ClusteredClientConnectionFactoryDelegate clusteredDel = 
-               new ClusteredClientConnectionFactoryDelegate(id,
-                        locatorURI, serverPeer.getVersion(), clientPing);
-            
-            setFailoverDelegates(clusteredDel, replicants);
-            
-            delegate = clusteredDel;
-         }
+         rebindConnectionFactory(initialContext, jndiBindings, localDelegate);
+         return;
       }
-      
-      ConnectionFactoryAdvised connFactoryAdvised = new ConnectionFactoryAdvised(endpoint);
-      
-      JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
-      
-      if (jndiBindings != null)
+
+      // We are clustered, we need to propagate the local delegate across the cluster.
+
+      setupReplicator();
+
+      if (replicator == null)
       {
-         List jndiNames = jndiBindings.getNames();
-         for(Iterator i = jndiNames.iterator(); i.hasNext(); )
-         {
-            String jndiName = (String)i.next();
-            JNDIUtil.rebind(initialContext, jndiName, cf);
-         }
+         return;
       }
-      
-      factories.put(uniqueName, cf);
-      
-      endpoints.put(uniqueName, endpoint);
-      
-      JMSDispatcher.instance.registerTarget(new Integer(id), connFactoryAdvised);
-            }
+
+      // Create a "hollow" clustered delegate ...
+
+      ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+         new ClusteredClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
+
+      clusteredDelegates.put(uniqueName, clusteredDelegate);
+
+      // ... and then update it (and the clustered delegate on every node, while we're at it) by
+      // replicating the local delegate across the cluster. This way, the clustered delegates on each
+      // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+      // will rebind updated ConnectionFactories in JNDI.
+
+      replicator.putReplicant(CF_PREFIX + uniqueName, localDelegate);
+   }
    
    public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
    {
@@ -181,7 +169,7 @@
       }
       
       ClientConnectionFactoryDelegate delegate =
-         (ClientConnectionFactoryDelegate)factories.remove(uniqueName);
+         (ClientConnectionFactoryDelegate)clusteredDelegates.remove(uniqueName);
       
       if (delegate == null)
       {
@@ -241,55 +229,44 @@
    
    // ReplicationListener interface ----------------------------------
    
-   public synchronized void onReplicationChange(Serializable key, Map replicants)
+   public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap)
    {
       try
       {
-         //The list of connection factories across the cluster has changed -
-         //we need to update the clustered connection factories in JNDI with
-         //new ones with the up to date list
+         // The list of connection factories across the cluster has changed, so we need to update
+         // the clustered connection factories in JNDI with new ones with the up to date list.
+         // Replication can be used for other stuff (not just connection factories) so we check
+         // the prefix.
          
-         //Replication can be used for other stuff (not just connection factories)
-         //so we check the prefix
-         
          String sKey = (String)key;
          
          if (key instanceof String && sKey.startsWith(CF_PREFIX))
          {
             String uniqueName = sKey.substring(CF_PREFIX.length());
             
-            ClusteredClientConnectionFactoryDelegate cf =
-               (ClusteredClientConnectionFactoryDelegate)factories.get(uniqueName);
+            ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+               (ClusteredClientConnectionFactoryDelegate)clusteredDelegates.get(uniqueName);
             
-            if (cf == null)
+            if (clusteredDelegate == null)
             {
-               throw new IllegalStateException("Cannot find connection factory " + uniqueName + " to update");
+               throw new IllegalStateException("Cannot find connection delegate for " +
+                                               uniqueName + " to update");
             }
             
-            setFailoverDelegates(cf, replicants);
+            setFailoverDelegates(clusteredDelegate, updatedReplicantMap);
             
-            //Now rebind
+            // Now rebind ...
             
             ServerConnectionFactoryEndpoint endpoint =
-               (ServerConnectionFactoryEndpoint)endpoints.remove(uniqueName);
+               (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
             
             if (endpoint == null)
             {
                throw new IllegalStateException("Cannot find endpoint " + uniqueName );
             }
-            
-            JNDIBindings jndiBindings = endpoint.getJNDIBindings();
-            
-            if (jndiBindings != null)
-            {
-               List jndiNames = jndiBindings.getNames();
-               for(Iterator i = jndiNames.iterator(); i.hasNext(); )
-               {
-                  String jndiName = (String)i.next();
-                  initialContext.rebind(jndiName, cf);
-               }
-            }  
-         } 
+
+            rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), clusteredDelegate);
+         }
       }
       catch (NamingException e)
       {
@@ -316,62 +293,50 @@
    {
       this.serverPeer.getQueuePostOfficeInstance();
    }
-   
-   
-   private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate cf,
-                                     Map replicants)
+
+
+   /**
+    * @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
+    */
+   private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
+                                     Map localDelegates)
    {
-      // We need to get the failover indexes too
-      // If delegates[i] is the current delegate, then the failover delegate
-      // is given by delegates[failoverIndexes[i]]
-      
-      int s = replicants.size();
-      
+      int s = localDelegates.size();
+
+      int[] nodesIDs = new int[s];
+      int[] failoverNodeIDs = new int[s];
       ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
-      
-      Iterator iter = replicants.entrySet().iterator();
-      
-      int i = 0;
-      
-      int[] nodes = new int[s];
-      
-      int[] failoverNodes = new int[s];
-      
-      while (iter.hasNext())
+
+      int idx = 0;
+
+      for(Iterator i = localDelegates.entrySet().iterator(); i.hasNext(); )
       {
-         Map.Entry entry = (Map.Entry)iter.next();
-         
-         int nodeId = ((Integer)entry.getKey()).intValue();
-         
-         delegates[i] = (ClientConnectionFactoryDelegate)entry.getValue();
+         Map.Entry entry = (Map.Entry)i.next();
 
-         int failoverNode = replicator.getFailoverNodeForNode(nodeId);
-         
-         nodes[i] = nodeId;
-         
-         failoverNodes[i] = failoverNode; 
-         
-         i++;
+         int nodeID = ((Integer)entry.getKey()).intValue();
+         int failoverNodeID = replicator.getFailoverNodeID(nodeID);
+
+         nodesIDs[idx] = nodeID;
+         failoverNodeIDs[idx] = failoverNodeID;
+         delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
+
+         idx++;
       }
       
       // Generate the failover indexes. This could probably be optimised if need be.
       
       int[] failoverIndexes = new int[s];
       
-      for (int j = 0; j < s; i++)
+      for (int i = 0; i < s; i++)
       {
-         int failoverNode = failoverNodes[j];
-         
-         //What index is this node?
-         
+         int failoverNode = failoverNodeIDs[i];
          int failoverIndex = -1;
          
-         for (int k = 0; k  < s; k++)
+         for (int j = 0; j < s; j++)
          {
-            if (nodes[k] == failoverNode)
+            if (nodesIDs[j] == failoverNode)
             {
-               failoverIndex = k;
-               
+               failoverIndex = j;
                break;
             }
          }
@@ -381,11 +346,29 @@
             throw new IllegalStateException("Failover node is not in list of nodes!");
          }
          
-         failoverIndexes[j] = failoverIndex;
+         failoverIndexes[i] = failoverIndex;
       }
       
-      cf.setFailoverDelegates(delegates, failoverIndexes);
+      clusteredDelegate.setFailoverDelegates(delegates, failoverIndexes);
    }
 
+   private void rebindConnectionFactory(Context ic,
+                                        JNDIBindings jndiBindings,
+                                        ClientConnectionFactoryDelegate delegate)
+      throws NamingException
+   {
+      JBossConnectionFactory cf = new JBossConnectionFactory(delegate);
+
+      if (jndiBindings != null)
+      {
+         List jndiNames = jndiBindings.getNames();
+         for(Iterator i = jndiNames.iterator(); i.hasNext(); )
+         {
+            String jndiName = (String)i.next();
+            JNDIUtil.rebind(ic, jndiName, cf);
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java	2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/ReplicationListener.java	2006-12-04 02:26:21 UTC (rev 1695)
@@ -35,5 +35,9 @@
  */
 public interface ReplicationListener
 {
-   void onReplicationChange(Serializable key, Map replicants);
+   /**
+    * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
+    *        the given key.
+    */
+   void onReplicationChange(Serializable key, Map updatedReplicantMap);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/contract/Replicator.java	2006-12-04 02:26:21 UTC (rev 1695)
@@ -56,10 +56,12 @@
    void unregisterListener(ReplicationListener listener);
    
    /**
-    * Gets the current failover node id for the specified node id.
-    * @param nodeId The node to get the failover node id for.
-    * @return the failover node id. If there is no failover node (one-node cluster), the method
+    * Gets the current failover node ID for the specified node ID.
+    *
+    * @param nodeID - The node to get the failover node ID for.
+    *
+    * @return the failover node ID. If there is no failover node (one-node cluster), the method
     *         returns the original nodeID. 
     */
-   int getFailoverNodeForNode(int nodeId);   
+   int getFailoverNodeID(int nodeID);
 }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-03 22:41:30 UTC (rev 1694)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-04 02:26:21 UTC (rev 1695)
@@ -314,11 +314,11 @@
       
       asyncChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
       
-      this.controlMessageListener = new ControlMessageListener();
+      controlMessageListener = new ControlMessageListener();
       
-      this.requestHandler = new PostOfficeRequestHandler();
+      requestHandler = new PostOfficeRequestHandler();
       
-      this.controlMembershipListener = new ControlMembershipListener();
+      controlMembershipListener = new ControlMembershipListener();
       
       this.controlMessageDispatcher =
          new MessageDispatcher(syncChannel, controlMessageListener,
@@ -603,7 +603,7 @@
       return listBindingsForConditionInternal(condition, false);
    }
    
-   public int getFailoverNodeForNode(int nodeId)
+   public int getFailoverNodeID(int nodeId)
    {
       Integer failoverNode = (Integer)failoverMap.get(new Integer(nodeId));
       
@@ -1812,13 +1812,17 @@
          lock.writeLock().release();
       }
    }
-   
-   private void notifyListeners(Serializable key, Map replicants)
+
+   /**
+    * @param updatedReplicantMap - the updated replicant map. It contains ALL current replicants for
+    *        the given key.
+    */
+   private void notifyListeners(Serializable key, Map updatedReplicantMap)
    {
       for (Iterator i = replicationListeners.iterator(); i.hasNext(); )
       {
          ReplicationListener listener = (ReplicationListener)i.next();
-         listener.onReplicationChange(key, replicants);
+         listener.onReplicationChange(key, updatedReplicantMap);
       }
    } 
    
@@ -1898,7 +1902,7 @@
     */
    private boolean isFailoverNodeForNode(int nodeId)
    {
-      return this.nodeId == getFailoverNodeForNode(nodeId);
+      return this.nodeId == getFailoverNodeID(nodeId);
    }
       
    private byte[] getStateAsBytes() throws Exception




More information about the jboss-cvs-commits mailing list