[jboss-cvs] JBoss Messaging SVN: r1710 - in branches/Branch_Client_Failover_Experiment: src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/server/connectionfactory src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 5 21:07:11 EST 2006


Author: clebert.suconic at jboss.com
Date: 2006-12-05 21:07:06 -0500 (Tue, 05 Dec 2006)
New Revision: 1710

Modified:
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
   branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
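Fixing ClusteredClientConnectionFactoryDelegate and other stuff (the clustered delegate no longer calls init() on itself and gains a toString(); ConnectionFactoryJNDIMapper now always replicates the registered delegate and logs more diagnostics; import and comment cleanup in DefaultClusteredPostOffice; clustering tests start servers with the "all" configuration and HATest.testLeaveFailover is replaced by testConnectionFactoryConnect)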

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-06 02:07:06 UTC (rev 1710)
@@ -73,7 +73,10 @@
 
       for (int i = 0; i < delegates.length; i++)
       {
-         delegates[i].init();
+         if (this!=delegates[i])
+         {
+            delegates[i].init();
+         }
       }
 
       // We add this to the meta data so the failOver aspect can get access to it
@@ -102,6 +105,11 @@
       return delegates;
    }
 
+   public String toString()
+   {
+      return "ClusteredClientConnectionFactoryDelegate[" + id + "] with delegates.length = " + (delegates==null?"null":Integer.toString(delegates.length));
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2006-12-06 02:07:06 UTC (rev 1710)
@@ -119,8 +119,6 @@
       
       ClientConnectionFactoryDelegate delegate = null;
       
-      boolean replicateChanges = false;
-      
       if (clustered)
       {
          setupReplicator();
@@ -138,25 +136,23 @@
          //Local
          delegate = new ClientConnectionFactoryDelegate(id, locatorURI, version, clientPing);
       }
-      
+
+      log.trace("Adding delegates factory " + uniqueName + " pointing to " + delegate);
       delegates.put(uniqueName, delegate);
-      
+
       //Now bind it in JNDI
       rebindConnectionFactory(initialContext, jndiBindings, delegate);
       
-      if (replicateChanges)
-      {
-         // We are clustered, we need to propagate the local delegate across the cluster.
+      // We are clustered, we need to propagate the local delegate across the cluster.
 
-         // ... and then update it (and the clustered delegate on every node, while we're at it) by
-         // replicating the local delegate across the cluster. This way, the clustred delegates on each
-         // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
-         // will rebind updated ConnectionFactories in JNDI.
-         // This will update the local node too
+      // ... and then update it (and the clustered delegate on every node, while we're at it) by
+      // replicating the local delegate across the cluster. This way, the clustred delegates on each
+      // node will contain an updated list of local delegates, and all ConnectionFactoryJNDIMapper
+      // will rebind updated ConnectionFactories in JNDI.
+      // This will update the local node too
 
-         replicator.put(CF_PREFIX + uniqueName, delegate);
-      }
-            
+      replicator.put(CF_PREFIX + uniqueName, delegate);
+
       //Registering with the dispatcher should always be the last thing otherwise a client could use
       //a partially initialised object
       JMSDispatcher.instance.registerTarget(new Integer(id), new ConnectionFactoryAdvised(endpoint));
@@ -164,6 +160,7 @@
    
    public synchronized void unregisterConnectionFactory(String uniqueName, boolean clustered) throws Exception
    {
+      log.trace("ConnectionFactory " + uniqueName + " being unregistered");
       ServerConnectionFactoryEndpoint endpoint =
          (ServerConnectionFactoryEndpoint)endpoints.remove(uniqueName);
       
@@ -184,7 +181,7 @@
             log.debug(jndiName + " unregistered");
          }
       }
-      
+
       ClientConnectionFactoryDelegate delegate =
          (ClientConnectionFactoryDelegate)delegates.remove(uniqueName);
       
@@ -248,6 +245,7 @@
    
    public synchronized void onReplicationChange(Serializable key, Map updatedReplicantMap)
    {
+      log.info("Got replication call " + key + " replicants=" + updatedReplicantMap);
       try
       {
          // The list of connection factories across the cluster has changed, so we need to update
@@ -266,6 +264,12 @@
             
             if (clusteredDelegate == null)
             {
+               log.info("Delegates list: delegates.length=" + delegates.size());
+               for (Iterator iterDelegates = delegates.entrySet().iterator(); iterDelegates.hasNext();)
+               {
+                  Map.Entry entry = (Map.Entry)iterDelegates.next();
+                  log.info("key=" + entry.getKey() + ", value=" + entry.getValue());
+               }
                throw new IllegalStateException("Cannot find connection delegate for " +
                                                uniqueName + " to update");
             }
@@ -315,9 +319,11 @@
    /**
     * @param localDelegates - Map<Integer(nodeId) - ClientConnectionFactoryDelegate>
     */
-   private void setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
+   private void  setFailoverDelegates(ClusteredClientConnectionFactoryDelegate clusteredDelegate,
                                      Map localDelegates)
    {
+      //TODO: make it trace after the code is stable
+      log.info("Updating FailoverDelegates " + localDelegates.size() + " on serverPeer:" + serverPeer.getServerPeerID());
       int s = localDelegates.size();
 
       int[] nodesIDs = new int[s];
@@ -360,7 +366,7 @@
          
          if (failoverIndex == -1)
          {
-            throw new IllegalStateException("Failover node is not in list of nodes!");
+            throw new IllegalStateException("Failover node is not in list of nodes at node " + serverPeer.getServerPeerID() + "!");
          }
          
          failoverIndexes[i] = failoverIndex;
@@ -382,6 +388,7 @@
          for(Iterator i = jndiNames.iterator(); i.hasNext(); )
          {
             String jndiName = (String)i.next();
+            log.info("Rebinding " + jndiName + " CF=" + cf );
             JNDIUtil.rebind(ic, jndiName, cf);
          }
       }

Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java	2006-12-06 02:07:06 UTC (rev 1710)
@@ -21,6 +21,7 @@
  */
 package org.jboss.messaging.core.plugin.postoffice.cluster;
 
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -39,10 +40,8 @@
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
 import javax.sql.DataSource;
 import javax.transaction.TransactionManager;
-
 import org.jboss.jms.server.QueuedExecutorPool;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -75,8 +74,6 @@
 import org.jgroups.blocks.RequestHandler;
 import org.w3c.dom.Element;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * A DefaultClusteredPostOffice
@@ -1306,7 +1303,7 @@
 
 
    /**
-    *  Verifies changes on the View deciding if a node joined or lefted the cluster
+    *  Verifies changes on the View deciding if a node joined or left the cluster
     *
     * */
    private void verifyMembership(View oldView, View newView) throws Throwable
@@ -1858,7 +1855,7 @@
             
             // Need to trigger listeners
             notifyListeners(key, replicants);
-         }         
+         }
       }
    }
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java	2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringBase.java	2006-12-06 02:07:06 UTC (rev 1710)
@@ -117,7 +117,7 @@
    private void startServer(int peer)
       throws Exception
    {
-      ServerManagement.start("all-cluster", peer);
+      ServerManagement.start("all", peer);
       ServerManagement.deployClusteredQueue("testDistributedQueue", peer);
       ServerManagement.deployClusteredTopic("testDistributedTopic", peer);
    }
@@ -128,12 +128,15 @@
       {
          super.tearDown();
 
+         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 0");
          ServerManagement.undeployQueue("testDistributedQueue", 0);
          ServerManagement.undeployTopic("testDistributedTopic", 0);
 
+         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 1");
          ServerManagement.undeployQueue("testDistributedQueue", 1);
          ServerManagement.undeployTopic("testDistributedTopic", 1);
 
+         ServerManagement.log(ServerManagement.INFO,"Undeploying Server 2");
          ServerManagement.undeployQueue("testDistributedQueue", 2);
          ServerManagement.undeployTopic("testDistributedTopic", 2);
 

Modified: branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-05 18:38:08 UTC (rev 1709)
+++ branches/Branch_Client_Failover_Experiment/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-06 02:07:06 UTC (rev 1710)
@@ -24,6 +24,8 @@
 
 import javax.jms.MessageConsumer;
 import javax.jms.TextMessage;
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.test.messaging.tools.ServerManagement;
 
@@ -51,21 +53,35 @@
 
    // Public --------------------------------------------------------
 
-   public void testLeaveFailover() throws Exception
+   public void testConnectionFactoryConnect() throws Exception
    {
       try
       {
-         System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-         assertEquals(3,ServerManagement.getServer(0).getNumberOfNodesOnCluster());
-         assertEquals(3,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
-         ServerManagement.stop(0,true);
-         System.out.println("Number of nodes on Cluster = " + ServerManagement.getServer(1).getNumberOfNodesOnCluster());
-         assertEquals(2,ServerManagement.getServer(1).getNumberOfNodesOnCluster());
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         ClusteredClientConnectionFactoryDelegate delegate =
+                      (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         log.info ("number of delegates = " + delegate.getDelegates().length);
+         log.info ("number of servers = " + ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+
+         assertEquals(3, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         assertEquals(3, delegate.getDelegates().length);
+
+
+         ServerManagement.log(ServerManagement.INFO,"Stopping server 2 as part of testConnectionFactoryConnect");
+         ServerManagement.stop(2,true);
+
+         factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         delegate = (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+         assertEquals(2, ServerManagement.getServer(0).getNumberOfNodesOnCluster());
+         assertEquals(2, delegate.getDelegates().length);
       }
       finally
       {
-         ServerManagement.start("all", 0); // tear down needs the server up
+         ServerManagement.start("all", 2);
       }
+
+
    }
 
 




More information about the jboss-cvs-commits mailing list