Author: ataylor
Date: 2010-11-30 11:27:01 -0500 (Tue, 30 Nov 2010)
New Revision: 9953
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
activate cluster connection properly and test fixes
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -244,7 +244,7 @@
public void setBackupConnector(TransportConfiguration live, TransportConfiguration
backUp)
{
- if(live.equals(connectorConfig))
+ if(live.equals(connectorConfig) && backUp != null)
{
backupConfig = backUp;
}
@@ -1209,10 +1209,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
- if (msg.getNodeID() != null)
- {
- System.out.println("received disconnect from node " +
msg.getNodeID());
- }
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for
a long time and fail can
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/protocol/core/impl/wireformat/NodeAnnounceMessage.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -49,16 +49,8 @@
this.backup = backup;
this.connector = tc;
- if(System.getProperty("foo") != null)
- {
-
if(tc.toString().contains("org-hornetq-core-remoting-impl-invm-InVMConnectorFactory?server-id=1"))
- {
- System.out.println("");
- }
- }
-
}
-
+
public NodeAnnounceMessage()
{
super(PacketImpl.NODE_ANNOUNCE);
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -48,7 +48,7 @@
SimpleString queueName,
int distance) throws Exception;
- void activate();
+ void activate() throws Exception;
TransportConfiguration getConnector();
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -195,41 +195,14 @@
return;
}
- if (serverLocator != null)
+ started = true;
+
+ if(!backup)
{
- serverLocator.addClusterTopologyListener(this);
- serverLocator.start();
- // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
- server.getExecutorFactory().getExecutor().execute(new Runnable()
- {
- public void run()
- {
- try
- {
- serverLocator.connect();
- }
- catch (Exception e)
- {
- if(started)
- {
- log.warn("did not connect the cluster connection to other
nodes", e);
- }
- }
- }
- });
+ activate();
}
-
- started = true;
- if (managementService != null)
- {
- TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(new SimpleString("name"), name);
- Notification notification = new Notification(nodeUUID.toString(),
-
NotificationType.CLUSTER_CONNECTION_STARTED,
- props);
- managementService.sendNotification(notification);
- }
+
}
public void stop() throws Exception
@@ -299,7 +272,7 @@
return nodes;
}
- public synchronized void activate()
+ public synchronized void activate() throws Exception
{
if (!started)
{
@@ -307,6 +280,41 @@
}
backup = false;
+
+
+ if (serverLocator != null)
+ {
+ serverLocator.addClusterTopologyListener(this);
+ serverLocator.start();
+ // FIXME Ugly ugly code to connect to other nodes and form the cluster... :(
+ server.getExecutorFactory().getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ serverLocator.connect();
+ }
+ catch (Exception e)
+ {
+ if(started)
+ {
+ log.warn("did not connect the cluster connection to other
nodes", e);
+ }
+ }
+ }
+ });
+ }
+
+ if (managementService != null)
+ {
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(new SimpleString("name"), name);
+ Notification notification = new Notification(nodeUUID.toString(),
+
NotificationType.CLUSTER_CONNECTION_STARTED,
+ props);
+ managementService.sendNotification(notification);
+ }
}
public TransportConfiguration getConnector()
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -100,7 +100,7 @@
private Topology topology = new Topology();
- private ServerLocatorInternal backupServerLocator;
+ private volatile ServerLocatorInternal backupServerLocator;
private final List<ServerLocatorInternal> clusterLocators = new
ArrayList<ServerLocatorInternal>();
@@ -384,7 +384,7 @@
{
try
{
- clusterConnection.start();
+ clusterConnection.activate();
}
catch (Exception e)
{
@@ -790,12 +790,10 @@
clusterConnections.put(config.getName(), clusterConnection);
- if (!backup)
+ clusterConnection.start();
+
+ if(backup)
{
- clusterConnection.start();
- }
- else
- {
announceBackup(config, connector);
}
}
@@ -836,7 +834,11 @@
try
{
ClientSessionFactory backupSessionFactory =
backupServerLocator.connect();
- backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), true, connector));
+ log.info("backup announced");
+ }
}
catch (Exception e)
{
Modified:
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -605,17 +605,18 @@
}
connectorsService.stop();
+ //we stop the grouping handler before we stop the cluster manager so binding
mappings aren't removed in case of failover
+ if (groupingHandler != null)
+ {
+ managementService.removeNotificationListener(groupingHandler);
+ groupingHandler = null;
+ }
if (clusterManager != null)
{
clusterManager.stop();
}
- if (groupingHandler != null)
- {
- managementService.removeNotificationListener(groupingHandler);
- groupingHandler = null;
- }
}
// we stop the remoting service outside a lock
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -785,7 +785,7 @@
{
for (int i = 0; i < consumers.length; i++)
{
- if (consumers[i] != null)
+ if (consumers[i] != null && !consumers[i].consumer.isClosed())
{
ClusterTestBase.log.info("Dumping consumer " + i);
@@ -1163,6 +1163,11 @@
protected void setupSessionFactory(final int node, final boolean netty) throws
Exception
{
+ setupSessionFactory(node, netty, false);
+ }
+
+ protected void setupSessionFactory(final int node, final boolean netty, boolean ha)
throws Exception
+ {
if (sfs[node] != null)
{
throw new IllegalArgumentException("Already a server at " + node);
@@ -1181,7 +1186,14 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
- locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ if (ha)
+ {
+ locators[node] = HornetQClient.createServerLocatorWithHA(serverTotc);
+ }
+ else
+ {
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ }
locators[node].setBlockOnNonDurableSend(true);
locators[node].setBlockOnDurableSend(true);
@@ -1190,6 +1202,7 @@
sfs[node] = sf;
}
+
protected void setupSessionFactory(final int node, final boolean netty, int
reconnectAttempts) throws Exception
{
if (sfs[node] != null)
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -29,64 +29,9 @@
*/
public class GroupingFailoverReplicationTest extends GroupingFailoverTestBase
{
- private static final Logger log =
Logger.getLogger(GroupingFailoverReplicationTest.class);
-
@Override
- protected void setupReplicatedServer(final int node,
- final boolean fileStorage,
- final boolean netty,
- final int backupNode)
+ boolean isSharedServer()
{
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
-
- Configuration configuration = new ConfigurationImpl();
-
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(node, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalDirectory(getJournalDir(node, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setPagingDirectory(getPageDir(node, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
- configuration.setBackup(true);
- configuration.setSharedStore(false);
-
- configuration.getAcceptorConfigurations().clear();
-
- Map<String, Object> params = generateParams(node, netty);
-
- TransportConfiguration invmtc = new
TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
-
- if (netty)
- {
- TransportConfiguration nettytc = new
TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
-
- HornetQServer server;
-
- if (fileStorage)
- {
- server = HornetQServers.newHornetQServer(configuration);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- servers[node] = server;
+ return false;
}
-
- @Override
- void setupMasterServer(final int i, final boolean fileStorage, final boolean netty)
- {
- setupLiveServer(i, fileStorage, false, netty);
- }
}
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -28,67 +28,10 @@
*/
public class GroupingFailoverSharedServerTest extends GroupingFailoverTestBase
{
- @Override
- protected void setupReplicatedServer(final int node,
- final boolean fileStorage,
- final boolean netty,
- final int backupNode)
- {
- if (servers[node] != null)
- {
- throw new IllegalArgumentException("Already a server at node " +
node);
- }
- Configuration configuration = new ConfigurationImpl();
-
- configuration.setSecurityEnabled(false);
- configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
- configuration.setJournalMinFiles(2);
- configuration.setJournalMaxIO_AIO(1000);
- configuration.setJournalDirectory(getJournalDir(backupNode, false));
- configuration.setJournalFileSize(100 * 1024);
- configuration.setJournalType(getDefaultJournalType());
- configuration.setPagingDirectory(getPageDir(backupNode, false));
- configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
- configuration.setClustered(true);
- configuration.setJournalCompactMinFiles(0);
- configuration.setBackup(true);
- configuration.setSharedStore(true);
-
- configuration.getAcceptorConfigurations().clear();
-
- Map<String, Object> params = generateParams(node, netty);
-
- TransportConfiguration invmtc = new
TransportConfiguration(ServiceTestBase.INVM_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(invmtc);
-
- if (netty)
- {
- TransportConfiguration nettytc = new
TransportConfiguration(ServiceTestBase.NETTY_ACCEPTOR_FACTORY, params);
- configuration.getAcceptorConfigurations().add(nettytc);
- }
-
- HornetQServer server;
-
- if (fileStorage)
- {
- if(nodeManagers[backupNode] == null)
- {
- nodeManagers[backupNode] = new InVMNodeManager();
- }
- server = createInVMFailoverServer(true, configuration,
nodeManagers[backupNode]);
- }
- else
- {
- server = HornetQServers.newHornetQServer(configuration, false);
- }
- servers[node] = server;
- }
-
@Override
- public void setupMasterServer(final int i, final boolean fileStorage, final boolean
netty)
+ boolean isSharedServer()
{
- setupServer(i, fileStorage, netty);
+ return true;
}
-
}
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -21,7 +21,9 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -37,17 +39,17 @@
public void testGroupingLocalHandlerFails() throws Exception
{
- setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
- setupMasterServer(0, isFileStorage(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
- setupClusterConnectionWithBackups("cluster1", "queues", false,
1, isNetty(), 1, new int[] { 0 });
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 1);
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 2, 1);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
@@ -74,6 +76,11 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForServerTopology(servers[1], 3, 5);
+
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 0);
@@ -90,12 +97,6 @@
waitForBindings(2, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 1, false);
-
- waitForBindings(1, "queues.testaddress", 1, 1, true);
-
- waitForBindings(1, "queues.testaddress", 1, 1, false);
-
sendWithProperty(2, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id1"));
verifyReceiveAll(10, 2);
@@ -116,18 +117,19 @@
public void testGroupingLocalHandlerFailsMultipleGroups() throws Exception
{
- setupReplicatedServer(2, isFileStorage(), isNetty(), 0);
+ setupBackupServer(2, 0, isFileStorage(), isSharedServer(), isNetty());
- setupMasterServer(0, isFileStorage(), isNetty());
+ setupLiveServer(0, isFileStorage(), isSharedServer(), isNetty());
- setupServer(1, isFileStorage(), isNetty());
+ setupLiveServer(1, isFileStorage(), isSharedServer(), isNetty());
setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1);
- setupClusterConnectionWithBackups("cluster1", "queues", false,
1, isNetty(), 1, new int[] { 0 });
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0);
- setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 1);
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 2, 1);
+
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
@@ -138,7 +140,9 @@
try
{
startServers(2, 0, 1);
+
setupSessionFactory(0, isNetty());
+
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
@@ -155,6 +159,12 @@
waitForBindings(0, "queues.testaddress", 1, 1, false);
waitForBindings(1, "queues.testaddress", 1, 1, false);
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForServerTopology(servers[1], 3, 5);
+
+
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id1"));
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id2"));
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id3"));
@@ -166,22 +176,8 @@
closeSessionFactory(0);
- final CountDownLatch latch = new CountDownLatch(1);
+ servers[0].kill();
- class MyListener implements FailureListener
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
- }
-
- Map<String, MessageFlowRecord> records =
((ClusterConnectionImpl)getServer(1).getClusterManager()
-
.getClusterConnection(new SimpleString("cluster1"))).getRecords();
- RemotingConnection rc =
records.get("0").getBridge().getForwardingConnection();
- rc.addFailureListener(new MyListener());
- fail(rc, latch);
-
waitForServerRestart(2);
setupSessionFactory(2, isNetty());
@@ -203,7 +199,7 @@
sendWithProperty(2, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id5"));
sendWithProperty(2, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id6"));
- verifyReceiveAllWithGroupIDRoundRobin(0, 30, 1, 2);
+ verifyReceiveAllWithGroupIDRoundRobin(2, 30, 1, 2);
System.out.println("*****************************************************************************");
}
@@ -219,14 +215,27 @@
}
}
- abstract void setupMasterServer(int i, boolean fileStorage, boolean netty);
+ private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
+ throws InterruptedException
+ {
+ Topology topology = server.getClusterManager().getTopology();
+ long timeToWait = System.currentTimeMillis() + (seconds * 1000);
+ while(topology.nodes()!= nodes)
+ {
+ Thread.sleep(100);
+ if(System.currentTimeMillis() > timeToWait)
+ {
+ fail("timed out waiting for server topology");
+ }
+ }
+ }
public boolean isNetty()
{
return true;
}
- abstract void setupReplicatedServer(int node, boolean fileStorage, boolean netty, int
backupNode);
+ abstract boolean isSharedServer();
private void fail(final RemotingConnection conn, final CountDownLatch latch) throws
InterruptedException
{
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -97,7 +97,8 @@
System.setProperty("foo", "bar");
servers.get(3).crash(session2);
int liveAfter3 = waitForNewLive(10000, true, servers, 4, 5);
-
+ locator.close();
+ locator2.close();
if (liveAfter0 == 2)
{
servers.get(1).stop();
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/integration/cluster/failover/SingleLiveMultipleBackupsFailoverTest.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -94,6 +94,8 @@
session = sendAndConsume(sf, false);
session.close();
servers.get(backupNode).stop();
+
+ locator.close();
}
protected void createBackupConfig(int liveNode, int nodeid, int... nodes)
Modified:
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-30
15:39:22 UTC (rev 9952)
+++
branches/2_2_0_HA_Improvements_preMerge/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-30
16:27:01 UTC (rev 9953)
@@ -98,6 +98,20 @@
{
System.exit(0);
}
+ Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
+ for (Thread thread : threadMap.keySet())
+ {
+ StackTraceElement[] stack = threadMap.get(thread);
+ for (StackTraceElement stackTraceElement : stack)
+ {
+
if(stackTraceElement.getMethodName().contains("getConnectionWithRetry"))
+ {
+ System.out.println(this.getName() + " has left threads
running");
+ fail("test left serverlocator running, this could effect other
tests");
+ //System.exit(0);
+ }
+ }
+ }
}
protected static Map<String, Object> generateParams(final int node, final
boolean netty)