[jboss-cvs] JBoss Messaging SVN: r1718 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms: client/delegate server/connectionfactory

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Dec 7 15:31:52 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-07 15:31:50 -0500 (Thu, 07 Dec 2006)
New Revision: 1718

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
Log:
Fixing connection factories

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-07 19:15:49 UTC (rev 1717)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-07 20:31:50 UTC (rev 1718)
@@ -60,11 +60,26 @@
    // Constructors --------------------------------------------------
 
    public ClusteredClientConnectionFactoryDelegate(int objectID, int serverId, String serverLocatorURI,
-                                                   Version serverVersion, boolean clientPing)
+                                                   Version serverVersion, boolean clientPing,
+                                                   ClientConnectionFactoryDelegate[] delegates,
+                                                   int[] failoverIndexes)
    {
       super(objectID, serverId, serverLocatorURI, serverVersion, clientPing);
+      this.delegates = delegates;
+      this.failoverIndexes = failoverIndexes;
    }
 
+   // Some of the properties of ClientConnectionFactoryDelegate are not exposed.
+   // I didn't want to expose them just because I needed another delegate's properties to perform a copy,
+   // so I created this constructor, which can access the protected members from inside an extension class.
+   public ClusteredClientConnectionFactoryDelegate(ClientConnectionFactoryDelegate mainDelegate,
+                                                   ClientConnectionFactoryDelegate[] delegates,
+                                                   int[] failoverIndexes)
+   {
+      this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
+         mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverIndexes);
+   }
+
    // DelegateSupport overrides -------------------------------------
 
    public void init()
@@ -92,13 +107,6 @@
    // Public --------------------------------------------------------
 
 
-   public void setFailoverDelegates(ClientConnectionFactoryDelegate[] delegates,
-                                    int[] failoverIndexes)
-   {
-      this.delegates = delegates;
-      this.failoverIndexes = failoverIndexes;
-   }
-
    // Only be used in testing
    public ClientConnectionFactoryDelegate[] getDelegates()
    {

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-07 19:15:49 UTC (rev 1717)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-07 20:31:50 UTC (rev 1718)
@@ -118,10 +118,19 @@
       endpoints.put(uniqueName, endpoint);   
       
       ClientConnectionFactoryDelegate delegate = null;
-      
+
       if (clustered)
       {
          setupReplicator();
+      }
+      else
+      {
+         log.info("ConnectionFactoryJNDIMapper is non clustered");
+      }
+      
+      /* if (clustered)
+      {
+         setupReplicator();
          
          if (replicator != null)
          {                                             
@@ -136,14 +145,19 @@
          //Local
          delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
                                         locatorURI, version, clientPing);
-      }
+      } */
 
+      delegate = new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+                                     locatorURI, version, clientPing);
+
+
+      //delegate = new ClusteredClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(), locatorURI, version, clientPing);
+
+
       log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
       delegates.put(uniqueName, delegate);
 
-      //Now bind it in JNDI
-      rebindConnectionFactory(initialContext, jndiBindings, delegate);
-      
+
       // We are clustered, we need to propagate the local delegate across the cluster.
 
       // ... and then update it (and the clustered delegate on every node, while we're at it) by
@@ -152,7 +166,16 @@
       // will rebind updated ConnectionFactories in JNDI.
       // This will update the local node too
 
-      replicator.put(CF_PREFIX + uniqueName, delegate);
+      log.info("CF_PREFIX=" + CF_PREFIX + " uniqueName=" + uniqueName + " delegate = " + delegate + " replicator = " + replicator);
+      if (replicator != null)
+      {
+         replicator.put(CF_PREFIX + uniqueName, delegate);
+      }
+      else
+      {
+         //Now bind it in JNDI
+         rebindConnectionFactory(initialContext, jndiBindings, delegate);
+      }
 
       //Registering with the dispatcher should always be the last thing otherwise a client could use
       //a partially initialised object
@@ -263,22 +286,19 @@
          {
             String uniqueName = sKey.substring(CF_PREFIX.length());
             
-            ClusteredClientConnectionFactoryDelegate clusteredDelegate =
-               (ClusteredClientConnectionFactoryDelegate)delegates.get(uniqueName);
-            
-            if (clusteredDelegate == null)
+            if (log.isTraceEnabled())
             {
-               log.info("Delegates list: delegates.length=" + delegates.size());
+               log.trace("Delegates list: delegates.length=" + delegates.size());
                for (Iterator iterDelegates = delegates.entrySet().iterator(); iterDelegates.hasNext();)
                {
                   Map.Entry entry = (Map.Entry)iterDelegates.next();
-                  log.info("key=" + entry.getKey() + ", value=" + entry.getValue());
+                  log.trace("key=" + entry.getKey() + ", value=" + entry.getValue());
                }
                throw new IllegalStateException("Cannot find connection delegate for " +
                                                uniqueName + " to update");
             }
             
-            setFailoverDelegates(clusteredDelegate, updatedReplicantMap);
+            ClusteredClientConnectionFactoryDelegate clusteredDelegate = createClusteredDelegate(updatedReplicantMap);
             
             // Now rebind ...
             
@@ -323,16 +343,22 @@
    /**
     * @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
     */
-   private void  setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
-                                     Map localDelegates)
+   private ClusteredClientConnectionFactoryDelegate createClusteredDelegate(Map localDelegates)
    {
+
+
       //TODO: make it trace after the code is stable
       log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
+
+
       int s = localDelegates.size();
 
       int[] nodesIDs = new int[s];
       int[] failoverNodeIDs = new int[s];
       ClientConnectionFactoryDelegate[] delegates = new ClientConnectionFactoryDelegate[s];
+      // The created ClusteredClientConnectionFactoryDelegate will copy its properties from this delegate,
+      // which is the delegate on the current serverPeer
+      ClientConnectionFactoryDelegate mainDelegate = null;
 
       int idx = 0;
 
@@ -349,8 +375,25 @@
          failoverNodeIDs[idx] = failoverNodeID;
          delegates[idx] = (ClientConnectionFactoryDelegate)entry.getValue();
 
+         if (delegates[idx].getServerId() == this.serverPeer.getServerPeerID())
+         {
+
+            // sanity check
+            if (mainDelegate != null)
+            {
+               throw new IllegalStateException("There are two server with serverID, verify your clustering configuration");
+            }
+            mainDelegate = delegates[idx];
+         }
+
          idx++;
       }
+      // sanity check
+      if (mainDelegate == null)
+      {
+         throw new IllegalStateException("Couldn't find a delegate for this serverID=" + this.serverPeer.getServerPeerID());
+      }
+
       
       // Generate the failover indexes. This could probably be optimised if need be.
       
@@ -377,8 +420,11 @@
          
          failoverIndexes[i] = failoverIndex;
       }
-      
-      clusteredDelegate.setFailoverDelegates(delegates, failoverIndexes);
+
+      ClusteredClientConnectionFactoryDelegate clusteredDelegate =
+         new ClusteredClientConnectionFactoryDelegate(mainDelegate,delegates,failoverIndexes);
+
+      return clusteredDelegate;
    }
 
    private void rebindConnectionFactory(Context ic,




More information about the jboss-cvs-commits mailing list