[hornetq-commits] JBoss hornetq SVN: r8492 - in trunk: src/main/org/hornetq/core/remoting/impl/invm and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 2 04:45:17 EST 2009


Author: jmesnil
Date: 2009-12-02 04:45:17 -0500 (Wed, 02 Dec 2009)
New Revision: 8492

Modified:
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
   trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
   trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
fixed connection cleanup ordering issue

* on the client side, use an ordered executor to ensure that the runnables that
  disconnect and destroy the connection are executed in the correct order

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -20,6 +20,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
@@ -177,6 +178,8 @@
 
    private final ScheduledExecutorService scheduledThreadPool;
 
+   private final Executor closeExecutor;
+   
    private RemotingConnection connection;
 
    private final long retryInterval;
@@ -271,6 +274,8 @@
 
       this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
 
+      this.closeExecutor = orderedExecutorFactory.getExecutor();
+      
       this.interceptors = interceptors;
    }
 
@@ -912,6 +917,7 @@
                connector = connectorFactory.createConnector(transportParams,
                                                             handler,
                                                             this,
+                                                            closeExecutor,
                                                             threadPool,
                                                             scheduledThreadPool);
 
@@ -1075,7 +1081,7 @@
 
          if (type == PacketImpl.DISCONNECT)
          {
-            threadPool.execute(new Runnable()
+            closeExecutor.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
                // cause reconnect loop

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -33,6 +33,7 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
+                                    final Executor closExecutor,
                                     final Executor threadPool, 
                                     final ScheduledExecutorService scheduledThreadPool)
    {      

Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -29,6 +29,7 @@
 {
    Connector createConnector(Map<String, Object> configuration, BufferHandler handler,                           
                              ConnectionLifeCycleListener listener,
+                             Executor closeExecutor,
                              Executor threadPool, 
                              ScheduledExecutorService scheduledThreadPool);
    

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -134,6 +134,8 @@
 
    private final ScheduledExecutorService scheduledThreadPool;
 
+   private final Executor closeExecutor;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -143,6 +145,7 @@
    public NettyConnector(final Map<String, Object> configuration,
                          final BufferHandler handler,
                          final ConnectionLifeCycleListener listener,
+                         final Executor closeExecutor,
                          final Executor threadPool,
                          final ScheduledExecutorService scheduledThreadPool)
    {
@@ -225,6 +228,8 @@
                                                                 TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
                                                                 configuration);
 
+      this.closeExecutor = closeExecutor;
+      
       virtualExecutor = new VirtualExecutorService(threadPool);
 
       this.scheduledThreadPool = scheduledThreadPool;
@@ -607,28 +612,26 @@
          if (connections.remove(connectionID) != null)
          {
             // Execute on different thread to avoid deadlocks
-            new Thread()
+            closeExecutor.execute(new Runnable()
             {
-               @Override
                public void run()
                {
                   listener.connectionDestroyed(connectionID);
                }
-            }.start();
+            });
          }
       }
 
       public void connectionException(final Object connectionID, final HornetQException me)
       {
          // Execute on different thread to avoid deadlocks
-         new Thread()
+         closeExecutor.execute(new Runnable()
          {
-            @Override
             public void run()
             {
                listener.connectionException(connectionID, me);
             }
-         }.start();
+         });
       }
    }
 

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -33,10 +33,11 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
+                                    final Executor closeExecutor,
                                     final Executor threadPool,
                                     final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyConnector(configuration, handler, listener, threadPool, scheduledThreadPool);
+      return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -41,4 +41,15 @@
    {
       return false;
    }
+   
+   public void _test() throws Exception
+   {
+      for (int i = 0; i < 50; i++)
+      {
+         System.out.println("\n\n" + i + "\n\n");
+         testStartStopServers();
+         tearDown();
+         setUp();
+      }  
+   }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -34,5 +34,16 @@
    {
       return false;
    }
+   
+   public void _test() throws Exception
+   {
+      for (int i = 0; i < 50; i++)
+      {
+         System.out.println("\n\n" + i + "\n\n");
+         testStartStopServers();
+         tearDown();
+         setUp();
+      }  
+   }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -101,7 +101,7 @@
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -151,7 +151,7 @@
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -204,7 +204,7 @@
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -258,7 +258,7 @@
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -311,7 +311,7 @@
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -360,7 +360,7 @@
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -402,7 +402,7 @@
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
-      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+      connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
       connector.start();
       Connection conn = connector.createConnection();
       connCreatedLatch.await(5, TimeUnit.SECONDS);

Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -52,6 +52,7 @@
    public Connector createConnector(final Map<String, Object> configuration,
                                     final BufferHandler handler,
                                     final ConnectionLifeCycleListener listener,
+                                    final Executor closeExecutor,
                                     final Executor executor, ScheduledExecutorService scheduledThreadPool)
    {
       return new MockConnector(configuration, handler, listener);

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2009-12-02 09:45:17 UTC (rev 8492)
@@ -69,7 +69,7 @@
          }
       };
       
-      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+      NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
       
       connector.start();
       assertTrue(connector.isStarted());
@@ -103,7 +103,7 @@
 
       try
       {
-         new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+         new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
          
          fail("Should throw Exception");
       }
@@ -114,7 +114,7 @@
       
       try
       {
-         new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+         new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
          
          fail("Should throw Exception");
       }



More information about the hornetq-commits mailing list