Author: jmesnil
Date: 2009-12-02 04:45:17 -0500 (Wed, 02 Dec 2009)
New Revision: 8492
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
fixed connection cleanup ordering issue
* on the client side, use an ordered executor to ensure that runnables which
disconnect & destroy the connection are called in the correct order
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-02
09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -177,6 +178,8 @@
private final ScheduledExecutorService scheduledThreadPool;
+ private final Executor closeExecutor;
+
private RemotingConnection connection;
private final long retryInterval;
@@ -271,6 +274,8 @@
this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
+ this.closeExecutor = orderedExecutorFactory.getExecutor();
+
this.interceptors = interceptors;
}
@@ -912,6 +917,7 @@
connector = connectorFactory.createConnector(transportParams,
handler,
this,
+ closeExecutor,
threadPool,
scheduledThreadPool);
@@ -1075,7 +1081,7 @@
if (type == PacketImpl.DISCONNECT)
{
- threadPool.execute(new Runnable()
+ closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for
a long time and fail can
// cause reconnect loop
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -33,6 +33,7 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-02 09:43:08
UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-02 09:45:17
UTC (rev 8492)
@@ -29,6 +29,7 @@
{
Connector createConnector(Map<String, Object> configuration, BufferHandler
handler,
ConnectionLifeCycleListener listener,
+ Executor closeExecutor,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2009-12-02
09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -134,6 +134,8 @@
private final ScheduledExecutorService scheduledThreadPool;
+ private final Executor closeExecutor;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -143,6 +145,7 @@
public NettyConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
@@ -225,6 +228,8 @@
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
configuration);
+ this.closeExecutor = closeExecutor;
+
virtualExecutor = new VirtualExecutorService(threadPool);
this.scheduledThreadPool = scheduledThreadPool;
@@ -607,28 +612,26 @@
if (connections.remove(connectionID) != null)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionDestroyed(connectionID);
}
- }.start();
+ });
}
}
public void connectionException(final Object connectionID, final HornetQException
me)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionException(connectionID, me);
}
- }.start();
+ });
}
}
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -33,10 +33,11 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyConnector(configuration, handler, listener, threadPool,
scheduledThreadPool);
+ return new NettyConnector(configuration, handler, listener, closeExecutor,
threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -41,4 +41,15 @@
{
return false;
}
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopServers();
+ tearDown();
+ setUp();
+ }
+ }
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -34,5 +34,16 @@
{
return false;
}
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopServers();
+ tearDown();
+ setUp();
+ }
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2009-12-02
09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -101,7 +101,7 @@
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -151,7 +151,7 @@
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -204,7 +204,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -258,7 +258,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -311,7 +311,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -360,7 +360,7 @@
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -402,7 +402,7 @@
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new
DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
Modified:
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -52,6 +52,7 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor executor, ScheduledExecutorService
scheduledThreadPool)
{
return new MockConnector(configuration, handler, listener);
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2009-12-02
09:43:08 UTC (rev 8491)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2009-12-02
09:45:17 UTC (rev 8492)
@@ -69,7 +69,7 @@
}
};
- NettyConnector connector = new NettyConnector(params, handler, listener,
Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+ NettyConnector connector = new NettyConnector(params, handler, listener,
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(5));
connector.start();
assertTrue(connector.isStarted());
@@ -103,7 +103,7 @@
try
{
- new NettyConnector(params, null, listener, Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(5));
+ new NettyConnector(params, null, listener, Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
fail("Should throw Exception");
}
@@ -114,7 +114,7 @@
try
{
- new NettyConnector(params, handler, null, Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(5));
+ new NettyConnector(params, handler, null, Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
fail("Should throw Exception");
}