Author: ataylor
Date: 2010-09-16 09:41:15 -0400 (Thu, 16 Sep 2010)
New Revision: 9696
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
Log:
added connector code to connect to static connectors in parallel
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -157,8 +157,6 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
- final int initialConnectAttempts,
- final boolean failoverOnInitialConnection,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors) throws
HornetQException
@@ -197,6 +195,10 @@
this.interceptors = interceptors;
+ }
+
+ public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection)
throws HornetQException
+ {
// Get the connection
getConnectionWithRetry(initialConnectAttempts);
@@ -210,9 +212,9 @@
// Try and connect to the backup
log.warn("Server is not available to make initial connection to. Will
try backup server instead.");
-
+
this.connectorConfig = backupConfig;
-
+
connectorFactory =
instantiateConnectorFactory(connectorConfig.getFactoryClassName());
transportParams = this.connectorConfig.getParams();
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -12,6 +12,7 @@
*/
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.SessionFailureListener;
@@ -36,4 +37,6 @@
int numSessions();
void removeSession(final ClientSessionInternal session, boolean failingOver);
+
+ void connect(int reconnectAttempts, boolean failoverOnInitialConnection) throws
HornetQException;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -18,17 +18,8 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.*;
+import java.util.concurrent.*;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -71,6 +62,8 @@
private TransportConfiguration[] initialConnectors;
+ private StaticConnector staticConnector = new StaticConnector();
+
private Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -162,9 +155,9 @@
private String groupID;
private String nodeID;
-
+
private TransportConfiguration clusterTransportConfiguration;
-
+
private boolean backup;
private static synchronized ExecutorService getGlobalThreadPool()
@@ -184,8 +177,8 @@
if (globalScheduledThreadPool == null)
{
ThreadFactory factory = new
HornetQThreadFactory("HornetQ-client-global-scheduled-threads",
- true,
- getThisClassLoader());
+ true,
+ getThisClassLoader());
globalScheduledThreadPool =
Executors.newScheduledThreadPool(HornetQClient.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
@@ -428,48 +421,22 @@
{
initialise();
}
-
- public void connect()
+
+ public ClientSessionFactory connect() throws Exception
{
+ ClientSessionFactory sf;
// static list of initial connectors
if (initialConnectors != null && discoveryGroup == null)
{
- for (TransportConfiguration connector : initialConnectors)
- {
- ClientSessionFactory sf = null;
- do
- {
- try
- {
- sf = createSessionFactory(connector);
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.NOT_CONNECTED)
- {
- continue;
- }
- }
- catch (Exception e)
- {
- break;
- }
- }
- while (sf == null);
- }
+ sf = staticConnector.connect();
}
// wait for discovery group to get the list of initial connectors
else
{
- try
- {
- ClientSessionFactory sf = createSessionFactory();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ sf = createSessionFactory();
}
+ factories.add(sf);
+ return sf;
}
public ClientSessionFactory createSessionFactory(final TransportConfiguration
transportConfiguration) throws Exception
@@ -488,7 +455,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to
initialise session factory", e);
}
- ClientSessionFactory factory = new ClientSessionFactoryImpl(this,
+ ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this,
transportConfiguration,
callTimeout,
clientFailureCheckPeriod,
@@ -497,14 +464,14 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
- initialConnectAttempts,
- failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
interceptors);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+
factories.add(factory);
-
+
return factory;
}
@@ -537,7 +504,7 @@
}
}
- ClientSessionFactory factory = null;
+ ClientSessionFactoryInternal factory = null;
synchronized (this)
{
@@ -562,11 +529,10 @@
retryIntervalMultiplier,
maxRetryInterval,
reconnectAttempts,
- initialConnectAttempts,
- failoverOnInitialConnection,
threadPool,
scheduledThreadPool,
interceptors);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
@@ -582,7 +548,7 @@
if (topologyArray == null && initialConnectors != null
&& attempts == initialConnectors.length)
{
throw new HornetQException(HornetQException.NOT_CONNECTED,
- "Cannot connect to server(s). Tried with all available
servers.");
+ "Cannot connect to server(s). Tried with all available
servers.");
}
retry = true;
}
@@ -1007,7 +973,7 @@
{
this.nodeID = nodeID;
}
-
+
public String getNodeID()
{
return nodeID;
@@ -1022,12 +988,12 @@
{
return clusterConnection;
}
-
+
public TransportConfiguration getClusterTransportConfiguration()
{
return clusterTransportConfiguration;
}
-
+
public void setClusterTransportConfiguration(TransportConfiguration tc)
{
this.clusterTransportConfiguration = tc;
@@ -1037,17 +1003,12 @@
{
return backup;
}
-
+
public void setBackup(boolean backup)
{
this.backup = backup;
}
- public void announceBackup()
- {
- connect();
- }
-
@Override
protected void finalize() throws Throwable
{
@@ -1074,6 +1035,10 @@
log.error("Failed to stop discovery group", e);
}
}
+ else
+ {
+ staticConnector.disconnect();
+ }
for (ClientSessionFactory factory : factories)
{
@@ -1214,12 +1179,19 @@
{
this.initialConnectors[count++] = entry.getConnector();
}
-
+
if (ha && clusterConnection && !receivedTopology &&
initialConnectors.length > 0)
{
// FIXME the node is alone in the cluster. We create a connection to the new
node
// to trigger the node notification to form the cluster.
- connect();
+ try
+ {
+ connect();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings
| File Templates.
+ }
}
}
@@ -1253,4 +1225,176 @@
return pairs.get(live);
}
+ class StaticConnector
+ {
+ private List<Connector> connectors;
+
+ public ClientSessionFactory connect() throws HornetQException
+ {
+ if (closed)
+ {
+ throw new IllegalStateException("Cannot create session factory, server
locator is closed (maybe it has been garbage collected)");
+ }
+
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to
initialise session factory", e);
+ }
+
+ ClientSessionFactory csf = null;
+
+ createConnectors();
+
+ try
+ {
+ List<Future<ClientSessionFactory>> futures =
threadPool.invokeAll(connectors);
+ for (int i = 0, futuresSize = futures.size(); i < futuresSize; i++)
+ {
+ Future<ClientSessionFactory> future = futures.get(i);
+ try
+ {
+ csf = future.get();
+ if(csf != null)
+ break;
+ }
+ catch (Exception e)
+ {
+ log.debug("unable to connect with static connector " +
connectors.get(i).initialConnector);
+ }
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to
connect to any static connectors", e);
+ }
+
+ if (csf == null)
+ {
+ throw new HornetQException(HornetQException.NOT_CONNECTED, "Failed to
connect to any static connectors");
+ }
+ return csf;
+ }
+
+ private synchronized void createConnectors()
+ {
+ connectors = new ArrayList<Connector>();
+ for (TransportConfiguration initialConnector : initialConnectors)
+ {
+ connectors.add(new Connector(initialConnector));
+ }
+ }
+
+
+ public synchronized void disconnect()
+ {
+ if (connectors != null)
+ {
+ for (Connector connector : connectors)
+ {
+ connector.disconnect();
+ }
+ }
+ }
+
+
+ class Connector implements Callable<ClientSessionFactory>
+ {
+ private TransportConfiguration initialConnector;
+ private ClientSessionFactoryInternal factory;
+ private boolean isConnected = false;
+ private boolean interrupted = false;
+ private Exception e;
+
+ public Connector(TransportConfiguration initialConnector)
+ {
+ this.initialConnector = initialConnector;
+ }
+
+ public ClientSessionFactory call() throws HornetQException
+ {
+ factory = getFactory();
+ try
+ {
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
+ }
+ catch (HornetQException e)
+ {
+ if (!interrupted)
+ {
+ this.e = e;
+ throw e;
+ }
+ return null;
+ }
+ isConnected = true;
+ for (Connector connector : connectors)
+ {
+ if (!connector.isConnected())
+ {
+ connector.disconnect();
+ }
+ }
+ return factory;
+ }
+
+ public boolean isConnected()
+ {
+ return isConnected;
+ }
+
+ public void disconnect()
+ {
+ interrupted = true;
+ try
+ {
+ ClientSessionFactoryInternal factory = getFactory();
+ if (factory != null)
+ {
+ factory.causeExit();
+ }
+ else
+ {
+
System.out.println("ServerLocatorImpl$StaticConnector$Connector.disconnect");
+ }
+ }
+ catch (HornetQException e1)
+ {
+ log.debug("exception closing factory");
+ }
+ }
+
+ private synchronized ClientSessionFactoryInternal getFactory() throws
HornetQException
+ {
+ if (factory == null)
+ {
+ try
+ {
+ initialise();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed
to initialise session factory", e);
+ }
+
+ factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
+ initialConnector,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+ }
+ return factory;
+ }
+ }
+ }
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -13,6 +13,7 @@
package org.hornetq.core.client.impl;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
@@ -37,7 +38,7 @@
String getNodeID();
- void connect();
+ ClientSessionFactory connect() throws Exception;
void notifyNodeUp(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last, int distance);
@@ -55,7 +56,5 @@
void setBackup(boolean backup);
- void announceBackup();
-
void setInitialConnectAttempts(int reconnectAttempts);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -134,7 +134,10 @@
this.serverLocator.setClusterTransportConfiguration(connector);
this.serverLocator.setBackup(server.getConfiguration().isBackup());
this.serverLocator.setReconnectAttempts(-1);
- this.serverLocator.setRetryInterval(retryInterval);
+ if(retryInterval > 0)
+ {
+ this.serverLocator.setRetryInterval(retryInterval);
+ }
// a cluster connection will connect to other nodes only if they are directly
connected
// through a static list of connectors
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -183,7 +183,7 @@
private void deployBackupListener(BackupConnectorConfiguration
connectorConfiguration)
throws Exception
{
- ServerLocator locator;
+ ServerLocatorInternal locator;
if (connectorConfiguration.getDiscoveryGroupName() != null)
{
DiscoveryGroupConfiguration groupConfiguration =
configuration.getDiscoveryGroupConfigurations().get(connectorConfiguration.getDiscoveryGroupName());
@@ -217,7 +217,7 @@
//todo update the topology
}
});
- backupSessionFactory = locator.createSessionFactory();
+ backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new
NodeAnnounceMessage(nodeUUID.toString(), true,
configuration.getConnectorConfigurations().get(connectorConfiguration.getConnector())));
}
@@ -370,7 +370,7 @@
}
// backup node becomes live
- public synchronized void activate()
+ public synchronized void activate()
{
if (backup)
{
@@ -437,10 +437,15 @@
}
}
- if (clusterConnections.size() > 0)
+ for (ClusterTopologyListener listener : clientListeners)
{
- announceNode();
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
}
+
+ for (ClusterTopologyListener listener : clusterConnectionListeners)
+ {
+ listener.nodeUP(nodeID, member.getConnector(), false, member.getDistance());
+ }
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -601,9 +601,9 @@
configuration.setBackup(false);
+ initialisePart2();
+
clusterManager.activate();
-
- initialisePart2();
log.info("Backup Server is now live");
Added:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java
(rev 0)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/client/ServerLocatorConnectTest.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * User: andy
+ * Date: Sep 15, 2010
+ * Time: 2:27:07 PM
+ * * @author <a href="mailto:andy.taylor@jboss.com">Andy
Taylor</a>
+ */
+public class ServerLocatorConnectTest extends ServiceTestBase
+{
+ private HornetQServer server;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ Configuration configuration = createDefaultConfig(isNetty());
+ server = createServer(false, configuration);
+ server.start();
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ server.stop();
+ super.tearDown();
+ }
+
+ public void testSingleConnectorSingleServer() throws Exception
+ {
+
+ ServerLocator locator =
HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false,
generateParams(0, isNetty())));
+ ClientSessionFactory csf = locator.createSessionFactory();
+ csf.close();
+ }
+
+ public void testSingleConnectorSingleServerConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal)
HornetQClient.createServerLocatorWithoutHA(createTransportConfiguration(isNetty(), false,
generateParams(0, isNetty())));
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)
locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+
+ public void testMultipleConnectorSingleServerConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal)
HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(0,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(1,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+ );
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)
locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+
+ public void testMultipleConnectorSingleServerConnectReconnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal)
HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(0,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(1,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4, isNetty()))
+ );
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal csf = (ClientSessionFactoryInternal)
locator.connect();
+ assertNotNull(csf);
+ assertEquals(csf.numConnections(), 1);
+ locator.close();
+ }
+
+ public void testMultipleConnectorSingleServerNoConnect() throws Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal)
HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(1,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+ );
+ ClientSessionFactoryInternal csf = null;
+ try
+ {
+ csf = (ClientSessionFactoryInternal) locator.connect();
+ }
+ catch (Exception e)
+ {
+ assertTrue(e instanceof HornetQException);
+ assertEquals(((HornetQException)e).getCode(), HornetQException.NOT_CONNECTED);
+ }
+ assertNull(csf);
+ locator.close();
+ }
+
+ public void testMultipleConnectorSingleServerNoConnectAttemptReconnect() throws
Exception
+ {
+ ServerLocatorInternal locator = (ServerLocatorInternal)
HornetQClient.createServerLocatorWithoutHA(
+ createTransportConfiguration(isNetty(), false, generateParams(1,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(2,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(3,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(4,
isNetty())),
+ createTransportConfiguration(isNetty(), false, generateParams(5, isNetty()))
+ );
+ locator.setReconnectAttempts(-1);
+ ClientSessionFactoryInternal csf = null;
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ Connector target = new Connector(locator, countDownLatch);
+ Thread t = new Thread(target);
+ t.start();
+ //let them get started
+ Thread.sleep(1500);
+ locator.close();
+ assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
+ assertTrue(target.e instanceof HornetQException);
+ assertEquals(((HornetQException)target.e).getCode(),
HornetQException.NOT_CONNECTED);
+ }
+
+ public boolean isNetty()
+ {
+ return false;
+ }
+
+ static class Connector implements Runnable
+ {
+ private ServerLocatorInternal locator;
+ CountDownLatch latch;
+ Exception e;
+ public Connector(ServerLocatorInternal locator, CountDownLatch latch)
+ {
+ this.locator = locator;
+ this.latch = latch;
+ }
+
+ public void run()
+ {
+ try
+ {
+ locator.connect();
+ }
+ catch (Exception e)
+ {
+ this.e = e;
+ }
+ latch.countDown();
+ }
+ }
+}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -34,14 +34,34 @@
{
protected Map<Integer, TestableServer> servers = new HashMap<Integer,
TestableServer>();
+ @Override
+ protected void tearDown() throws Exception
+ {
+ for (TestableServer testableServer : servers.values())
+ {
+ if(testableServer != null)
+ {
+ try
+ {
+ testableServer.destroy();
+ }
+ catch (Throwable e)
+ {
+ //
+ }
+ }
+ }
+ super.tearDown();
+ }
+
public void testMultipleFailovers2LiveServers() throws Exception
{
createLiveConfig(0, 3);
createBackupConfig(0, 1, true, 0, 3);
- createBackupConfig(0, 2,true, 0, 3);
+ createBackupConfig(0, 2, true, 0, 3);
createLiveConfig(3, 0);
- createBackupConfig(3, 4, true,0, 3);
- createBackupConfig(3, 5, true,0, 3);
+ createBackupConfig(3, 4, true, 0, 3, 1, 4);
+ createBackupConfig(3, 5, true, 0, 3, 1, 4);
servers.get(0).start();
servers.get(3).start();
servers.get(1).start();
@@ -60,7 +80,7 @@
servers.get(0).crash(session);
int liveAfter0 = waitForBackup(10000, servers, 1, 2);
-
+
ServerLocator locator2 = getServerLocator(3);
locator2.setBlockOnNonDurableSend(true);
locator2.setBlockOnDurableSend(true);
@@ -74,23 +94,23 @@
if (liveAfter0 == 2)
{
servers.get(1).stop();
- servers.get(2).stop();
+ servers.get(2).stop();
}
else
{
servers.get(2).stop();
- servers.get(1).stop();
+ servers.get(1).stop();
}
-
+
if (liveAfter3 == 4)
{
servers.get(5).stop();
- servers.get(4).stop();
+ servers.get(4).stop();
}
else
{
servers.get(4).stop();
- servers.get(5).stop();
+ servers.get(5).stop();
}
}
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/RemoteProcessHornetQServer.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -65,7 +65,15 @@
}
return false;
}
-
+
+ public void destroy()
+ {
+ if(serverProcess != null)
+ {
+ serverProcess.destroy();
+ }
+ }
+
public void setInitialised(boolean initialised)
{
this.initialised = initialised;
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/SameProcessHornetQServer.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -56,7 +56,12 @@
{
return server.isInitialised();
}
-
+
+ public void destroy()
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void start() throws Exception
{
server.start();
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-09-16
12:55:50 UTC (rev 9695)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/util/TestableServer.java 2010-09-16
13:41:15 UTC (rev 9696)
@@ -31,4 +31,6 @@
public void crash(ClientSession... sessions) throws Exception;
public boolean isInitialised();
+
+ void destroy();
}