Author: ataylor
Date: 2010-11-03 09:02:01 -0400 (Wed, 03 Nov 2010)
New Revision: 9836
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
call listeners before reconnect and test fixes
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03
08:37:11 UTC (rev 9835)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-11-03
13:02:01 UTC (rev 9836)
@@ -165,7 +165,7 @@
final int reconnectAttempts,
final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
- final List<Interceptor> interceptors) throws
HornetQException
+ final List<Interceptor> interceptors)
{
e.fillInStackTrace();
@@ -613,15 +613,19 @@
connection = null;
}
- callFailureListeners(me, true, connection != null);
if (connection == null)
{
sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+ callFailureListeners(me, true, false);
}
}
// This needs to be outside the failover lock to prevent deadlock
+ if(connection != null)
+ {
+ callFailureListeners(me, true, true);
+ }
if (sessionsToClose != null)
{
// If connection is null it means we didn't succeed in failing over or
reconnecting
@@ -1191,18 +1195,24 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
+ if (msg.getNodeID() != null)
+ {
+ System.out.println("received disconnect from node " +
msg.getNodeID());
+ }
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for
a long time and fail can
// cause reconnect loop
public void run()
{
- conn.fail(new
HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
- "The connection was disconnected
because of server shutdown"));
if (msg.getNodeID() != null)
{
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
+
+ conn.fail(new
HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
+ "The connection was disconnected
because of server shutdown"));
+
}
});
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-03
08:37:11 UTC (rev 9835)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2010-11-03
13:02:01 UTC (rev 9836)
@@ -1297,7 +1297,19 @@
connectors = new ArrayList<Connector>();
for (TransportConfiguration initialConnector : initialConnectors)
{
- connectors.add(new Connector(initialConnector));
+ ClientSessionFactoryInternal factory = new
ClientSessionFactoryImpl(ServerLocatorImpl.this,
+ initialConnector,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
+ connectors.add(new Connector(initialConnector, factory));
}
}
@@ -1336,18 +1348,14 @@
private boolean interrupted = false;
private Exception e;
- public Connector(TransportConfiguration initialConnector)
+ public Connector(TransportConfiguration initialConnector,
ClientSessionFactoryInternal factory)
{
this.initialConnector = initialConnector;
+ this.factory = factory;
}
public ClientSessionFactory call() throws HornetQException
{
- factory = getFactory();
- if(factory == null)
- {
- return null;
- }
try
{
factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -1359,11 +1367,11 @@
this.e = e;
throw e;
}
- if(factory != null)
+ /*if(factory != null)
{
factory.close();
factory = null;
- }
+ }*/
return null;
}
isConnected = true;
@@ -1382,7 +1390,7 @@
return isConnected;
}
- public synchronized void disconnect()
+ public void disconnect()
{
interrupted = true;
@@ -1393,39 +1401,6 @@
factory = null;
}
}
-
- private synchronized ClientSessionFactoryInternal getFactory() throws
HornetQException
- {
- if(interrupted)
- {
- return null;
- }
- if (factory == null)
- {
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed
to initialise session factory", e);
- }
-
- factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
- initialConnector,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
- }
- return factory;
- }
}
}
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-03
08:37:11 UTC (rev 9835)
+++
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-11-03
13:02:01 UTC (rev 9836)
@@ -238,7 +238,7 @@
{
return;
}
-
+
if (serverLocator != null)
{
serverLocator.removeClusterTopologyListener(this);
@@ -259,10 +259,10 @@
if (serverLocator != null)
{
+ //serverLocator.removeClusterTopologyListener(this);
serverLocator.close();
}
-
if (managementService != null)
{
TypedProperties props = new TypedProperties();
@@ -331,14 +331,13 @@
//Remove the flow record for that node
- MessageFlowRecord record = records.remove(nodeID);
+ MessageFlowRecord record = records.get(nodeID);
if (record != null)
{
try
{
record.reset();
- //record.close();
}
catch (Exception e)
{
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-03
08:37:11 UTC (rev 9835)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java 2010-11-03
13:02:01 UTC (rev 9836)
@@ -59,7 +59,7 @@
private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
- private static final long WAIT_TIMEOUT = 30000;
+ private static final long WAIT_TIMEOUT = 5000;
abstract protected ServerLocator createHAServerLocator();
@@ -239,8 +239,6 @@
stopServers(2, 3, 1, 4);
- waitForClusterConnections(0, 0);
-
assertTrue("Was not notified that all servers are DOWN",
downLatch.await(10, SECONDS));
checkContains(new int[] { 0 }, nodeIDs, nodes);
Modified:
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-03
08:37:11 UTC (rev 9835)
+++
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2010-11-03
13:02:01 UTC (rev 9836)
@@ -93,7 +93,9 @@
{
for (ClientSessionFactoryImpl factory : ClientSessionFactoryImpl.factories)
{
+ // System.out.println(threadDump("oops"));
//factory.e.printStackTrace();
+ // System.exit(0);
}
}
super.tearDown(); //To change body of overridden methods use File | Settings |
File Templates.