[jboss-remoting-commits] JBoss Remoting SVN: r4755 - remoting3/trunk/standalone/src/main/java/org/jboss/remoting.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Dec 16 18:06:36 EST 2008


Author: david.lloyd at jboss.com
Date: 2008-12-16 18:06:36 -0500 (Tue, 16 Dec 2008)
New Revision: 4755

Modified:
   remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
Log:
Fix executor usage

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java	2008-11-30 04:19:39 UTC (rev 4754)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java	2008-12-16 23:06:36 UTC (rev 4755)
@@ -3,16 +3,17 @@
 import java.io.IOException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
 import org.jboss.remoting.core.EndpointImpl;
 import org.jboss.remoting.spi.RequestHandler;
 import org.jboss.remoting.spi.RequestHandlerSource;
 import org.jboss.remoting.spi.Handle;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.log.Logger;
 
 /**
  * The standalone interface into Remoting.  This class contains static methods that are useful to standalone programs
@@ -20,6 +21,8 @@
  */
 public final class Remoting {
 
+    private static final Logger log = Logger.getLogger("org.jboss.remoting");
+
     /**
      * Create an endpoint.  The endpoint will create its own thread pool with a maximum of 10 threads.
      *
@@ -38,22 +41,42 @@
      * @return the endpoint
      */
     public static Endpoint createEndpoint(final String name, final int maxThreads) {
-        final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, maxThreads, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new AlwaysBlockingQueue<Runnable>(new SynchronousQueue<Runnable>()), new ThreadPoolExecutor.AbortPolicy());
+        final CloseableExecutor executor = createExecutor(maxThreads);
         final EndpointImpl endpoint = new EndpointImpl(executor, name);
         endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
             public void handleClose(final Endpoint closed) {
-                executor.shutdown();
-                try {
-                    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                IoUtils.safeClose(executor);
             }
         });
         return endpoint;
     }
 
+    private static final ThreadFactory OUR_THREAD_FACTORY = new ThreadFactory() {
+        private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+
+        public Thread newThread(final Runnable r) {
+            final Thread thread = defaultThreadFactory.newThread(r);
+            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                public void uncaughtException(final Thread t, final Throwable e) {
+                    log.error(e, "Uncaught exception in thread %s", t);
+                }
+            });
+            return thread;
+        }
+    };
+
     /**
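+     * Create a simple thread pool that is compatible with Remoting.  The thread pool will have a maximum of {@code maxThreads}
+     * threads and a bounded queue of 50 tasks; when both the pool and the queue are full, the task is run in the submitting thread.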
+     *
+     * @param maxThreads the maximum thread count
+     * @return a closeable executor
+     */
+    public static CloseableExecutor createExecutor(final int maxThreads) {
+        return IoUtils.closeableExecutor(new ThreadPoolExecutor(1, maxThreads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50), OUR_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()), 30L, TimeUnit.SECONDS);
+    }
+
+    /**
      * Create an endpoint using the given {@code Executor} to execute tasks.
      *
      * @param executor the executor to use
@@ -105,130 +128,5 @@
         }
     }
 
-    private static class AlwaysBlockingQueue<E> implements BlockingQueue<E> {
-        private final BlockingQueue<E> delegate;
-
-        public AlwaysBlockingQueue(final BlockingQueue<E> delegate) {
-            this.delegate = delegate;
-        }
-
-        public boolean offer(final E o) {
-            try {
-                delegate.put(o);
-                return true;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return false;
-            }
-        }
-
-        public boolean offer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException {
-            return delegate.offer(o, timeout, unit);
-        }
-
-        public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
-            return delegate.poll(timeout, unit);
-        }
-
-        public E take() throws InterruptedException {
-            return delegate.take();
-        }
-
-        public void put(final E o) throws InterruptedException {
-            delegate.put(o);
-        }
-
-        public int remainingCapacity() {
-            return delegate.remainingCapacity();
-        }
-
-        public boolean add(final E o) {
-            return delegate.add(o);
-        }
-
-        public int drainTo(final Collection<? super E> c) {
-            return delegate.drainTo(c);
-        }
-
-        public int drainTo(final Collection<? super E> c, final int maxElements) {
-            return delegate.drainTo(c, maxElements);
-        }
-
-        public E poll() {
-            return delegate.poll();
-        }
-
-        public E remove() {
-            return delegate.remove();
-        }
-
-        public E peek() {
-            return delegate.peek();
-        }
-
-        public E element() {
-            return delegate.element();
-        }
-
-        public int size() {
-            return delegate.size();
-        }
-
-        public boolean isEmpty() {
-            return delegate.isEmpty();
-        }
-
-        public boolean contains(final Object o) {
-            return delegate.contains(o);
-        }
-
-        public Iterator<E> iterator() {
-            return delegate.iterator();
-        }
-
-        public Object[] toArray() {
-            return delegate.toArray();
-        }
-
-        public <T> T[] toArray(final T[] a) {
-            //noinspection SuspiciousToArrayCall
-            return delegate.toArray(a);
-        }
-
-        public boolean remove(final Object o) {
-            return delegate.remove(o);
-        }
-
-        public boolean containsAll(final Collection<?> c) {
-            return delegate.containsAll(c);
-        }
-
-        public boolean addAll(final Collection<? extends E> c) {
-            return delegate.addAll(c);
-        }
-
-        public boolean removeAll(final Collection<?> c) {
-            return delegate.removeAll(c);
-        }
-
-        public boolean retainAll(final Collection<?> c) {
-            return delegate.retainAll(c);
-        }
-
-        public void clear() {
-            delegate.clear();
-        }
-
-        public boolean equals(final Object o) {
-            return delegate.equals(o);
-        }
-
-        public int hashCode() {
-            return delegate.hashCode();
-        }
-    }
-
-    // privates
-
     private Remoting() { /* empty */ }
 }




More information about the jboss-remoting-commits mailing list