Author: ataylor
Date: 2010-09-08 07:30:24 -0400 (Wed, 08 Sep 2010)
New Revision: 9654
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
Log:
fixed topology and changed startup order in cluster manager
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/Topology.java 2010-09-08 11:30:24 UTC (rev 9654)
@@ -69,7 +69,7 @@
currentMember.getConnector().a = member.getConnector().a;
replaced = true;
}
- if(hasChanged(currentMember.getConnector().b, member.getConnector().b))
+ if(hasChanged(currentMember.getConnector().b, member.getConnector().b)
&& member.getConnector().b != null)
{
if(currentMember.getConnector().b == null)
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-08 11:30:24 UTC (rev 9654)
@@ -133,6 +133,8 @@
this.serverLocator.setClusterConnection(true);
this.serverLocator.setClusterTransportConfiguration(connector);
this.serverLocator.setBackup(server.getConfiguration().isBackup());
+ this.serverLocator.setReconnectAttempts(-1);
+ this.serverLocator.setRetryInterval(retryInterval);
// a cluster connection will connect to other nodes only if they are directly connected
// through a static list of connectors
@@ -334,6 +336,7 @@
// discard notifications about ourselves
if (nodeID.equals(nodeUUID.toString()))
{
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, distance);
return;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-08 11:30:24 UTC (rev 9654)
@@ -401,6 +401,42 @@
backupSessionFactory = null;
}
+ for (BroadcastGroup broadcastGroup : broadcastGroups.values())
+ {
+ try
+ {
+ broadcastGroup.start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to start broadcast group " +
broadcastGroup.getName(), e);
+ }
+ }
+
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to start cluster connection " +
clusterConnection.getName(), e);
+ }
+ }
+
+ for (Bridge bridge : bridges.values())
+ {
+ try
+ {
+ bridge.start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to start bridge " + bridge.getName(), e);
+ }
+ }
+
if (clusterConnections.size() > 0)
{
announceNode();
@@ -509,7 +545,10 @@
managementService.registerBroadcastGroup(group, config);
- group.start();
+ if (!backup)
+ {
+ group.start();
+ }
}
private void logWarnNoConnector(final String connectorName, final String bgName)
@@ -658,7 +697,10 @@
managementService.registerBridge(bridge, config);
- bridge.start();
+ if (!backup)
+ {
+ bridge.start();
+ }
}
private synchronized void deployClusterConnection(final ClusterConnectionConfiguration
config) throws Exception
@@ -739,7 +781,10 @@
clusterConnections.put(config.getName(), clusterConnection);
- clusterConnection.start();
+ if (!backup)
+ {
+ clusterConnection.start();
+ }
}
private Transformer instantiateTransformer(final String transformerClassName)
@@ -762,4 +807,10 @@
}
return transformer;
}
+ //for testing
+ public void clear()
+ {
+ bridges.clear();
+ clusterConnections.clear();
+ }
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java 2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupFailoverTest.java 2010-09-08 11:30:24 UTC (rev 9654)
@@ -51,33 +51,23 @@
super.setUp();
clearData();
FakeLockFile.clearLocks();
- servers.ensureCapacity(5);
- createConfigs();
+ }
+ public void testMultipleFailovers() throws Exception
+ {
+ createLiveConfig(0);
+ createBackupConfig(0, 1,false, 0, 2, 3, 4, 5);
+ createBackupConfig(0, 2,false, 0, 1, 3, 4, 5);
+ createBackupConfig(0, 3,false, 0, 1, 2, 4, 5);
+ createBackupConfig(0, 4, false, 0, 1, 2, 3, 4);
+ createBackupConfig(0, 5, false, 0, 1, 2, 3, 4);
servers.get(1).start();
servers.get(2).start();
servers.get(3).start();
servers.get(4).start();
servers.get(5).start();
servers.get(0).start();
- }
- /**
- * @throws Exception
- */
- protected void createConfigs() throws Exception
- {
-
- createLiveConfig(0);
- createBackupConfig(1, 0, 2, 3, 4, 5);
- createBackupConfig(2, 0, 1, 3, 4, 5);
- createBackupConfig(3, 0, 1, 2, 4, 5);
- createBackupConfig(4, 0, 1, 2, 3, 4);
- createBackupConfig(5, 0, 1, 2, 3, 4);
- }
-
- public void test() throws Exception
- {
ServerLocator locator = getServerLocator(0);
locator.setBlockOnNonDurableSend(true);
@@ -111,12 +101,51 @@
fail(backupNode, session);
session.close();
backupNode = waitForBackup(5);
- session = sendAndConsume(sf, false);
+ session = sendAndConsume(sf, false);
session.close();
servers.get(backupNode).stop();
System.out.println("MultipleBackupFailoverTest.test");
}
+ public void testMultipleFailovers2liveservers() throws Exception
+ {
+ createLiveConfig(0, 3);
+ createBackupConfig(0, 1, true, 0, 3);
+ createBackupConfig(0, 2,true, 0, 3);
+ createLiveConfig(3, 0);
+ createBackupConfig(3, 4, true,0, 3);
+ createBackupConfig(3, 5, true,0, 3);
+ servers.get(1).start();
+ servers.get(2).start();
+ servers.get(0).start();
+ servers.get(4).start();
+ servers.get(5).start();
+ servers.get(3).start();
+ ServerLocator locator = getServerLocator(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
4);
+ ClientSession session = sendAndConsume(sf, true);
+
+ fail(0, session);
+
+ ServerLocator locator2 = getServerLocator(3);
+ locator2.setBlockOnNonDurableSend(true);
+ locator2.setBlockOnDurableSend(true);
+ locator2.setBlockOnAcknowledge(true);
+ locator2.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal sf2 = createSessionFactoryAndWaitForTopology(locator2,
4);
+ ClientSession session2 = sendAndConsume(sf2, true);
+ fail(3, session2);
+ servers.get(2).stop();
+ servers.get(4).stop();
+ servers.get(1).stop();
+ servers.get(3).stop();
+ }
+
protected void fail(int node, final ClientSession... sessions) throws Exception
{
servers.get(node).crash(sessions);
@@ -126,12 +155,12 @@
{
long time = System.currentTimeMillis();
long toWait = seconds * 1000;
- while(true)
+ while (true)
{
for (int i = 0, serversSize = servers.size(); i < serversSize; i++)
{
TestableServer backupServer = servers.get(i);
- if(backupServer.isInitialised())
+ if (backupServer.isInitialised())
{
return i;
}
@@ -144,7 +173,7 @@
{
//ignore
}
- if(System.currentTimeMillis() > (time + toWait))
+ if (System.currentTimeMillis() > (time + toWait))
{
fail("backup server never started");
}
@@ -152,7 +181,7 @@
}
- private void createBackupConfig(int nodeid, int... nodes)
+ private void createBackupConfig(int liveNode, int nodeid, boolean
createClusterConnections, int... nodes)
{
Configuration config1 = super.createDefaultConfig();
config1.getAcceptorConfigurations().clear();
@@ -172,15 +201,22 @@
TransportConfiguration backupConnector =
getConnectorTransportConfiguration(nodeid);
List<String> pairs = null;
ClusterConnectionConfiguration ccc1 = new
ClusterConnectionConfiguration("cluster1", "jms",
backupConnector.getName(), -1, false, false, 1, 1,
- pairs);
+ createClusterConnections? staticConnectors:pairs);
config1.getClusterConfigurations().add(ccc1);
BackupConnectorConfiguration connectorConfiguration = new
BackupConnectorConfiguration(staticConnectors, backupConnector.getName());
config1.setBackupConnectorConfiguration(connectorConfiguration);
config1.getConnectorConfigurations().put(backupConnector.getName(),
backupConnector);
+
+
+ config1.setBindingsDirectory(config1.getBindingsDirectory() + "_" +
liveNode);
+ config1.setJournalDirectory(config1.getJournalDirectory() + "_" +
liveNode);
+ config1.setPagingDirectory(config1.getPagingDirectory() + "_" +
liveNode);
+ config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_" + liveNode);
+
servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config1)));
}
- public void createLiveConfig(int liveNode)
+ public void createLiveConfig(int liveNode, int ... otherLiveNodes)
{
TransportConfiguration liveConnector =
getConnectorTransportConfiguration(liveNode);
Configuration config0 = super.createDefaultConfig();
@@ -189,13 +225,27 @@
config0.setSecurityEnabled(false);
config0.setSharedStore(true);
config0.setClustered(true);
- List<String> pairs = null;
+ List<String> pairs = new ArrayList<String>();
+ for (int node : otherLiveNodes)
+ {
+ TransportConfiguration otherLiveConnector =
getConnectorTransportConfiguration(node);
+ config0.getConnectorConfigurations().put(otherLiveConnector.getName(),
otherLiveConnector);
+ pairs.add(otherLiveConnector.getName());
+
+ }
ClusterConnectionConfiguration ccc0 = new
ClusterConnectionConfiguration("cluster1", "jms",
liveConnector.getName(), -1, false, false, 1, 1,
- pairs);
+ pairs);
config0.getClusterConfigurations().add(ccc0);
config0.getConnectorConfigurations().put(liveConnector.getName(), liveConnector);
+
+ config0.setBindingsDirectory(config0.getBindingsDirectory() + "_" +
liveNode);
+ config0.setJournalDirectory(config0.getJournalDirectory() + "_" +
liveNode);
+ config0.setPagingDirectory(config0.getPagingDirectory() + "_" +
liveNode);
+ config0.setLargeMessagesDirectory(config0.getLargeMessagesDirectory() +
"_" + liveNode);
+
servers.add(new SameProcessHornetQServer(createFakeLockServer(true, config0)));
}
+
private TransportConfiguration getConnectorTransportConfiguration(int node)
{
HashMap<String, Object> map = new HashMap<String, Object>();
@@ -220,7 +270,8 @@
sf = (ClientSessionFactoryInternal) locator.createSessionFactory();
- assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ boolean ok = countDownLatch.await(5, TimeUnit.SECONDS);
+ assertTrue(ok);
return sf;
}
@@ -230,7 +281,7 @@
for (int i = 0, configsLength = configs.length; i < configsLength; i++)
{
HashMap<String, Object> map = new HashMap<String, Object>();
- map.put(TransportConstants.SERVER_ID_PROP_NAME, i);
+ map.put(TransportConstants.SERVER_ID_PROP_NAME, nodes[i]);
configs[i] = new TransportConfiguration(INVM_CONNECTOR_FACTORY, map);
}
@@ -253,10 +304,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte) 1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
@@ -283,6 +334,7 @@
return session;
}
+
class LatchClusterTopologyListener implements ClusterTopologyListener
{
final CountDownLatch latch;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-07 22:46:19 UTC (rev 9653)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-08 11:30:24 UTC (rev 9654)
@@ -14,6 +14,7 @@
package org.hornetq.tests.integration.cluster.util;
import java.io.File;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -25,7 +26,11 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.client.impl.DelegatingSession;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.core.server.cluster.impl.FakeLockFile;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.util.ServiceTestBase;
@@ -75,6 +80,7 @@
public void beforeReconnect(HornetQException exception)
{
+ System.out.println("MyListener.beforeReconnect");
}
}
for (ClientSession session : sessions)
@@ -87,11 +93,13 @@
remotingConnection.destroy();
server.getRemotingService().removeConnection(remotingConnection.getID());
}
+
+ ClusterManagerImpl clusterManager = (ClusterManagerImpl)
server.getClusterManager();
+ clusterManager.clear();
server.stop();
-
// recreate the live.lock file (since it was deleted by the
// clean stop
- File lockFile = new File(ServiceTestBase.getJournalDir(), "live.lock");
+ File lockFile = new File(server.getConfiguration().getJournalDirectory(),
"live.lock");
Assert.assertFalse(lockFile.exists());
lockFile.createNewFile();