[jboss-cvs] JBoss Messaging SVN: r7178 - in trunk: tests/jms-tests/src/org/jboss/test/messaging/jms and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 3 07:50:33 EDT 2009


Author: jmesnil
Date: 2009-06-03 07:50:33 -0400 (Wed, 03 Jun 2009)
New Revision: 7178

Added:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/VirtualExecutorService.java
Modified:
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
Log:
fixed JBMESSAGING-1618: Connection.close should be synchronous

* pasted class VirtualExecutorService from netty to ensure that netty connections will be closed synchronously

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-06-03 09:00:55 UTC (rev 7177)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2009-06-03 11:50:33 UTC (rev 7178)
@@ -137,8 +137,9 @@
 
    private final String servletPath;
    
-   private final Executor threadPool;
+   private final VirtualExecutorService virtualExecutor;
 
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -229,7 +230,7 @@
                                                                      TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
                                                                      configuration);
       
-      this.threadPool = threadPool;
+      virtualExecutor = new VirtualExecutorService(threadPool); 
    }
 
    public synchronized void start()
@@ -241,17 +242,17 @@
       
       if (useNio)
       {    
-         channelFactory = new NioClientSocketChannelFactory(threadPool, threadPool);
+         channelFactory = new NioClientSocketChannelFactory(virtualExecutor, virtualExecutor);
       }
       else
       {
-         channelFactory = new OioClientSocketChannelFactory(threadPool);
+         channelFactory = new OioClientSocketChannelFactory(virtualExecutor);
       }
       // if we are a servlet wrap the socketChannelFactory
       if (useServlet)
       {
          ClientSocketChannelFactory proxyChannelFactory = channelFactory;
-         channelFactory = new HttpTunnelingClientSocketChannelFactory(proxyChannelFactory, threadPool);
+         channelFactory = new HttpTunnelingClientSocketChannelFactory(proxyChannelFactory, virtualExecutor);
       }
       bootstrap = new ClientBootstrap(channelFactory);
 
@@ -320,6 +321,7 @@
 
       bootstrap = null;
       channelGroup.close().awaitUninterruptibly();
+      channelFactory.releaseExternalResources();
       channelFactory = null;
 
       for (Connection connection : connections.values())

Added: trunk/src/main/org/jboss/messaging/integration/transports/netty/VirtualExecutorService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/VirtualExecutorService.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/VirtualExecutorService.java	2009-06-03 11:50:33 UTC (rev 7178)
@@ -0,0 +1,196 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * by the @author tags. See the COPYRIGHT.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.integration.transports.netty;
+
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.netty.util.internal.MapBackedSet;
+
+/**
+ * A delegating {@link ExecutorService} with its own termination management.
+ * <p>
+ * {@link VirtualExecutorService} is used when you want to inject an
+ * {@link ExecutorService} but you do not want to allow the explicit termination
+ * of threads on shutdown request.  It is particularly useful when the
+ * {@link ExecutorService} to inject is shared by different components and
+ * the life cycle of the components depend on the termination of the injected
+ * {@link ExecutorService}.
+ *
+ * <pre>
+ * ExecutorService globalExecutor = ...;
+ * ExecutorService virtualExecutor = new VirtualExecutorService(globalExecutor);
+ *
+ * ChannelFactory factory =
+ *         new NioServerSocketChannelFactory(virtualExecutor, virtualExecutor);
+ * ...
+ *
+ * // ChannelFactory.releaseExternalResources() shuts down the executor and
+ * // interrupts the I/O threads to terminate all I/O tasks and to release all
+ * // resources acquired by ChannelFactory.
+ * factory.releaseExternalResources();
+ *
+ * // Note that globalExecutor is not shut down because VirtualExecutorService
+ * // implements its own termination management. All threads which were acquired
+ * // by ChannelFactory via VirtualExecutorService are returned to the pool.
+ * assert !globalExecutor.isShutdown();
+ * </pre>
+ *
+ * <h3>The differences from an ordinary {@link ExecutorService}</h3>
+ *
+ * A shutdown request ({@link #shutdown()} or {@link #shutdownNow()}) does not
+ * shut down its parent {@link Executor} but simply sets its internal flag to
+ * reject further execution request.
+ * <p>
+ * {@link #shutdownNow()} interrupts only the thread which is executing the
+ * task executed via {@link VirtualExecutorService}.
+ * <p>
+ * {@link #awaitTermination(long, TimeUnit)} does not wait for real thread
+ * termination but wait until {@link VirtualExecutorService} is shut down and
+ * its active tasks are finished and the threads are returned to the parent
+ * {@link Executor}.
+ *
+ * @author The Netty Project (netty-dev at lists.jboss.org)
+ * @author Trustin Lee (tlee at redhat.com)
+ * @version $Rev: 1315 $, $Date: 2009-06-03 03:48:23 -0400 (Wed, 03 Jun 2009) $
+ */
+public class VirtualExecutorService extends AbstractExecutorService {
+
+    private final Executor e;
+    private final ExecutorService s;
+    final Object startStopLock = new Object();
+    volatile boolean shutdown;
+    Set<Thread> activeThreads = new MapBackedSet<Thread>(new IdentityHashMap<Thread, Boolean>());
+
+    /**
+     * Creates a new instance with the specified parent {@link Executor}.
+     */
+    public VirtualExecutorService(Executor parent) {
+        if (parent == null) {
+            throw new NullPointerException("parent");
+        }
+
+        if (parent instanceof ExecutorService) {
+            e = null;
+            s = (ExecutorService) parent;
+        } else {
+            e = parent;
+            s = null;
+        }
+    }
+
+    public boolean isShutdown() {
+        synchronized (startStopLock) {
+            return shutdown;
+        }
+    }
+
+    public boolean isTerminated() {
+        synchronized (startStopLock) {
+            return shutdown && activeThreads.isEmpty();
+        }
+    }
+
+    public void shutdown() {
+        synchronized (startStopLock) {
+            if (shutdown) {
+                return;
+            }
+            shutdown = true;
+        }
+    }
+
+    public List<Runnable> shutdownNow() {
+        synchronized (startStopLock) {
+            if (!isTerminated()) {
+                shutdown();
+                for (Thread t: activeThreads) {
+                    t.interrupt();
+                }
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+    public boolean awaitTermination(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        synchronized (startStopLock) {
+            while (!isTerminated()) {
+                startStopLock.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+            }
+
+            return isTerminated();
+        }
+    }
+
+    public void execute(Runnable command) {
+        if (command == null) {
+            throw new NullPointerException("command");
+        }
+
+        if (shutdown) {
+            throw new RejectedExecutionException();
+        }
+
+        if (s != null) {
+            s.execute(new ChildExecutorRunnable(command));
+        } else {
+            e.execute(new ChildExecutorRunnable(command));
+        }
+    }
+
+    private class ChildExecutorRunnable implements Runnable {
+
+        private final Runnable runnable;
+
+        ChildExecutorRunnable(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        public void run() {
+            Thread thread = Thread.currentThread();
+            synchronized (startStopLock) {
+                activeThreads.add(thread);
+            }
+            try {
+                runnable.run();
+            } finally {
+                synchronized (startStopLock) {
+                    boolean removed = activeThreads.remove(thread);
+                    assert removed;
+                    if (isTerminated()) {
+                        startStopLock.notifyAll();
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionTest.java	2009-06-03 09:00:55 UTC (rev 7177)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/ConnectionTest.java	2009-06-03 11:50:33 UTC (rev 7178)
@@ -268,28 +268,22 @@
 
    }
    
-   /*
-    * See http://jira.jboss.com/jira/browse/JBMESSAGING-635
-    * 
-    * This needs to be run remotely to see the exception
-    */
-   
-   // This test is currently commented out until we fix netty issue https://jira.jboss.org/jira/browse/JBMESSAGING-1618   
-//   public void testConnectionListenerBug() throws Exception
-//   {
-//      for (int i = 0; i < 10000; i++)
-//      {
-//         //log.info("******************************************** it " + i);
-//         
-//         Connection conn = cf.createConnection();
-//         
-//         MyExceptionListener listener = new MyExceptionListener();
-//         
-//         conn.setExceptionListener(listener);
-//         
-//         conn.close();                 
-//      } 
-//   }
+   // This test is to check netty issue in https://jira.jboss.org/jira/browse/JBMESSAGING-1618   
+   public void testConnectionListenerBug() throws Exception
+   {
+      for (int i = 0; i < 10000; i++)
+      {
+         //log.info("******************************************** it " + i);
+         
+         Connection conn = cf.createConnection();
+         
+         MyExceptionListener listener = new MyExceptionListener();
+         
+         conn.setExceptionListener(listener);
+         
+         conn.close();                 
+      } 
+   }
 
    /**
     * This test is similar to a JORAM Test...




More information about the jboss-cvs-commits mailing list