Author: clebert.suconic(a)jboss.com
Date: 2011-09-11 23:28:05 -0400 (Sun, 11 Sep 2011)
New Revision: 11319
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
fixing tests
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -577,6 +577,11 @@
try
{
+ if (producer != null)
+ {
+ producer.close();
+ }
+
csf.cleanup();
}
catch (Throwable dontCare)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -415,10 +415,13 @@
{
public void run()
{
- if (serverLocator != null)
+ synchronized (ClusterConnectionImpl.this)
{
- serverLocator.close();
- serverLocator = null;
+ if (serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
}
}
@@ -707,6 +710,23 @@
final boolean start) throws Exception
{
final ServerLocatorInternal targetLocator = new
ServerLocatorImpl(clusterManagerTopology, false, connector);
+
+ String nodeId;
+
+ synchronized (this)
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ if (serverLocator == null)
+ {
+ return;
+ }
+
+ nodeId = serverLocator.getNodeID();
+ }
targetLocator.setReconnectAttempts(0);
@@ -725,7 +745,7 @@
targetLocator.setAfterConnectionInternalListener(this);
- targetLocator.setNodeID(serverLocator.getNodeID());
+ targetLocator.setNodeID(nodeId);
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -38,7 +38,6 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
-import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
@@ -96,8 +95,6 @@
TransportConstants.DEFAULT_PORT + 8,
TransportConstants.DEFAULT_PORT + 9, };
- private static final long WAIT_TIMEOUT = 10000;
-
protected int getLargeMessageSize()
{
return 500;
@@ -272,42 +269,6 @@
throw new IllegalStateException(msg);
}
- protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
- {
- waitForTopology(server, nodes, WAIT_TIMEOUT);
- }
-
- protected void waitForTopology(final HornetQServer server, final int nodes, final long
timeout) throws Exception
- {
- log.debug("waiting for " + nodes + " on the topology for server =
" + server);
-
- long start = System.currentTimeMillis();
-
- Topology topology = server.getClusterManager().getTopology();
-
- do
- {
- if (nodes == topology.getMembers().size())
- {
- return;
- }
-
- Thread.sleep(10);
- }
- while (System.currentTimeMillis() - start < timeout);
-
- String msg = "Timed out waiting for cluster topology of " + nodes +
- " (received " +
- topology.getMembers().size() +
- ") topology = " +
- topology +
- ")";
-
- log.error(msg);
-
- throw new Exception(msg);
- }
-
protected void waitForBindings(final int node,
final String address,
final int expectedBindingCount,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/jms/cluster/TopicClusterTest.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -83,13 +83,13 @@
// topic1 and 2 should be the same.
// Using a different instance here just to make sure it is implemented
correctly
MessageConsumer cons2 = session2.createDurableSubscriber(topic2,
"sub2");
- Thread.sleep(2000);
+ Thread.sleep(500);
MessageProducer prod1 = session1.createProducer(topic1);
prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0 ; i < 2; i++)
{
prod1.send(session1.createTextMessage("someMessage"));
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/JMSClusteredTestBase.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -118,6 +118,10 @@
jmsServer2.start();
jmsServer2.activated();
waitForServer(jmsServer2.getHornetQServer());
+
+ waitForTopology(jmsServer1.getHornetQServer(), 2);
+
+ waitForTopology(jmsServer2.getHornetQServer(), 2);
cf1 = (ConnectionFactory)
HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new
TransportConfiguration(InVMConnectorFactory.class.getName(),
generateInVMParams(0)));
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12
02:25:45 UTC (rev 11318)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-12
03:28:05 UTC (rev 11319)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -60,6 +61,9 @@
{
// Constants -----------------------------------------------------
+
+ protected static final long WAIT_TIMEOUT = 10000;
+
// Attributes ----------------------------------------------------
@@ -98,6 +102,43 @@
}
}
+ protected void waitForTopology(final HornetQServer server, final int nodes) throws
Exception
+ {
+ waitForTopology(server, nodes, WAIT_TIMEOUT);
+ }
+
+ protected void waitForTopology(final HornetQServer server, final int nodes, final long
timeout) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server =
" + server);
+
+ long start = System.currentTimeMillis();
+
+ Topology topology = server.getClusterManager().getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < timeout);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes +
+ " (received " +
+ topology.getMembers().size() +
+ ") topology = " +
+ topology +
+ ")";
+
+ log.error(msg);
+
+ throw new Exception(msg);
+ }
+
+
protected static Map<String, Object> generateParams(final int node, final
boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();