[hornetq-commits] JBoss hornetq SVN: r9654 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 8 07:30:25 EDT 2010


Author: ataylor
Date: 2010-09-08 07:30:24 -0400 (Wed, 08 Sep 2010)
New Revision: 9654

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
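Fixed topology updates (a null second connector in a member update is now ignored) and changed the startup order in the cluster manager (when running as a backup, broadcast groups, bridges and cluster connections are no longer started at deploy time).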

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java	2010-09-08 11:30:24 UTC (rev 9654)
@@ -69,7 +69,7 @@
             currentMember.getConnector().a =  member.getConnector().a;
             replaced = true;
          }
-         if(hasChanged(currentMember.getConnector().b, member.getConnector().b))
+         if(hasChanged(currentMember.getConnector().b, member.getConnector().b) && member.getConnector().b != null)
          {
             if(currentMember.getConnector().b == null)
             {

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-09-08 11:30:24 UTC (rev 9654)
@@ -133,6 +133,8 @@
          this.serverLocator.setClusterConnection(true);
          this.serverLocator.setClusterTransportConfiguration(connector);
          this.serverLocator.setBackup(server.getConfiguration().isBackup());
+         this.serverLocator.setReconnectAttempts(-1);
+         this.serverLocator.setRetryInterval(retryInterval);
          
          // a cluster connection will connect to other nodes only if they are directly connected
          // through a static list of connectors 
@@ -334,6 +336,7 @@
       // discard notifications about ourselves
       if (nodeID.equals(nodeUUID.toString()))
       {
+         server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
          return;
       }
 

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2010-09-08 11:30:24 UTC (rev 9654)
@@ -401,6 +401,42 @@
             backupSessionFactory = null;
          }
 
+         for (BroadcastGroup broadcastGroup : broadcastGroups.values())
+         {
+            try
+            {
+               broadcastGroup.start();
+            }
+            catch (Exception e)
+            {
+               log.warn("unable to start broadcast group " + broadcastGroup.getName(), e);
+            }
+         }
+
+         for (ClusterConnection clusterConnection : clusterConnections.values())
+         {
+            try
+            {
+               clusterConnection.start();
+            }
+            catch (Exception e)
+            {
+               log.warn("unable to start cluster connection " + clusterConnection.getName(), e);
+            }
+         }
+
+         for (Bridge bridge : bridges.values())
+         {
+            try
+            {
+               bridge.start();
+            }
+            catch (Exception e)
+            {
+               log.warn("unable to start bridge " + bridge.getName(), e);
+            }
+         }
+
          if (clusterConnections.size() > 0)
          {
             announceNode();
@@ -509,7 +545,10 @@
 
       managementService.registerBroadcastGroup(group, config);
 
-      group.start();
+      if (!backup)
+      {
+         group.start();
+      }
    }
 
    private void logWarnNoConnector(final String connectorName, final String bgName)
@@ -658,7 +697,10 @@
 
       managementService.registerBridge(bridge, config);
 
-      bridge.start();
+      if (!backup)
+      {
+         bridge.start();
+      }
    }
 
    private synchronized void deployClusterConnection(final ClusterConnectionConfiguration config) throws Exception
@@ -739,7 +781,10 @@
 
       clusterConnections.put(config.getName(), clusterConnection);
 
-      clusterConnection.start();
+      if (!backup)
+      {
+         clusterConnection.start();
+      }
    }
 
    private Transformer instantiateTransformer(final String transformerClassName)
@@ -762,4 +807,10 @@
       }
       return transformer;
    }
+   //for testing
+   public void clear()
+   {
+      bridges.clear();
+      clusterConnections.clear();
+   }
 }

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java	2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java	2010-09-08 11:30:24 UTC (rev 9654)
@@ -51,33 +51,23 @@
       super.setUp();
       clearData();
       FakeLockFile.clearLocks();
-      servers.ensureCapacity(5);
-      createConfigs();
+   }
 
+   public void testMultipleFailovers() throws Exception
+   {
+      createLiveConfig(0);
+      createBackupConfig(0, 1,false,  0, 2, 3, 4, 5);
+      createBackupConfig(0, 2,false,  0, 1, 3, 4, 5);
+      createBackupConfig(0, 3,false,  0, 1, 2, 4, 5);
+      createBackupConfig(0, 4, false, 0, 1, 2, 3, 4);
+      createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
       servers.get(1).start();
       servers.get(2).start();
       servers.get(3).start();
       servers.get(4).start();
       servers.get(5).start();
       servers.get(0).start();
-   }
 
-   /**
-    * @throws Exception
-    */
-   protected void createConfigs() throws Exception
-   {
-
-      createLiveConfig(0);
-      createBackupConfig(1, 0, 2, 3, 4, 5);
-      createBackupConfig(2, 0, 1, 3, 4, 5);
-      createBackupConfig(3, 0, 1, 2, 4, 5);
-      createBackupConfig(4, 0, 1, 2, 3, 4);
-      createBackupConfig(5, 0, 1, 2, 3, 4);
-   }
-
-   public void test() throws Exception
-   {
       ServerLocator locator = getServerLocator(0);
 
       locator.setBlockOnNonDurableSend(true);
@@ -111,12 +101,51 @@
       fail(backupNode, session);
       session.close();
       backupNode = waitForBackup(5);
-      session = sendAndConsume(sf, false);    
+      session = sendAndConsume(sf, false);
       session.close();
       servers.get(backupNode).stop();
       System.out.println("MultipleBackupFailoverTest.test");
    }
 
+   public void testMultipleFailovers2liveservers() throws Exception
+   {
+      createLiveConfig(0, 3);
+      createBackupConfig(0, 1, true, 0, 3);
+      createBackupConfig(0, 2,true, 0, 3);
+      createLiveConfig(3, 0);
+      createBackupConfig(3, 4, true,0, 3);
+      createBackupConfig(3, 5, true,0, 3);
+      servers.get(1).start();
+      servers.get(2).start();
+      servers.get(0).start();
+      servers.get(4).start();
+      servers.get(5).start();
+      servers.get(3).start();
+      ServerLocator locator = getServerLocator(0);
+
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator, 4);
+      ClientSession session = sendAndConsume(sf, true);
+
+      fail(0, session);
+
+      ServerLocator locator2 = getServerLocator(3);
+      locator2.setBlockOnNonDurableSend(true);
+      locator2.setBlockOnDurableSend(true);
+      locator2.setBlockOnAcknowledge(true);
+      locator2.setReconnectAttempts(-1);
+      ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2, 4);
+      ClientSession session2 = sendAndConsume(sf2, true);
+      fail(3, session2);
+      servers.get(2).stop();
+      servers.get(4).stop();
+      servers.get(1).stop();
+      servers.get(3).stop();
+   }
+
    protected void fail(int node, final ClientSession... sessions) throws Exception
    {
       servers.get(node).crash(sessions);
@@ -126,12 +155,12 @@
    {
       long time = System.currentTimeMillis();
       long toWait = seconds * 1000;
-      while(true)
+      while (true)
       {
          for (int i = 0, serversSize = servers.size(); i < serversSize; i++)
          {
             TestableServer backupServer = servers.get(i);
-            if(backupServer.isInitialised())
+            if (backupServer.isInitialised())
             {
                return i;
             }
@@ -144,7 +173,7 @@
          {
             //ignore
          }
-         if(System.currentTimeMillis() > (time + toWait))
+         if (System.currentTimeMillis() > (time + toWait))
          {
             fail("backup server never started");
          }
@@ -152,7 +181,7 @@
    }
 
 
-   private void createBackupConfig(int nodeid, int... nodes)
+   private void createBackupConfig(int liveNode, int nodeid, boolean createClusterConnections, int... nodes)
    {
       Configuration config1 = super.createDefaultConfig();
       config1.getAcceptorConfigurations().clear();
@@ -172,15 +201,22 @@
       TransportConfiguration backupConnector = getConnectorTransportConfiguration(nodeid);
       List<String> pairs = null;
       ClusterConnectionConfiguration ccc1 = new ClusterConnectionConfiguration("cluster1", "jms", backupConnector.getName(), -1, false, false, 1, 1,
-               pairs);
+           createClusterConnections? staticConnectors:pairs);
       config1.getClusterConfigurations().add(ccc1);
       BackupConnectorConfiguration connectorConfiguration = new BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
       config1.setBackupConnectorConfiguration(connectorConfiguration);
       config1.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
+
+
+      config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" + liveNode);
+      config1.setJournalDirectory(config1.getJournalDirectory() + "_" + liveNode);
+      config1.setPagingDirectory(config1.getPagingDirectory() + "_" + liveNode);
+      config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_" + liveNode);
+
       servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
    }
 
-   public void createLiveConfig(int liveNode)
+   public void createLiveConfig(int liveNode, int ... otherLiveNodes)
    {
       TransportConfiguration liveConnector = getConnectorTransportConfiguration(liveNode);
       Configuration config0 = super.createDefaultConfig();
@@ -189,13 +225,27 @@
       config0.setSecurityEnabled(false);
       config0.setSharedStore(true);
       config0.setClustered(true);
-       List<String> pairs = null;
+      List<String> pairs = new ArrayList<String>();
+      for (int node : otherLiveNodes)
+      {
+         TransportConfiguration otherLiveConnector = getConnectorTransportConfiguration(node);
+         config0.getConnectorConfigurations().put(otherLiveConnector.getName(), otherLiveConnector);
+         pairs.add(otherLiveConnector.getName());  
+
+      }
       ClusterConnectionConfiguration ccc0 = new ClusterConnectionConfiguration("cluster1", "jms", liveConnector.getName(), -1, false, false, 1, 1,
-               pairs);
+            pairs);
       config0.getClusterConfigurations().add(ccc0);
       config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+
+      config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" + liveNode);
+      config0.setJournalDirectory(config0.getJournalDirectory() + "_" + liveNode);
+      config0.setPagingDirectory(config0.getPagingDirectory() + "_" + liveNode);
+      config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() + "_" + liveNode);
+
       servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
    }
+
    private TransportConfiguration getConnectorTransportConfiguration(int node)
    {
       HashMap<String, Object> map = new HashMap<String, Object>();
@@ -220,7 +270,8 @@
 
       sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
 
-      assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+      boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+      assertTrue(ok);
       return sf;
    }
 
@@ -230,7 +281,7 @@
       for (int i = 0, configsLength = configs.length; i < configsLength; i++)
       {
          HashMap<String, Object> map = new HashMap<String, Object>();
-         map.put(TransportConstants.SERVER_ID_PROP_NAME, i);
+         map.put(TransportConstants.SERVER_ID_PROP_NAME, nodes[i]);
          configs[i] = new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
 
       }
@@ -253,10 +304,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                       false,
-                                                       0,
-                                                       System.currentTimeMillis(),
-                                                       (byte)1);
+               false,
+               0,
+               System.currentTimeMillis(),
+               (byte) 1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);
@@ -283,6 +334,7 @@
 
       return session;
    }
+
    class LatchClusterTopologyListener implements ClusterTopologyListener
    {
       final CountDownLatch latch;

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java	2010-09-08 11:30:24 UTC (rev 9654)
@@ -14,6 +14,7 @@
 package org.hornetq.tests.integration.cluster.util;
 
 import java.io.File;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -25,7 +26,11 @@
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.DelegatingSession;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
 import org.hornetq.core.server.cluster.impl.FakeLockFile;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.util.ServiceTestBase;
@@ -75,6 +80,7 @@
 
          public void beforeReconnect(HornetQException exception)
          {
+            System.out.println("MyListener.beforeReconnect");
          }
       }
       for (ClientSession session : sessions)
@@ -87,11 +93,13 @@
          remotingConnection.destroy();
          server.getRemotingService().removeConnection(remotingConnection.getID());
       }
+
+      ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
+      clusterManager.clear();
       server.stop();
-      
       // recreate the live.lock file (since it was deleted by the
       // clean stop
-      File lockFile = new File(ServiceTestBase.getJournalDir(), "live.lock");
+      File lockFile = new File(server.getConfiguration().getJournalDirectory(), "live.lock");
       Assert.assertFalse(lockFile.exists());
       lockFile.createNewFile();
 



More information about the hornetq-commits mailing list