Author: ataylor
Date: 2010-10-29 06:13:41 -0400 (Fri, 29 Oct 2010)
New Revision: 9818
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
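Fixed threading issue (the reconnect retry loop now waits on a lock that causeExit() notifies, instead of sleeping) and fixed tests (they now close their ServerLocators).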
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -141,7 +141,13 @@
private volatile boolean stopPingingAfterOne;
private volatile boolean closed;
-
+
+ public final Exception e = new Exception();
+
+ private final Object waitLock = new Object();
+
+ public static List<ClientSessionFactoryImpl> factories = new
ArrayList<ClientSessionFactoryImpl>();
+
// Static
//
---------------------------------------------------------------------------------------
@@ -161,6 +167,9 @@
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors) throws
HornetQException
{
+
+ e.fillInStackTrace();
+
this.serverLocator = serverLocator;
this.connectorConfig = connectorConfig;
@@ -390,6 +399,10 @@
public void causeExit()
{
exitLoop = true;
+ synchronized (waitLock)
+ {
+ waitLock.notify();
+ }
}
public void close()
@@ -867,59 +880,69 @@
long interval = retryInterval;
int count = 0;
-
- while (true)
+ factories.add(this);
+ try
{
- if (exitLoop)
+ synchronized (waitLock)
{
- return;
- }
+ while (true)
+ {
+ if (exitLoop)
+ {
+ return;
+ }
- getConnection();
+ getConnection();
- if (connection == null)
- {
- // Failed to get connection
+ if (connection == null)
+ {
+ // Failed to get connection
- if (reconnectAttempts != 0)
- {
- count++;
+ if (reconnectAttempts != 0)
+ {
+ count++;
- if (reconnectAttempts != -1 && count == reconnectAttempts)
- {
- log.warn("Tried " + reconnectAttempts + " times to
connect. Now giving up.");
+ if (reconnectAttempts != -1 && count ==
reconnectAttempts)
+ {
+ log.warn("Tried " + reconnectAttempts + " times
to connect. Now giving up.");
- return;
- }
+ return;
+ }
- try
- {
- Thread.sleep(interval);
- }
- catch (InterruptedException ignore)
- {
- }
+ try
+ {
+ waitLock.wait(interval);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
- // Exponential back-off
- long newInterval = (long)(interval * retryIntervalMultiplier);
+ // Exponential back-off
+ long newInterval = (long)(interval * retryIntervalMultiplier);
- if (newInterval > maxRetryInterval)
- {
- newInterval = maxRetryInterval;
+ if (newInterval > maxRetryInterval)
+ {
+ newInterval = maxRetryInterval;
+ }
+
+ interval = newInterval;
+ }
+ else
+ {
+ return;
+ }
+ }
+ else
+ {
+ return;
+ }
}
-
- interval = newInterval;
- }
- else
- {
- return;
- }
}
- else
- {
- return;
- }
}
+ finally
+ {
+ factories.remove(this);
+ }
}
private void cancelScheduledTasks()
@@ -1084,6 +1107,21 @@
return connection;
}
+ public void finalize() throws Throwable
+ {
+ if (!closed)
+ {
+ log.warn("I'm closing a core ClientSessionFactory you left open. Please
make sure you close all ClientSessionFactories explicitly " + "before letting
them go out of scope! " +
+ System.identityHashCode(this));
+
+ log.warn("The ClientSessionFactory you didn't close was created
here:", e);
+
+ close();
+ }
+
+ super.finalize();
+ }
+
private ConnectorFactory instantiateConnectorFactory(final String
connectorFactoryClassName)
{
ClassLoader loader = Thread.currentThread().getContextClassLoader();
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -159,6 +159,7 @@
private TransportConfiguration clusterTransportConfiguration;
private boolean backup;
+ private final Exception e = new Exception();
private static synchronized ExecutorService getGlobalThreadPool()
{
@@ -303,6 +304,8 @@
final int discoveryPort,
final TransportConfiguration[] transportConfigs)
{
+ e.fillInStackTrace();
+
this.ha = useHA;
this.discoveryAddress = discoveryAddress;
@@ -1310,11 +1313,25 @@
}
}
+ public void finalize() throws Throwable
+ {
+ if (!closed)
+ {
+ log.warn("I'm closing a core ServerLocator you left open. Please
make sure you close all ServerLocators explicitly " + "before letting them go
out of scope! " +
+ System.identityHashCode(this));
+ log.warn("The ServerLocator you didn't close was created
here:", e);
+
+ close();
+ }
+
+ super.finalize();
+ }
+
class Connector implements Callable<ClientSessionFactory>
{
private TransportConfiguration initialConnector;
- private ClientSessionFactoryInternal factory;
+ private volatile ClientSessionFactoryInternal factory;
private boolean isConnected = false;
private boolean interrupted = false;
private Exception e;
@@ -1327,6 +1344,10 @@
public ClientSessionFactory call() throws HornetQException
{
factory = getFactory();
+ if(factory == null)
+ {
+ return null;
+ }
try
{
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -1338,6 +1359,11 @@
this.e = e;
throw e;
}
+ if(factory != null)
+ {
+ factory.close();
+ factory = null;
+ }
return null;
}
isConnected = true;
@@ -1356,29 +1382,24 @@
return isConnected;
}
- public void disconnect()
+ public synchronized void disconnect()
{
interrupted = true;
- try
+
+ if (factory != null)
{
- ClientSessionFactoryInternal factory = getFactory();
- if (factory != null)
- {
- factory.causeExit();
- }
- else
- {
-
System.out.println("ServerLocatorImpl$StaticConnector$Connector.disconnect");
- }
+ factory.causeExit();
+ factory.close();
+ factory = null;
}
- catch (HornetQException e1)
- {
- log.debug("exception closing factory");
- }
}
private synchronized ClientSessionFactoryInternal getFactory() throws
HornetQException
{
+ if(interrupted)
+ {
+ return null;
+ }
if (factory == null)
{
try
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -214,6 +214,13 @@
bridges.clear();
+ if(backupSessionFactory != null)
+ {
+ backupSessionFactory.close();
+ backupSessionFactory.getServerLocator().close();
+ backupSessionFactory = null;
+ }
+
started = false;
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/DuplicateDetectionTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -115,6 +115,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testSimpleDuplicateDetectionWithString() throws Exception
@@ -183,6 +185,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testCacheSize() throws Exception
@@ -340,6 +344,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection1() throws Exception
@@ -390,6 +396,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection2() throws Exception
@@ -434,6 +442,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testTransactedDuplicateDetection3() throws Exception
@@ -490,6 +500,8 @@
session.close();
sf.close();
+
+ locator.close();
}
/*
@@ -553,6 +565,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection1() throws Exception
@@ -630,6 +644,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection2() throws Exception
@@ -709,6 +725,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection3() throws Exception
@@ -787,6 +805,8 @@
session.close();
sf.close();
+
+ locator.close();
}
public void testXADuplicateDetection4() throws Exception
@@ -867,6 +887,8 @@
session.close();
sf.close();
+
+ locator.close();
}
private ClientMessage createMessage(final ClientSession session, final int i)
@@ -958,6 +980,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1037,6 +1061,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1128,6 +1154,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1229,6 +1257,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1314,6 +1344,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1403,6 +1435,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1494,6 +1528,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1597,6 +1633,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1696,6 +1734,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1797,6 +1837,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
@@ -1897,6 +1939,8 @@
sf.close();
+ locator.close();
+
messagingService2.stop();
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ConsumerCloseTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -48,6 +48,7 @@
private SimpleString queue;
private SimpleString address;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -156,7 +157,7 @@
address = RandomUtil.randomSimpleString();
queue = RandomUtil.randomSimpleString();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
sf = locator.createSessionFactory();
@@ -176,6 +177,8 @@
sf.close();
+ locator.close();
+
server.stop();
session = null;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -119,6 +119,8 @@
session.close();
+ locator.close();
+
server.stop();
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ExpiryAddressTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -38,6 +38,7 @@
private HornetQServer server;
private ClientSession clientSession;
+ private ServerLocator locator;
public void testBasicSend() throws Exception
{
@@ -194,6 +195,8 @@
sendSession.close();
+ locator.close();
+
}
public void testExpireWithDefaultAddressSettings() throws Exception
@@ -310,7 +313,7 @@
// start the server
server.start();
// then we create a client as normal
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnAcknowledge(true);
ClientSessionFactory sessionFactory = locator.createSessionFactory();
@@ -344,6 +347,7 @@
//
}
}
+ locator.close();
server = null;
clientSession = null;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/FailureDeadlockTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -95,6 +95,10 @@
}
+ cf1.close();
+
+ cf2.close();
+
server = null;
jmsServer = null;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/IncompatibleVersionTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -52,6 +52,7 @@
private HornetQServer server;
private CoreRemotingConnection connection;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -66,7 +67,7 @@
server.getConfiguration().setConnectionTTLOverride(500);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory csf = locator.createSessionFactory();
connection = csf.getConnection();
@@ -77,6 +78,8 @@
{
connection.destroy();
+ locator.close();
+
server.stop();
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/MessagePriorityTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -49,6 +49,7 @@
private ClientSession session;
private ClientSessionFactory sf;
+ private ServerLocator locator;
// Static --------------------------------------------------------
@@ -274,7 +275,7 @@
config.setSecurityEnabled(false);
server = HornetQServers.newHornetQServer(config, false);
server.start();
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY));
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
sf = locator.createSessionFactory();
@@ -288,6 +289,8 @@
session.close();
+ locator.close();
+
server.stop();
sf = null;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/NewDeadLetterAddressTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -37,6 +37,7 @@
private HornetQServer server;
private ClientSession clientSession;
+ private ServerLocator locator;
public void testSendToDLAWhenNoRoute() throws Exception
{
@@ -71,7 +72,7 @@
// start the server
server.start();
// then we create a client as normal
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+ locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
ClientSessionFactory sessionFactory = locator.createSessionFactory();
clientSession = sessionFactory.createSession(false, true, false);
}
@@ -90,6 +91,7 @@
//
}
}
+ locator.close();
if (server != null && server.isStarted())
{
try
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -58,6 +58,7 @@
ServerLocator locator =
HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false,
generateParams(0, isNetty())));
ClientSessionFactory csf = locator.createSessionFactory();
csf.close();
+ locator.close();
}
public void testSingleConnectorSingleServerConnect() throws Exception
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -187,6 +187,8 @@
session0.close();
session2.close();
+ locator.close();
+
server0.stop();
server1.stop();
service2.stop();
@@ -315,6 +317,8 @@
session0.close();
session2.close();
+ locator.close();
+
server0.stop();
server1.stop();
service2.stop();
@@ -435,6 +439,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
@@ -572,6 +578,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
@@ -705,6 +713,8 @@
session0.close();
session1.close();
+ locator.close();
+
server0.stop();
server1.stop();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -41,6 +41,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.*;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -97,6 +98,8 @@
nodeManagers[i] = new InVMNodeManager();
}
+ locators = new ServerLocator[ClusterTestBase.MAX_SERVERS];
+
}
@Override
@@ -151,6 +154,8 @@
protected NodeManager[] nodeManagers;
+ protected ServerLocator[] locators;
+
protected ClientSessionFactory[] sfs;
protected ClientConsumer getConsumer(final int node)
@@ -431,6 +436,21 @@
}
}
+ protected void closeAllServerLocatorsFactories() throws Exception
+ {
+ for (int i = 0; i < locators.length; i++)
+ {
+ ServerLocator sf = locators[i];
+
+ if (sf != null)
+ {
+ sf.close();
+
+ locators[i] = null;
+ }
+ }
+ }
+
protected void closeSessionFactory(final int node)
{
ClientSessionFactory sf = sfs[node];
@@ -1134,11 +1154,11 @@
serverTotc = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY,
params);
}
- ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(serverTotc);
+ locators[node] = HornetQClient.createServerLocatorWithoutHA(serverTotc);
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- ClientSessionFactory sf = locator.createSessionFactory();
+ locators[node].setBlockOnNonDurableSend(true);
+ locators[node].setBlockOnDurableSend(true);
+ ClientSessionFactory sf = locators[node].createSessionFactory();
sfs[node] = sf;
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterWithBackupTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -132,6 +132,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4, 5);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -19,8 +19,10 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.group.GroupingHandler;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
@@ -88,6 +90,9 @@
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -193,6 +198,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -252,6 +259,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -314,6 +323,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -374,6 +385,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -436,6 +449,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -493,6 +508,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -573,6 +590,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -675,6 +694,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -783,6 +804,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -886,6 +909,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -950,6 +975,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -1016,10 +1043,28 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if(locators != null)
+ {
+ for (ServerLocator locator : locators)
+ {
+ if(locator != null)
+ {
+ System.out.println("ClusteredGroupingTest.tearDown");
+ }
+ }
+ }
+ super.tearDown(); //To change body of overridden methods use File | Settings |
File Templates.
+ }
+
public boolean isNetty()
{
return true;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -159,6 +159,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -664,6 +664,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
clearServer(0, 1, 2);
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OneWayChainClusterTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -53,6 +53,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
super.tearDown();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -51,6 +51,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -110,6 +110,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
startServers();
@@ -1771,6 +1773,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-27
21:50:48 UTC (rev 9817)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterWithBackupTest.java 2010-10-29
10:13:41 UTC (rev 9818)
@@ -88,6 +88,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers();
startServers();
@@ -131,6 +133,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
catch (Throwable e)
{
@@ -257,6 +261,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
//@Override
@@ -459,6 +465,8 @@
closeAllConsumers();
closeAllSessionFactories();
+
+ closeAllServerLocatorsFactories();
}
@Override
@@ -597,6 +605,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(5, 6, 7, 8, 9, 0, 1, 2, 3, 4);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TemporaryQueueClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -131,6 +131,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -56,6 +56,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
super.tearDown();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/ClusterWithBackupFailoverTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -310,6 +310,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2, 3, 4, 5);
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -122,6 +122,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
@@ -225,6 +227,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1, 2);
}
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/restart/ClusterRestartTest.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -98,6 +98,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
}
@@ -171,6 +173,8 @@
closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+
stopServers(0, 1);
}
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-27 21:50:48 UTC (rev 9817)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-10-29 10:13:41 UTC (rev 9818)
@@ -15,7 +15,9 @@
import java.io.File;
import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
@@ -70,6 +72,33 @@
protected static final String NETTY_CONNECTOR_FACTORY =
NettyConnectorFactory.class.getCanonicalName();
+ private List<ServerLocator> locators = new ArrayList<ServerLocator>();
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ for (ServerLocator locator : locators)
+ {
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ locators.clear();
+ if(!ClientSessionFactoryImpl.factories.isEmpty())
+ {
+ for (ClientSessionFactoryImpl factory : ClientSessionFactoryImpl.factories)
+ {
+ //factory.e.printStackTrace();
+ }
+ }
+ super.tearDown(); //To change body of overridden methods use File | Settings | File Templates.
+ }
+
protected static Map<String, Object> generateParams(final int node, final boolean netty)
{
Map<String, Object> params = new HashMap<String, Object>();
@@ -421,12 +450,16 @@
protected ServerLocator createInVMNonHALocator()
{
- return HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ ServerLocator locatorWithoutHA = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ locators.add(locatorWithoutHA);
+ return locatorWithoutHA;
}
protected ServerLocator createNettyNonHALocator()
{
- return HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+ ServerLocator serverLocatorWithoutHA = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NETTY_CONNECTOR_FACTORY));
+ locators.add(serverLocatorWithoutHA);
+ return serverLocatorWithoutHA;
}
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws Exception