Author: clebert.suconic
Date: 2011-10-31 15:20:04 -0400 (Mon, 31 Oct 2011)
New Revision: 11621
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
Log:
Fixing tests: TopologyClusterTestBase was wrong (it needed to wait for the topology before
starting the ServerLocator), and a few synchronization blocks were missing on Discovery initialization
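
For context, the test fix boils down to counting only nodes whose bridge is actually connected, using the new ClusterConnection.isNodeActive(String) (which delegates to Bridge.isConnected(); both are added below). A minimal sketch of that wait pattern follows; the helper name waitForActiveNodes and its timeout parameter are illustrative only and not part of this commit:

   // Hypothetical helper; assumes org.hornetq.core.server.cluster.ClusterConnection is imported.
   private static void waitForActiveNodes(ClusterConnection conn, int expected, long timeoutMs) throws InterruptedException
   {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline)
      {
         int active = 0;
         for (String id : conn.getNodes().keySet())
         {
            if (conn.isNodeActive(id))   // new in this revision; backed by Bridge.isConnected()
            {
               active++;
            }
         }
         if (active == expected)
         {
            return;
         }
         Thread.sleep(10);
      }
      throw new IllegalStateException("Timed out waiting for " + expected + " active cluster nodes");
   }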
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -57,11 +57,15 @@
*/
public class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener, Serializable
{
- /*needed for backward compatibility*/
+ /*needed for backward compatibility*/
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
+
/*end of compatibility fixes*/
- private enum STATE{ INITIALIZED, CLOSED, CLOSING};
-
+ private enum STATE
+ {
+ INITIALIZED, CLOSED, CLOSING
+ };
+
private static final long serialVersionUID = -1615857864410205260L;
private static final Logger log = Logger.getLogger(ServerLocatorImpl.class);
@@ -75,6 +79,7 @@
private transient String identity;
private final Set<ClientSessionFactoryInternal> factories = new HashSet<ClientSessionFactoryInternal>();
+
private final Set<ClientSessionFactoryInternal> connectingFactories = new HashSet<ClientSessionFactoryInternal>();
private TransportConfiguration[] initialConnectors;
@@ -530,7 +535,7 @@
return pair.getA();
}
-
+
// Get from initialconnectors
int pos = loadBalancingPolicy.select(initialConnectors.length);
@@ -601,20 +606,19 @@
{
return afterConnectListener;
}
-
+
public ClientSessionFactory createSessionFactory(String nodeID) throws Exception
{
log.info(topology.describe("full topology"));
TopologyMember topologyMember = topology.getMember(nodeID);
-
+
log.info("Creating connection factory towards " + nodeID + " =
" + topologyMember);
-
+
if (topologyMember == null)
{
return null;
}
- else
- if (topologyMember.getA() != null)
+ else if (topologyMember.getA() != null)
{
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)createSessionFactory(topologyMember.getA());
if (topologyMember.getB() != null)
@@ -641,7 +645,7 @@
assertOpen();
initialise();
-
+
ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
@@ -1239,31 +1243,34 @@
state = STATE.CLOSING;
- if (discoveryGroup != null)
+ synchronized (this)
{
- try
+ if (discoveryGroup != null)
{
- discoveryGroup.stop();
+ try
+ {
+ discoveryGroup.stop();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to stop discovery group", e);
+ }
}
- catch (Exception e)
+ else
{
- log.error("Failed to stop discovery group", e);
+ staticConnector.disconnect();
}
}
- else
- {
- staticConnector.disconnect();
- }
-
+
synchronized (connectingFactories)
{
for (ClientSessionFactoryInternal csf : connectingFactories)
{
- csf.close();
+ csf.close();
}
connectingFactories.clear();
}
-
+
synchronized (factories)
{
Set<ClientSessionFactoryInternal> clonedFactory = new HashSet<ClientSessionFactoryInternal>(factories);
@@ -1320,7 +1327,7 @@
readOnly = false;
state = STATE.CLOSED;
-
+
}
/** This is directly called when the connection to the node is gone,
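Taken together, the ServerLocatorImpl hunks above amount to the close path sketched below: the discovery-group stop (or the static-connector disconnect) now runs inside a synchronized (this) block, so close() cannot race with the Discovery initialization mentioned in the log message. This is a simplified view rather than the full method; error handling outside the shown catch and the rest of the shutdown sequence are omitted.

   // Simplified view of ServerLocatorImpl.close() after this change; not the complete method.
   public void close()
   {
      state = STATE.CLOSING;
      synchronized (this)
      {
         if (discoveryGroup != null)
         {
            try
            {
               discoveryGroup.stop();
            }
            catch (Exception e)
            {
               log.error("Failed to stop discovery group", e);
            }
         }
         else
         {
            staticConnector.disconnect();
         }
      }
      // ... then close connectingFactories and factories, and finally mark the locator CLOSED
   }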
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/Bridge.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -59,4 +59,6 @@
* Basically this is for cluster bridges being disconnected
*/
void disconnect();
+
+ boolean isConnected();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -65,4 +65,6 @@
void informTopology();
void announceBackup();
+
+ boolean isNodeActive(String id);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -131,6 +131,8 @@
private NotificationService notificationService;
+ private boolean stopping = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -198,7 +200,7 @@
{
this.notificationService = notificationService;
}
-
+
public synchronized void start() throws Exception
{
if (started)
@@ -208,6 +210,8 @@
started = true;
+ stopping = false;
+
if (activated)
{
activate();
@@ -221,7 +225,7 @@
notificationService.sendNotification(notification);
}
}
-
+
public String debug()
{
return toString();
@@ -304,20 +308,32 @@
});
}
- /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break.
- * This method is intended to expose this executor to the ClusterManager */
+ public boolean isConnected()
+ {
+ return session != null;
+ }
+
+ /** The cluster manager needs to use the same executor to close the serverLocator, otherwise the stop will break.
+ * This method is intended to expose this executor to the ClusterManager */
public Executor getExecutor()
{
return executor;
}
-
+
public void stop() throws Exception
{
+ if (stopping)
+ {
+ return;
+ }
+
+ stopping = true;
+
if (log.isDebugEnabled())
{
log.debug("Bridge " + this.name + " being stopped");
}
-
+
if (futureScheduledReconnection != null)
{
futureScheduledReconnection.cancel(true);
@@ -470,7 +486,10 @@
{
if (log.isDebugEnabled())
{
- log.debug("The transformer " + transformer + " made a copy
of the message " + message + " as transformedMessage");
+ log.debug("The transformer " + transformer +
+ " made a copy of the message " +
+ message +
+ " as transformedMessage");
}
}
return transformedMessage;
@@ -543,12 +562,12 @@
// that this will throw a disconnect, we need to remove the message
// from the acks so it will get resent, duplicate detection will cope
// with any messages resent
-
+
if (log.isTraceEnabled())
{
log.trace("going to send message " + message);
}
-
+
try
{
producer.send(dest, message);
@@ -579,7 +598,7 @@
{
producer.close();
}
-
+
csf.cleanup();
}
catch (Throwable dontCare)
@@ -680,7 +699,6 @@
return csf;
}
-
/* Hook for creating session factory */
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
@@ -806,6 +824,12 @@
return;
}
+ if (stopping)
+ {
+ log.info("Bridge is stopping, will not retry");
+ return;
+ }
+
if (reconnectAttemptsInUse >= 0 && retryCount > reconnectAttempts)
{
log.warn("Bridge " + this.name +
@@ -968,7 +992,10 @@
{
public synchronized void run()
{
- connect();
+ if (!stopping)
+ {
+ connect();
+ }
}
}
}
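
The BridgeImpl changes follow a stop-guard pattern: stopping is set at the top of stop() and checked both before a retry is scheduled and inside the scheduled connect task, so a bridge that is shutting down never reconnects. A stripped-down sketch of just that pattern; the class name and the volatile modifier are illustrative (the real BridgeImpl keeps far more state and relies on its own synchronization):

   // Illustrative skeleton only; mirrors the guard logic, not the real bridge life cycle.
   public class StoppableReconnector
   {
      private volatile boolean stopping = false;

      public synchronized void start()
      {
         stopping = false;      // a restarted bridge may reconnect again
         // ... activate and connect
      }

      public void stop()
      {
         if (stopping)
         {
            return;             // stop() is now idempotent
         }
         stopping = true;
         // ... cancel any scheduled reconnection, close the session, clean up
      }

      private void scheduleRetryConnect()
      {
         if (stopping)
         {
            return;             // never schedule a new attempt while shutting down
         }
         // ... schedule a task that re-checks stopping before calling connect()
      }
   }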
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -71,7 +71,7 @@
private final SimpleString idsHeaderName;
private final String targetNodeID;
-
+
private final long targetNodeEventUID;
private final ServerLocatorInternal discoveryLocator;
@@ -150,10 +150,11 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal)serverLocator.createSessionFactory(targetNodeID);
-
+
if (factory == null)
{
- log.warn("NodeID=" + targetNodeID + " is not available on the
topology. Retrying the connection to that node now");
+ log.warn("NodeID=" + targetNodeID +
+ " is not available on the topology. Retrying the connection to
that node now");
return null;
}
factory.setReconnectAttempts(0);
@@ -181,7 +182,7 @@
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
ServerMessage messageCopy = message.copy();
-
+
if (log.isTraceEnabled())
{
log.trace("Clustered bridge copied message " + message + " as
" + messageCopy + " before delivery");
@@ -192,7 +193,7 @@
Set<SimpleString> propNames = new HashSet<SimpleString>(messageCopy.getPropertyNames());
byte[] queueIds = message.getBytesProperty(idsHeaderName);
-
+
if (queueIds == null)
{
// Sanity check only
@@ -215,7 +216,7 @@
messageCopy.putBytesProperty(MessageImpl.HDR_ROUTE_TO_IDS, queueIds);
messageCopy = super.beforeForward(messageCopy);
-
+
return messageCopy;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -142,15 +142,14 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
-
-
+
// Stuff that used to be on the ClusterManager
-
private final Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
+ private boolean stopping = false;
public ClusterConnectionImpl(final ClusterManagerInternal manager,
final TransportConfiguration[] tcConfigs,
@@ -214,7 +213,7 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
-
+
this.topology.setExecutor(executor);
this.server = server;
@@ -325,7 +324,7 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
-
+
this.topology.setExecutor(executor);
this.server = server;
@@ -359,7 +358,7 @@
this.manager = manager;
}
- public void start() throws Exception
+ public void start() throws Exception
{
synchronized (this)
{
@@ -367,10 +366,10 @@
{
return;
}
-
-
+
+ stopping = false;
started = true;
-
+
if (!backup)
{
activate();
@@ -378,7 +377,7 @@
}
}
-
+
public void flushExecutor()
{
Future future = new Future();
@@ -395,7 +394,7 @@
{
return;
}
-
+ stopping = true;
if (log.isDebugEnabled())
{
log.debug(this + "::stopping ClusterConnection");
@@ -435,9 +434,7 @@
props);
managementService.sendNotification(notification);
}
-
-
executor.execute(new Runnable()
{
public void run()
@@ -463,36 +460,35 @@
started = false;
}
-
public void announceBackup()
{
executor.execute(new Runnable()
{
public void run()
{
- try
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
- }
- ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
- if (backupSessionFactory != null)
- {
- backupSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new NodeAnnounceMessage(System.currentTimeMillis(),
- nodeUUID.toString(),
- true,
- connector,
- null));
- log.info("backup announced");
- }
- }
- catch (Exception e)
- {
- log.warn("Unable to announce backup, retrying", e);
- }
+ try
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(ClusterConnectionImpl.this + ":: announcing " + connector + " to " + backupServerLocator);
+ }
+ ClientSessionFactory backupSessionFactory = backupServerLocator.connect();
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
+ log.info("backup announced");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup, retrying", e);
+ }
}
});
}
@@ -501,7 +497,7 @@
{
return topology.getMember(manager.getNodeId());
}
-
+
public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topology.addClusterTopologyListener(listener);
@@ -519,7 +515,7 @@
{
return topology;
}
-
+
public void nodeAnnounced(final long uniqueEventID,
final String nodeID,
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
@@ -581,6 +577,16 @@
return server;
}
+ public boolean isNodeActive(String nodeId)
+ {
+ MessageFlowRecord rec = records.get(nodeId);
+ if (rec == null)
+ {
+ return false;
+ }
+ return rec.getBridge().isConnected();
+ }
+
public Map<String, String> getNodes()
{
synchronized (records)
@@ -610,7 +616,7 @@
}
backup = false;
-
+
topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
if (backupServerLocator != null)
@@ -627,8 +633,6 @@
backupServerLocator = null;
}
-
-
serverLocator = clusterConnector.createServerLocator(true);
if (serverLocator != null)
@@ -696,6 +700,10 @@
public void nodeDown(final long eventUID, final String nodeID)
{
+ if (stopping)
+ {
+ return;
+ }
if (log.isDebugEnabled())
{
log.debug(this + " receiving nodeDown for nodeID=" + nodeID, new
Exception("trace"));
@@ -731,6 +739,10 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (stopping)
+ {
+ return;
+ }
if (log.isDebugEnabled())
{
String ClusterTestBase = "receiving nodeUP for nodeID=";
@@ -831,13 +843,13 @@
}
}
}
-
+
public synchronized void informTopology()
{
String nodeID = server.getNodeID().toString();
-
+
TopologyMember localMember;
-
+
if (backup)
{
localMember = new TopologyMember(null, connector);
@@ -850,7 +862,6 @@
topology.updateAsLive(nodeID, localMember);
}
-
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
@@ -859,21 +870,21 @@
final boolean start) throws Exception
{
final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, true, connector);
-
+
String nodeId;
-
+
synchronized (this)
{
if (!started)
{
return;
}
-
+
if (serverLocator == null)
{
return;
}
-
+
nodeId = serverLocator.getNodeID();
}
@@ -1514,8 +1525,9 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl@" + System.identityHashCode(this) +
- "[nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" +
+ nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1565,7 +1577,7 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for
" + Arrays.toString(tcConfigs));
}
- ServerLocatorImpl locator = new
ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology
: null, true, tcConfigs);
locator.setClusterConnection(true);
return locator;
}
@@ -1597,7 +1609,7 @@
public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology : null, true, dg);
return locator;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/HAClientTopologyWithDiscoveryTest.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -33,7 +33,13 @@
{
return false;
}
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
protected void setupCluster() throws Exception
{
setupCluster(false);
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/IsolatedTopologyTest.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -28,7 +28,6 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.UnitTestCase;
/**
* A IsolatedTopologyTest
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-10-31 15:18:27 UTC (rev 11620)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2011-10-31 19:20:04 UTC (rev 11621)
@@ -17,6 +17,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.hornetq.api.core.HornetQException;
@@ -70,6 +71,11 @@
abstract protected boolean isNetty() throws Exception;
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
@Override
protected void setUp() throws Exception
{
@@ -114,11 +120,12 @@
{
ok = (ok && actual.contains(nodeIDs[expected[i]]));
}
- if (ok)
+ if (ok)
{
return;
}
- } while(System.currentTimeMillis() - start < 5000);
+ }
+ while (System.currentTimeMillis() - start < 5000);
fail("did not contain all expected node ID: " + actual);
}
@@ -145,8 +152,8 @@
{
if (e.getCode() == HornetQException.OBJECT_CLOSED || e.getCode() == HornetQException.UNBLOCKED)
{
- ClientSessionFactory sf = locator.createSessionFactory();
- return sf.createSession();
+ ClientSessionFactory sf = locator.createSessionFactory();
+ return sf.createSession();
}
else
{
@@ -174,7 +181,14 @@
for (ClusterConnection clusterConn : clusterManager.getClusterConnections())
{
- nodesCount += clusterConn.getNodes().size();
+ Map<String, String> nodes = clusterConn.getNodes();
+ for (String id : nodes.keySet())
+ {
+ if (clusterConn.isNodeActive(id))
+ {
+ nodesCount++;
+ }
+ }
}
if (nodesCount == count)
@@ -185,85 +199,92 @@
Thread.sleep(10);
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
-
+
log.error(clusterDescription(servers[node]));
throw new IllegalStateException("Timed out waiting for cluster connections
");
}
+
public void testReceiveNotificationsWhenOtherNodesAreStartedAndStopped() throws Throwable
{
startServers(0);
ServerLocator locator = createHAServerLocator();
-
- ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
+ ((ServerLocatorImpl)locator).getTopology().setOwner("testReceive");
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID,
- String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if(!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- System.out.println("Node UP " + nodeID + " added");
- log.info("Node UP " + nodeID + " added");
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ System.out.println("Node UP " + nodeID + "
added");
+ log.info("Node UP " + nodeID + " added");
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
+ else
+ {
+ System.out.println("Node UP " + nodeID + " was already
here");
+ log.info("Node UP " + nodeID + " was already
here");
+ }
}
- else
- {
- System.out.println("Node UP " + nodeID + " was already
here");
- log.info("Node UP " + nodeID + " was already here");
- }
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- log.info("Node down " + nodeID + " accepted");
- System.out.println("Node down " + nodeID + "
accepted");
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ log.info("Node down " + nodeID + " accepted");
+ System.out.println("Node down " + nodeID + "
accepted");
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
+ else
+ {
+ log.info("Node down " + nodeID + " already
removed");
+ System.out.println("Node down " + nodeID + " already
removed");
+ }
}
- else
- {
- log.info("Node down " + nodeID + " already removed");
- System.out.println("Node down " + nodeID + " already
removed");
- }
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- startServers(1, 4, 3, 2);
- String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
+ startServers(1, 4, 3, 2);
+ String[] nodeIDs = getNodeIDs(0, 1, 2, 3, 4);
- assertTrue("Was not notified that all servers are UP", upLatch.await(10,
SECONDS));
- checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP",
upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 4, 3, 2 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
- stopServers(2, 3, 1, 4);
+ stopServers(2, 3, 1, 4);
- assertTrue("Was not notified that all servers are DOWN",
downLatch.await(10, SECONDS));
- checkContains(new int[] { 0 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are DOWN",
downLatch.await(10, SECONDS));
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
- sf.close();
-
- locator.close();
-
- stopServers(0);
+ sf.close();
+ }
+ finally
+ {
+ locator.close();
+
+ stopServers(0);
+ }
+
}
public void testReceiveNotifications() throws Throwable
@@ -273,73 +294,81 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID,
- String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
}
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- assertTrue("Was not notified that all servers are UP", upLatch.await(10,
SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP",
upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- ClientSession session = sf.createSession();
-
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ ClientSession session = sf.createSession();
- stopServers(0);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+ stopServers(0);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
- stopServers(2);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+ stopServers(2);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
- stopServers(4);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+ stopServers(4);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
- stopServers(3);
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1 }, nodeIDs, nodes);
+ stopServers(3);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
- stopServers(1);
-
- assertTrue("Was not notified that all servers are DOWN",
downLatch.await(10, SECONDS));
- checkContains(new int[] {}, nodeIDs, nodes);
+ stopServers(1);
- sf.close();
-
- locator.close();
+ assertTrue("Was not notified that all servers are DOWN",
downLatch.await(10, SECONDS));
+ checkContains(new int[] {}, nodeIDs, nodes);
+
+ sf.close();
+ }
+ finally
+ {
+ locator.close();
+ }
+
}
+
public void testStopNodes() throws Throwable
{
startServers(0, 1, 2, 3, 4);
@@ -347,80 +376,87 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
+ try
+ {
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ }
}
- }
- });
+ });
- ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSessionFactory sf = locator.createSessionFactory();
- assertTrue("Was not notified that all servers are UP", upLatch.await(10,
SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ assertTrue("Was not notified that all servers are UP",
upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
+ ClientSession session = sf.createSession();
- ClientSession session = sf.createSession();
-
- stopServers(0);
- assertFalse(servers[0].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
+ stopServers(0);
+ assertFalse(servers[0].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 2, 3, 4 }, nodeIDs, nodes);
- stopServers(2);
- assertFalse(servers[2].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
+ stopServers(2);
+ assertFalse(servers[2].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3, 4 }, nodeIDs, nodes);
- stopServers(4);
- assertFalse(servers[4].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
+ stopServers(4);
+ assertFalse(servers[4].isStarted());
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1, 3 }, nodeIDs, nodes);
- stopServers(3);
- assertFalse(servers[3].isStarted());
+ stopServers(3);
+ assertFalse(servers[3].isStarted());
- session = checkSessionOrReconnect(session, locator);
- checkContains(new int[] { 1 }, nodeIDs, nodes);
+ session = checkSessionOrReconnect(session, locator);
+ checkContains(new int[] { 1 }, nodeIDs, nodes);
- stopServers(1);
- assertFalse(servers[1].isStarted());
- try
+ stopServers(1);
+ assertFalse(servers[1].isStarted());
+ try
+ {
+ session = checkSessionOrReconnect(session, locator);
+ fail();
+ }
+ catch (Exception e)
+ {
+
+ }
+ }
+ finally
{
- session = checkSessionOrReconnect(session, locator);
- fail();
+ locator.close();
}
- catch (Exception e)
- {
- }
-
- locator.close();
}
-
+
public void testMultipleClientSessionFactories() throws Throwable
{
startServers(0, 1, 2, 3, 4);
@@ -428,67 +464,75 @@
ServerLocator locator = createHAServerLocator();
- final List<String> nodes = new ArrayList<String>();
- final CountDownLatch upLatch = new CountDownLatch(5);
- final CountDownLatch downLatch = new CountDownLatch(4);
+ try
+ {
- locator.addClusterTopologyListener(new ClusterTopologyListener()
- {
- public void nodeUP(final long uniqueEventID, String nodeID,
- Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- boolean last)
+ waitForClusterConnections(0, 4);
+ waitForClusterConnections(1, 4);
+ waitForClusterConnections(2, 4);
+ waitForClusterConnections(3, 4);
+ waitForClusterConnections(4, 4);
+
+ final List<String> nodes = new ArrayList<String>();
+ final CountDownLatch upLatch = new CountDownLatch(5);
+ final CountDownLatch downLatch = new CountDownLatch(4);
+
+ locator.addClusterTopologyListener(new ClusterTopologyListener()
{
- if (!nodes.contains(nodeID))
+ public void nodeUP(final long uniqueEventID,
+ String nodeID,
+ Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ boolean last)
{
- nodes.add(nodeID);
- upLatch.countDown();
+ if (!nodes.contains(nodeID))
+ {
+ nodes.add(nodeID);
+ upLatch.countDown();
+ }
}
- }
- public void nodeDown(final long uniqueEventID, String nodeID)
- {
- if (nodes.contains(nodeID))
+ public void nodeDown(final long uniqueEventID, String nodeID)
{
- nodes.remove(nodeID);
- downLatch.countDown();
+ if (nodes.contains(nodeID))
+ {
+ nodes.remove(nodeID);
+ downLatch.countDown();
+ }
}
- }
- });
+ });
- ClientSessionFactory[] sfs = new ClientSessionFactory[] {
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory(),
- locator.createSessionFactory() };
- assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
- checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
+ ClientSessionFactory[] sfs = new ClientSessionFactory[] { locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory(),
+ locator.createSessionFactory() };
+ assertTrue("Was not notified that all servers are UP", upLatch.await(10, SECONDS));
+ checkContains(new int[] { 0, 1, 2, 3, 4 }, nodeIDs, nodes);
- waitForClusterConnections(0, 4);
- waitForClusterConnections(1, 4);
- waitForClusterConnections(2, 4);
- waitForClusterConnections(3, 4);
- waitForClusterConnections(4, 4);
- //we cant close all of the servers, we need to leave one up to notify us
- stopServers(4, 2, 3, 1);
+ // we cant close all of the servers, we need to leave one up to notify us
+ stopServers(4, 2, 3, 1);
- boolean ok = downLatch.await(10, SECONDS);
- if(!ok)
- {
- System.out.println("TopologyClusterTestBase.testMultipleClientSessionFactories");
+ boolean ok = downLatch.await(10, SECONDS);
+ if (!ok)
+ {
+ log.warn("TopologyClusterTestBase.testMultipleClientSessionFactories will fail");
+ }
+ assertTrue("Was not notified that all servers are Down", ok);
+ checkContains(new int[] { 0 }, nodeIDs, nodes);
+
+ for (int i = 0; i < sfs.length; i++)
+ {
+ ClientSessionFactory sf = sfs[i];
+ sf.close();
+ }
}
- assertTrue("Was not notified that all servers are Down", ok);
- checkContains(new int[] { 0 }, nodeIDs, nodes);
-
- for (int i = 0; i < sfs.length; i++)
+ finally
{
- ClientSessionFactory sf = sfs[i];
- sf.close();
+ locator.close();
+
+ stopServers(0);
}
-
- locator.close();
- stopServers(0);
}
// Private -------------------------------------------------------