Author: david.lloyd(a)jboss.com
Date: 2008-12-16 18:06:36 -0500 (Tue, 16 Dec 2008)
New Revision: 4755
Modified:
remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
Log:
Fix executor usage
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-11-30 04:19:39 UTC (rev 4754)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-12-16 23:06:36 UTC (rev 4755)
@@ -3,16 +3,17 @@
import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
-import java.util.Collection;
-import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executors;
import org.jboss.remoting.core.EndpointImpl;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.RequestHandlerSource;
import org.jboss.remoting.spi.Handle;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.CloseableExecutor;
+import org.jboss.xnio.log.Logger;
/**
* The standalone interface into Remoting. This class contains static methods that are useful to standalone programs
@@ -20,6 +21,8 @@
*/
public final class Remoting {
+ private static final Logger log = Logger.getLogger("org.jboss.remoting");
+
/**
* Create an endpoint. The endpoint will create its own thread pool with a maximum of 10 threads.
*
@@ -38,22 +41,42 @@
* @return the endpoint
*/
public static Endpoint createEndpoint(final String name, final int maxThreads) {
- final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, maxThreads, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new AlwaysBlockingQueue<Runnable>(new SynchronousQueue<Runnable>()), new ThreadPoolExecutor.AbortPolicy());
+ final CloseableExecutor executor = createExecutor(maxThreads);
final EndpointImpl endpoint = new EndpointImpl(executor, name);
endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
public void handleClose(final Endpoint closed) {
- executor.shutdown();
- try {
- executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ IoUtils.safeClose(executor);
}
});
return endpoint;
}
+ private static final ThreadFactory OUR_THREAD_FACTORY = new ThreadFactory() {
+ private final ThreadFactory defaultThreadFactory = Executors.defaultThreadFactory();
+
+ public Thread newThread(final Runnable r) {
+ final Thread thread = defaultThreadFactory.newThread(r);
+ thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(final Thread t, final Throwable e) {
+ log.error(e, "Uncaught exception in thread %s", t);
+ }
+ });
+ return thread;
+ }
+ };
+
/**
+ * Create a simple thread pool that is compatible with Remoting. The thread pool will have a maximum of {@code maxThreads}
+ * threads.
+ *
+ * @param maxThreads the maximum thread count
+ * @return a closeable executor
+ */
+ public static CloseableExecutor createExecutor(final int maxThreads) {
+ return IoUtils.closeableExecutor(new ThreadPoolExecutor(1, maxThreads, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50), OUR_THREAD_FACTORY, new ThreadPoolExecutor.CallerRunsPolicy()), 30L, TimeUnit.SECONDS);
+ }
+
+ /**
* Create an endpoint using the given {@code Executor} to execute tasks.
*
* @param executor the executor to use
@@ -105,130 +128,5 @@
}
}
- private static class AlwaysBlockingQueue<E> implements BlockingQueue<E> {
- private final BlockingQueue<E> delegate;
-
- public AlwaysBlockingQueue(final BlockingQueue<E> delegate) {
- this.delegate = delegate;
- }
-
- public boolean offer(final E o) {
- try {
- delegate.put(o);
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- }
- }
-
- public boolean offer(final E o, final long timeout, final TimeUnit unit) throws InterruptedException {
- return delegate.offer(o, timeout, unit);
- }
-
- public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
- return delegate.poll(timeout, unit);
- }
-
- public E take() throws InterruptedException {
- return delegate.take();
- }
-
- public void put(final E o) throws InterruptedException {
- delegate.put(o);
- }
-
- public int remainingCapacity() {
- return delegate.remainingCapacity();
- }
-
- public boolean add(final E o) {
- return delegate.add(o);
- }
-
- public int drainTo(final Collection<? super E> c) {
- return delegate.drainTo(c);
- }
-
- public int drainTo(final Collection<? super E> c, final int maxElements) {
- return delegate.drainTo(c, maxElements);
- }
-
- public E poll() {
- return delegate.poll();
- }
-
- public E remove() {
- return delegate.remove();
- }
-
- public E peek() {
- return delegate.peek();
- }
-
- public E element() {
- return delegate.element();
- }
-
- public int size() {
- return delegate.size();
- }
-
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- public boolean contains(final Object o) {
- return delegate.contains(o);
- }
-
- public Iterator<E> iterator() {
- return delegate.iterator();
- }
-
- public Object[] toArray() {
- return delegate.toArray();
- }
-
- public <T> T[] toArray(final T[] a) {
- //noinspection SuspiciousToArrayCall
- return delegate.toArray(a);
- }
-
- public boolean remove(final Object o) {
- return delegate.remove(o);
- }
-
- public boolean containsAll(final Collection<?> c) {
- return delegate.containsAll(c);
- }
-
- public boolean addAll(final Collection<? extends E> c) {
- return delegate.addAll(c);
- }
-
- public boolean removeAll(final Collection<?> c) {
- return delegate.removeAll(c);
- }
-
- public boolean retainAll(final Collection<?> c) {
- return delegate.retainAll(c);
- }
-
- public void clear() {
- delegate.clear();
- }
-
- public boolean equals(final Object o) {
- return delegate.equals(o);
- }
-
- public int hashCode() {
- return delegate.hashCode();
- }
- }
-
- // privates
-
private Remoting() { /* empty */ }
}