[jboss-cvs] JBossAS SVN: r99253 - in projects/jboss-threads/trunk/main: src/main/java/org/jboss/threads and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 11 18:14:53 EST 2010
Author: david.lloyd at jboss.com
Date: 2010-01-11 18:14:52 -0500 (Mon, 11 Jan 2010)
New Revision: 99253
Added:
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java
Removed:
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java
Modified:
projects/jboss-threads/trunk/main/
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java
Log:
Revisions for management, proper thread pool types
Property changes on: projects/jboss-threads/trunk/main
___________________________________________________________________
Name: svn:ignore
+ target
Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossExecutors.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -24,6 +24,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
@@ -45,15 +46,8 @@
private JBossExecutors() {}
private static final RuntimePermission MODIFY_THREAD_PERMISSION = new RuntimePermission("modifyThread");
- private static final RuntimePermission CREATE_PRIVILEGED_THREAD_PERMISSION = new RuntimePermission("createPrivilegedThreads");
private static final RuntimePermission COPY_CONTEXT_CLASSLOADER_PERMISSION = new RuntimePermission("copyClassLoader");
- private static final AccessControlContext PRIVILEGED_CONTEXT = AccessController.doPrivileged(new PrivilegedAction<AccessControlContext>() {
- public AccessControlContext run() {
- return AccessController.getContext();
- }
- });
-
private static final DirectExecutorService DIRECT_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(SimpleDirectExecutor.INSTANCE);
private static final DirectExecutorService REJECTING_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(RejectingExecutor.INSTANCE);
private static final DirectExecutorService DISCARDING_EXECUTOR_SERVICE = new DelegatingDirectExecutorService(DiscardingExecutor.INSTANCE);
@@ -174,18 +168,6 @@
}
/**
- * Create an executor which executes tasks at the privilege level of this library.
- * TODO - is this the best approach?
- *
- * @param delegate the executor to delegate to at the privileged level
- * @return the new direct executor
- */
- static DirectExecutor highPrivilegeExecutor(final DirectExecutor delegate) {
- checkAccess(CREATE_PRIVILEGED_THREAD_PERMISSION);
- return privilegedExecutor(delegate, PRIVILEGED_CONTEXT);
- }
-
- /**
* Create a direct executor which runs tasks with the given context class loader.
*
* @param delegate the executor to delegate to
@@ -266,7 +248,7 @@
}
/**
- * Create an executor which runs the given initization task before running its given task.
+ * Create an executor which runs the given initialization task before running its given task.
*
* @param delegate the delegate direct executor
* @param initializer the initialization task
@@ -331,9 +313,35 @@
* @return the executor
*/
public static Executor threadFactoryExecutor(final ThreadFactory factory) {
- return new ThreadFactoryExecutor(factory);
+ return new ThreadFactoryExecutor("unnamed", factory, Integer.MAX_VALUE, false);
}
+ /**
+ * Create an executor that executes each task in a new thread. By default up to the given number of threads may run
+ * concurrently, after which new tasks will be rejected.
+ *
+ * @param factory the thread factory to use
+ * @param maxThreads the maximum number of allowed threads
+ * @return the executor
+ */
+ public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
+ return new ThreadFactoryExecutor("unnamed", factory, maxThreads, false);
+ }
+
+ /**
+ * Create an executor that executes each task in a new thread. By default up to the given number of threads may run
+ * concurrently, after which the caller will block or new tasks will be rejected, according to the setting of the
+ * {@code blocking} parameter.
+ *
+ * @param factory the thread factory to use
+ * @param maxThreads the maximum number of allowed threads
+ * @param blocking {@code true} if the submitter should block when the maximum number of threads has been reached
+ * @return the executor
+ */
+ public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
+ return new ThreadFactoryExecutor("unnamed", factory, maxThreads, blocking);
+ }
+
// ==================================================
// REJECTED EXECUTION HANDLERS
// ==================================================
@@ -639,6 +647,16 @@
return new LoggingUncaughtExceptionHandler(log);
}
+ /**
+ * Get an uncaught exception handler which logs to the given logger.
+ *
+ * @param categoryName the name of the logger category to log to
+ * @return the handler
+ */
+ public static Thread.UncaughtExceptionHandler loggingExceptionHandler(final String categoryName) {
+ return new LoggingUncaughtExceptionHandler(Logger.getLogger(categoryName));
+ }
+
private static final Thread.UncaughtExceptionHandler LOGGING_HANDLER = loggingExceptionHandler(THREAD_ERROR_LOGGER);
/**
@@ -749,6 +767,45 @@
}
/**
+ * Execute a task uninterruptibly. If the executor throws {@link ExecutionInterruptedException}, the submission
+ * is attempted again, repeating until {@code execute} returns normally or throws some other exception. Any
+ * interrupt status observed on the calling thread is cleared while the attempts are made and is restored
+ * before this method returns or throws.
+ *
+ * @param executor the executor to delegate to
+ * @param task the task to execute
+ * @throws RejectedExecutionException if the task was rejected by the executor
+ */
+ public static void executeUninterruptibly(Executor executor, Runnable task) throws RejectedExecutionException {
+ boolean intr = Thread.interrupted(); // clears the interrupt status and remembers whether it was set
+ try {
+ for (;;) {
+ try {
+ executor.execute(task);
+ return;
+ } catch (ExecutionInterruptedException e) {
+ intr |= Thread.interrupted(); // clear the status again so the next attempt can proceed
+ }
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt(); // re-assert the interrupt that was seen on entry or during an attempt
+ }
+ }
+ }
+
+ /**
+ * Get an executor which executes tasks uninterruptibly in the event of blocking.
+ *
+ * @param delegate the delegate executor
+ * @return the uninterruptible executor
+ */
+ public static Executor uninterruptibleExecutor(final Executor delegate) {
+ if (delegate instanceof UninterruptibleExecutor) {
+ return delegate;
+ } else {
+ return new UninterruptibleExecutor(delegate);
+ }
+ }
+
+ /**
* Create a builder for a dependent task.
*
* @param executor the executor to use
Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThread.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -28,7 +28,8 @@
* A JBoss thread. Supports logging and extra operations.
*/
public final class JBossThread extends Thread {
- private static final Logger log = Logger.getLogger(JBossThread.class);
+ private static final Logger log = Logger.getLogger("org.jboss.threads");
+ private static final Logger ihlog = Logger.getLogger("org.jboss.threads.interrupt-handler");
private volatile InterruptHandler interruptHandler;
private ThreadNameInfo threadNameInfo;
@@ -97,7 +98,7 @@
* handler is called from the <em>calling</em> thread, not the thread being interrupted.
*/
public void interrupt() {
- log.tracef("Interrupting thread \"%s\"", this);
+ ihlog.tracef("Interrupting thread \"%s\"", this);
try {
super.interrupt();
} finally {
@@ -106,7 +107,7 @@
try {
interruptHandler.handleInterrupt(this);
} catch (Throwable t) {
- log.errorf(t, "Interrupt handler %s threw an exception", interruptHandler);
+ ihlog.errorf(t, "Interrupt handler %s threw an exception", interruptHandler);
}
}
}
Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,8 +1,8 @@
/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
@@ -28,7 +28,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.atomic.AtomicInteger;
-import java.lang.reflect.Method;
+import org.jboss.threads.management.ThreadPoolExecutorMBean;
/**
*
@@ -62,67 +62,26 @@
public void stop() {
shutdown();
- try {
- awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- // todo log if fails?
- } catch (InterruptedException e) {
- // todo log it?
- Thread.currentThread().interrupt();
- }
}
- public void destroy() {
- // todo is this the right behavior?
- shutdownNow();
- try {
- awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- // todo log if fails?
- } catch (InterruptedException e) {
- // todo log it?
- Thread.currentThread().interrupt();
- }
+ public void execute(final Runnable command) {
+ super.execute(command);
}
- private static final Method GET_ALLOW_CORE_THREAD_TIMEOUT;
- private static final Method SET_ALLOW_CORE_THREAD_TIMEOUT;
-
- static {
- Method method = null;
- try {
- method = ThreadPoolExecutor.class.getMethod("allowsCoreThreadTimeOut");
- } catch (NoSuchMethodException e) {
- }
- GET_ALLOW_CORE_THREAD_TIMEOUT = method;
- try {
- method = ThreadPoolExecutor.class.getMethod("allowCoreThreadTimeOut", boolean.class);
- } catch (NoSuchMethodException e) {
- }
- SET_ALLOW_CORE_THREAD_TIMEOUT = method;
- }
-
public String getName() {
return name;
}
+ public int getLargestThreadCount() {
+ return super.getLargestPoolSize();
+ }
+
public boolean isAllowCoreThreadTimeout() {
- final Method method = GET_ALLOW_CORE_THREAD_TIMEOUT;
- try {
- return method != null ? ((Boolean) method.invoke(this)).booleanValue() : false;
- } catch (Exception e) {
- return false;
- }
+ return allowsCoreThreadTimeOut();
}
public void setAllowCoreThreadTimeout(final boolean allow) {
- final Method method = SET_ALLOW_CORE_THREAD_TIMEOUT;
- try {
- if (method != null) {
- method.invoke(this, Boolean.valueOf(allow));
- return;
- }
- } catch (Exception e) {
- }
- throw new UnsupportedOperationException();
+ // Delegate to ThreadPoolExecutor.allowCoreThreadTimeOut(boolean), the counterpart of the allowsCoreThreadTimeOut()
+ // call used by the getter. Calling setAllowCoreThreadTimeout(allow) here would invoke this same method again and
+ // recurse until StackOverflowError.
+ allowCoreThreadTimeOut(allow);
}
public int getMaxPoolSize() {
@@ -141,7 +100,7 @@
setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
}
- public int getCurrentPoolSize() {
+ public int getCurrentThreadCount() {
return getPoolSize();
}
@@ -157,6 +116,14 @@
super.setRejectedExecutionHandler(new CountingRejectHandler(handler));
}
+ public boolean isBlocking() {
+ return false;
+ }
+
+ public void setBlocking(final boolean blocking) {
+ throw new UnsupportedOperationException();
+ }
+
private final class CountingRejectHandler implements RejectedExecutionHandler {
private final RejectedExecutionHandler delegate;
@@ -170,6 +137,9 @@
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
rejectCount.incrementAndGet();
+ if (isShutdown()) {
+ throw new StoppedExecutorException();
+ }
delegate.rejectedExecution(r, executor);
}
}
Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/OrderedExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -22,10 +22,10 @@
package org.jboss.threads;
+import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
@@ -47,7 +47,7 @@
// @protectedby lock
private boolean running;
// @protectedby lock
- private RejectionPolicy policy;
+ private boolean blocking;
// @protectedby lock
private Executor handoffExecutor;
@@ -58,7 +58,7 @@
* @param parent the parent to delegate tasks to
*/
public OrderedExecutor(final Executor parent) {
- this(parent, RejectionPolicy.BLOCK, null);
+ this(parent, new ArrayDeque<Runnable>());
}
/**
@@ -68,18 +68,28 @@
* @param queue the queue to use to hold tasks
*/
public OrderedExecutor(final Executor parent, final Queue<Runnable> queue) {
- this(parent, queue, RejectionPolicy.BLOCK, null);
+ this(parent, queue, true, null);
}
/**
- * Construct a new instance using an unbounded FIFO queue.
+ * Construct a new instance using a bounded FIFO queue of the given size and a blocking reject policy.
*
+ * @param parent the parent to delegate tasks to
+ * @param queueLength the fixed length of the queue to use to hold tasks
+ */
+ public OrderedExecutor(final Executor parent, final int queueLength) {
+ this(parent, new ArrayQueue<Runnable>(queueLength), true, null);
+ }
+
+ /**
+ * Construct a new instance using a bounded FIFO queue of the given size and a handoff reject policy
+ *
* @param parent the parent executor
- * @param policy the task rejection policy
+ * @param queueLength the fixed length of the queue to use to hold tasks
* @param handoffExecutor the executor to hand tasks to if the queue is full
*/
- public OrderedExecutor(final Executor parent, final RejectionPolicy policy, final Executor handoffExecutor) {
- this(parent, new LinkedList<Runnable>(), policy, handoffExecutor);
+ public OrderedExecutor(final Executor parent, final int queueLength, final Executor handoffExecutor) {
+ this(parent, new ArrayQueue<Runnable>(queueLength), false, handoffExecutor);
}
/**
@@ -87,25 +97,19 @@
*
* @param parent the parent executor
* @param queue the task queue to use
- * @param policy the task rejection policy
+ * @param blocking {@code true} if rejected tasks should block, {@code false} if rejected tasks should be handed off
* @param handoffExecutor the executor to hand tasks to if the queue is full
*/
- public OrderedExecutor(final Executor parent, final Queue<Runnable> queue, final RejectionPolicy policy, final Executor handoffExecutor) {
+ public OrderedExecutor(final Executor parent, final Queue<Runnable> queue, final boolean blocking, final Executor handoffExecutor) {
if (parent == null) {
throw new NullPointerException("parent is null");
}
if (queue == null) {
throw new NullPointerException("queue is null");
}
- if (policy == null) {
- throw new NullPointerException("policy is null");
- }
- if (policy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
- throw new NullPointerException("handoffExecutor is null");
- }
this.queue = queue;
this.parent = parent;
- this.policy = policy;
+ this.blocking = blocking;
this.handoffExecutor = handoffExecutor;
}
@@ -115,26 +119,21 @@
* @param command the task to run.
*/
public void execute(Runnable command) {
- try {
- lock.lockInterruptibly();
+ final Executor executor;
+ OUT: for (;;) {
+ lock.lock();
try {
while (! queue.offer(command)) {
- switch (policy) {
- case ABORT:
- throw new RejectedExecutionException();
- case BLOCK:
+ if (blocking) {
+ try {
removeCondition.await();
- break;
- case DISCARD:
- return;
- case DISCARD_OLDEST:
- if (queue.poll() != null) {
- queue.add(command);
- }
- break;
- case HANDOFF:
- handoffExecutor.execute(command);
- return;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ExecutionInterruptedException();
+ }
+ } else {
+ executor = handoffExecutor;
+ break OUT;
}
}
if (! running) {
@@ -149,12 +148,14 @@
}
}
}
+ return;
} finally {
lock.unlock();
}
- } catch (InterruptedException e) {
- throw new RejectedExecutionException();
}
+ if (executor != null) {
+ executor.execute(command);
+ }
}
private class Runner implements Runnable {
Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java (from rev 92149, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueueExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,681 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.Queue;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import org.jboss.logging.Logger;
+import org.jboss.threads.management.BoundedQueueThreadPoolExecutorMBean;
+
+/**
+ * An executor which uses a regular queue to hold tasks. The executor may be tuned at runtime in many ways.
+ */
+public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BoundedQueueThreadPoolExecutorMBean {
+ private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
+
+ private final String name;
+ private final Lock lock = new ReentrantLock();
+ // signal when a task is written to the queue
+ private final Condition enqueueCondition = lock.newCondition();
+ // signal when the queue is read
+ private final Condition removeCondition = lock.newCondition();
+ // signalled when threads terminate
+ private final Condition threadExitCondition = lock.newCondition();
+ private final ThreadFactory threadFactory;
+
+ // all protected by poolLock...
+ private int corePoolSize;
+ private int maxPoolSize;
+ private int largestPoolSize;
+ private int rejectCount;
+ private boolean allowCoreThreadTimeout;
+ private long keepAliveTime;
+ private TimeUnit keepAliveTimeUnit;
+ private boolean blocking;
+ private Executor handoffExecutor;
+
+ private int threadCount;
+ private Set<Thread> workers = new HashSet<Thread>();
+
+ private boolean stop;
+ private boolean interrupt;
+
+ private Queue<Runnable> queue;
+
+ /**
+ * Create a new instance.
+ *
+ * @param name the name of the executor
+ * @param corePoolSize the number of threads to create before enqueueing tasks
+ * @param maxPoolSize the maximum number of threads to create
+ * @param keepAliveTime the amount of time that an idle thread should remain active
+ * @param keepAliveTimeUnit the unit of time for {@code keepAliveTime}
+ * @param queue the queue to use for tasks
+ * @param threadFactory the thread factory to use for new threads
+ * @param blocking {@code true} if the executor should block when the queue is full and no threads are available, {@code false} to use the handoff executor
+ * @param handoffExecutor the executor which is called when blocking is disabled and a task cannot be accepted, or {@code null} to reject the task
+ */
+ public QueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final Queue<Runnable> queue, final ThreadFactory threadFactory, final boolean blocking, final Executor handoffExecutor) {
+ this.name = name;
+ if (threadFactory == null) {
+ throw new NullPointerException("threadFactory is null");
+ }
+ if (queue == null) {
+ throw new NullPointerException("queue is null");
+ }
+ if (keepAliveTimeUnit == null) {
+ throw new NullPointerException("keepAliveTimeUnit is null");
+ }
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.threadFactory = threadFactory;
+ // configurable...
+ this.keepAliveTime = keepAliveTime;
+ this.keepAliveTimeUnit = keepAliveTimeUnit;
+ this.corePoolSize = corePoolSize;
+ this.maxPoolSize = maxPoolSize;
+ this.queue = queue;
+ this.blocking = blocking;
+ this.handoffExecutor = handoffExecutor;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Create a new instance which holds pending tasks in a newly created {@code ArrayQueue} of the given fixed length.
+ * All other settings are handled exactly as by the constructor which accepts a caller-supplied queue.
+ *
+ * @param name the name of the executor
+ * @param corePoolSize the number of threads to create before enqueueing tasks
+ * @param maxPoolSize the maximum number of threads to create
+ * @param keepAliveTime the amount of time that an idle thread should remain active
+ * @param keepAliveTimeUnit the unit of time for {@code keepAliveTime}
+ * @param queueLength the fixed queue length to use for tasks
+ * @param threadFactory the thread factory to use for new threads
+ * @param blocking {@code true} if the executor should block when the queue is full and no threads are available, {@code false} to use the handoff executor
+ * @param handoffExecutor the executor which is called when blocking is disabled and a task cannot be accepted, or {@code null} to reject the task
+ * @throws NullPointerException if {@code threadFactory} or {@code keepAliveTimeUnit} is {@code null}
+ */
+ public QueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final int queueLength, final ThreadFactory threadFactory, final boolean blocking, final Executor handoffExecutor) {
+ // Build the bounded queue first and reuse the queue-based constructor for validation and field setup.
+ // The previous body null-checked the "queue" field before it had been assigned, so it threw
+ // NullPointerException("queue is null") on every call and this constructor could never succeed.
+ this(name, corePoolSize, maxPoolSize, keepAliveTime, keepAliveTimeUnit, new ArrayQueue<Runnable>(queueLength), threadFactory, blocking, handoffExecutor);
+ }
+
+ /**
+ * Execute a task. With the lock held, the following is attempted in order: start a new thread for the task if
+ * the thread count is below the core pool size; otherwise offer the task to the queue; otherwise start a new
+ * thread if the thread count is below the maximum pool size. If none of these succeeds and blocking is enabled,
+ * the caller waits on the remove condition and then retries from the top (including the stop check). If blocking
+ * is disabled, the reject count is incremented and the task is passed to the handoff executor after the lock
+ * has been released.
+ *
+ * @param task the task to execute
+ * @throws RejectedExecutionException when a task is rejected by the handoff executor
+ * @throws StoppedExecutorException when the executor is terminating
+ * @throws ExecutionInterruptedException when blocking is enabled and the current thread is interrupted before a task could be accepted
+ */
+ public void execute(final Runnable task) throws RejectedExecutionException {
+ final Executor executor;
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor is stopped");
+ }
+ // Try core thread first, then queue, then extra thread
+ final int count = threadCount;
+ if (count < corePoolSize) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ // next queue...
+ final Queue<Runnable> queue = this.queue;
+ if (queue.offer(task)) {
+ enqueueCondition.signal(); // a task was written to the queue
+ return;
+ }
+ // extra threads?
+ if (count < maxPoolSize) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ if (blocking) {
+ try {
+ removeCondition.await(); // wait for a wake-up, then loop to re-check stop, threads and queue
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt status visible to the caller
+ throw new ExecutionInterruptedException("Thread interrupted");
+ }
+ } else {
+ // delegate the task outside of the lock.
+ rejectCount++;
+ executor = handoffExecutor;
+ break;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (executor != null) { // NOTE(review): with a null handoff executor the task is dropped here without any exception; confirm this is intended
+ executor.execute(task);
+ }
+ return;
+ }
+
+ /** {@inheritDoc} */
+ public void shutdown() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ if (! stop) {
+ stop = true;
+ // wake up the whole town
+ removeCondition.signalAll();
+ enqueueCondition.signalAll();
+ for (Thread worker : workers) {
+ worker.interrupt();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public List<Runnable> shutdownNow() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ stop = true;
+ removeCondition.signalAll();
+ enqueueCondition.signalAll();
+ for (Thread worker : workers) {
+ worker.interrupt();
+ }
+ final Queue<Runnable> queue = this.queue;
+ final ArrayList<Runnable> list = new ArrayList<Runnable>(queue);
+ queue.clear();
+ return list;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public boolean isShutdown() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return stop;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public boolean isTerminated() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return stop && threadCount == 0;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Wait until this executor has no remaining threads, or until the timeout elapses, whichever comes first.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the unit of {@code timeout}
+ * @return {@code true} if the thread count reached zero, {@code false} if the timeout elapsed first
+ * @throws InterruptedException if the calling thread is interrupted while acquiring the lock or while waiting
+ * @throws IllegalStateException if the calling thread is a member of this executor's worker set
+ */
+ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+ final Lock lock = this.lock;
+ lock.lockInterruptibly();
+ try {
+ if (workers.contains(Thread.currentThread())) {
+ throw new IllegalStateException("Cannot await termination of a thread pool from one of its threads");
+ }
+ // Track the remaining time in nanoseconds. awaitNanos() returns the time still left, which avoids the
+ // wall clock and the truncation caused by converting elapsed milliseconds back into a coarse unit.
+ long remaining = unit.toNanos(timeout);
+ // Wait for as long as threads are still alive. The previous condition (! stop && threadCount > 0)
+ // ended the wait as soon as stop was set, so this method reported termination right after shutdown()
+ // even though threads were still running.
+ while (threadCount > 0) {
+ if (remaining <= 0L) {
+ return false;
+ }
+ remaining = threadExitCondition.awaitNanos(remaining);
+ }
+ return true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public boolean isAllowCoreThreadTimeout() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return allowCoreThreadTimeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.allowCoreThreadTimeout = allowCoreThreadTimeout;
+ if (allowCoreThreadTimeout) {
+ // wake everyone up so core threads can time out
+ enqueueCondition.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public int getCorePoolSize() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return corePoolSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void setCorePoolSize(final int corePoolSize) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ final int oldLimit = this.corePoolSize;
+ if (maxPoolSize < corePoolSize) {
+ // don't let the max thread limit be less than the core thread limit.
+ // the called method will signal as needed
+ setMaxPoolSize(corePoolSize);
+ } else if (oldLimit < corePoolSize) {
+ // we're growing the number of core threads
+ // therefore signal anyone waiting to add tasks; there might be more threads to add
+ removeCondition.signalAll();
+ } else if (oldLimit > corePoolSize) {
+ // we're shrinking the number of core threads
+ // therefore signal anyone waiting to remove tasks so the pool can shrink properly
+ enqueueCondition.signalAll();
+ } else {
+ // we aren't changing anything...
+ return;
+ }
+ this.corePoolSize = corePoolSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public int getMaxPoolSize() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return maxPoolSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void setMaxPoolSize(final int maxPoolSize) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ final int oldLimit = this.maxPoolSize;
+ if (maxPoolSize < corePoolSize) {
+ // don't let the max thread limit be less than the core thread limit.
+ // the called method will signal as needed
+ setCorePoolSize(maxPoolSize);
+ } else if (oldLimit < maxPoolSize) {
+ // we're growing the number of extra threads
+ // therefore signal anyone waiting to add tasks; there might be more threads to add
+ removeCondition.signalAll();
+ } else if (oldLimit > maxPoolSize) {
+ // we're shrinking the number of extra threads
+ // therefore signal anyone waiting to remove tasks so the pool can shrink properly
+ enqueueCondition.signalAll();
+ } else {
+ // we aren't changing anything...
+ return;
+ }
+ this.maxPoolSize = maxPoolSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public long getKeepAliveTime() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return keepAliveTime;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Set the keep-alive time to the given amount of time. The value is converted to milliseconds before it is
+ * stored, and the stored unit is set to milliseconds so that the two fields always describe the same duration.
+ * Durations shorter than one millisecond are truncated to zero by the conversion.
+ *
+ * @param keepAliveTime the amount of time
+ * @param keepAliveTimeUnit the unit of time
+ * @throws NullPointerException if {@code keepAliveTimeUnit} is {@code null}
+ * @throws IllegalArgumentException if {@code keepAliveTime} is less than zero
+ */
+ public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveTimeUnit) {
+ if (keepAliveTimeUnit == null) {
+ throw new NullPointerException("keepAliveTimeUnit is null");
+ }
+ if (keepAliveTime < 0L) {
+ throw new IllegalArgumentException("keepAliveTime is less than zero");
+ }
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ // The previous expression, keepAliveTimeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS), treated the
+ // argument as milliseconds and converted it INTO the given unit, so e.g. 5 SECONDS was stored as 0.
+ // It also left the keepAliveTimeUnit field at whatever the constructor had set.
+ this.keepAliveTime = keepAliveTimeUnit.toMillis(keepAliveTime);
+ this.keepAliveTimeUnit = TimeUnit.MILLISECONDS;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ public void setKeepAliveTime(final long milliseconds) {
+ setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
+ }
+
+    /**
+     * Report whether submissions block when a task cannot be accepted immediately.
+     *
+     * @return {@code true} if blocking is enabled, {@code false} if the handoff executor is used
+     */
+    public boolean isBlocking() {
+        final Lock guard = this.lock;
+        final boolean current;
+        guard.lock();
+        try {
+            current = blocking;
+        } finally {
+            guard.unlock();
+        }
+        return current;
+    }
+
+    /**
+     * Choose whether submissions block when a task cannot be accepted immediately.
+     *
+     * @param blocking {@code true} if blocking is enabled, {@code false} if the handoff executor is used
+     */
+    public void setBlocking(boolean blocking) {
+        final Lock guard = this.lock;
+        guard.lock();
+        try {
+            // the flag is only ever written under the pool lock
+            this.blocking = blocking;
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    /**
+     * Get the executor to which a task is handed off when it cannot be accepted immediately.
+     *
+     * @return the handoff executor
+     */
+    public Executor getHandoffExecutor() {
+        final Lock guard = this.lock;
+        final Executor current;
+        guard.lock();
+        try {
+            current = handoffExecutor;
+        } finally {
+            guard.unlock();
+        }
+        return current;
+    }
+
+    /**
+     * Replace the executor to which a task is handed off when it cannot be accepted immediately.
+     *
+     * @param handoffExecutor the handoff executor
+     */
+    public void setHandoffExecutor(final Executor handoffExecutor) {
+        final Lock guard = this.lock;
+        guard.lock();
+        try {
+            // stored as given; no validation is performed here
+            this.handoffExecutor = handoffExecutor;
+        } finally {
+            guard.unlock();
+        }
+    }
+
+    /**
+     * Create, register and start a new worker thread whose first job is {@code task}.
+     * Call with lock held!
+     * <p>
+     * The thread is only left in {@code workers} if it was actually started.  A factory that declines to create a
+     * thread (returns {@code null}) is reported with a {@link ThreadCreationException} instead of leaving a
+     * {@code null} entry in the worker set and failing with a {@code NullPointerException} on {@code start()}.
+     *
+     * @param task the first task for the new worker to run
+     * @throws ThreadCreationException if the thread factory returned {@code null}
+     */
+    private void startNewThread(final Runnable task) {
+        final Thread thread = threadFactory.newThread(new Worker(task));
+        if (thread == null) {
+            // ThreadFactory.newThread is allowed to return null when the request is rejected
+            throw new ThreadCreationException();
+        }
+        workers.add(thread);
+        boolean started = false;
+        try {
+            thread.start();
+            started = true;
+        } finally {
+            if (! started) {
+                // start() failed (e.g. out of native threads): do not keep a dead thread registered
+                workers.remove(thread);
+            }
+        }
+        final int size = workers.size();
+        if (size > largestPoolSize) {
+            largestPoolSize = size;
+        }
+    }
+
+    // call with lock held!
+    // Non-blocking fetch of the next queued task.  A null result tells the calling worker to exit, which is
+    // why the thread count is decremented here before null is returned.
+    private Runnable pollTask() {
+        final Runnable next = queue.poll();
+        if (next == null) {
+            threadCount--;
+            if (threadCount == 0) {
+                // last thread is leaving; wake anyone waiting on thread exit
+                threadExitCondition.signalAll();
+            }
+            return null;
+        }
+        // a queue slot was freed
+        removeCondition.signal();
+        return next;
+    }
+
+    /**
+     * Fetch the next task for a worker, waiting on {@code enqueueCondition} if the queue is empty.
+     * Call with lock held!
+     * <p>
+     * While the pool is at or below its core size and core threads may not time out, the wait is unbounded.
+     * Otherwise the wait is bounded by the keep-alive time, measured from entry to this method.  A {@code null}
+     * result means the calling worker must exit; in that case {@code pollTask()} has already decremented the
+     * thread count.  Interruption while waiting is remembered and restored on the thread before returning.
+     *
+     * @return the next task, or {@code null} if the calling worker should terminate
+     */
+    private Runnable takeTask() {
+        final Condition removeCondition = this.removeCondition;
+        Runnable task = queue.poll();
+        if (task != null) {
+            removeCondition.signal();
+            return task;
+        }
+        final Condition enqueueCondition = this.enqueueCondition;
+        final long start = System.currentTimeMillis();
+        // clear any pending interrupt so that await() does not fail immediately; it is restored in the finally
+        boolean intr = Thread.interrupted();
+        try {
+            long elapsed = 0L;
+            for (;;) {
+                // these parameters may change on each iteration
+                final int threadCount = this.threadCount;
+                final int coreThreadLimit = corePoolSize;
+                final boolean allowCoreThreadTimeout = this.allowCoreThreadTimeout;
+                if (stop || threadCount > maxPoolSize) {
+                    // too many threads.  Handle a task if there is one, otherwise exit
+                    return pollTask();
+                } else if (!allowCoreThreadTimeout && threadCount <= coreThreadLimit) {
+                    // The pool is no larger than its core size, so this thread must not time out.
+                    // The comparison has to be "<=": with "<", a pool sitting exactly at its core size fell into
+                    // the timed branch below, where an expired timeout could neither exit nor block, and the
+                    // worker spun on await() with a non-positive timeout.
+                    try {
+                        enqueueCondition.await();
+                    } catch (InterruptedException e) {
+                        intr = true;
+                    }
+                } else {
+                    // reaching here implies allowCoreThreadTimeout || threadCount > coreThreadLimit,
+                    // i.e. this thread is allowed to time out
+                    final TimeUnit timeUnit = keepAliveTimeUnit;
+                    final long time = keepAliveTime;
+                    final long remaining = time - timeUnit.convert(elapsed, TimeUnit.MILLISECONDS);
+                    if (remaining <= 0L) {
+                        // the timeout has expired
+                        return pollTask();
+                    }
+                    try {
+                        enqueueCondition.await(remaining, timeUnit);
+                    } catch (InterruptedException e) {
+                        intr = true;
+                    }
+                }
+                task = queue.poll();
+                if (task != null) {
+                    removeCondition.signal();
+                    return task;
+                }
+                elapsed = System.currentTimeMillis() - start;
+            }
+        } finally {
+            if (intr) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    public String getName() {
+        // read without taking the pool lock
+        return this.name;
+    }
+
+    /** {@inheritDoc} */
+    public int getCurrentThreadCount() {
+        final Lock guard = this.lock;
+        final int current;
+        guard.lock();
+        try {
+            // reported from the registered worker set, not from the threadCount field
+            current = workers.size();
+        } finally {
+            guard.unlock();
+        }
+        return current;
+    }
+
+    /** {@inheritDoc} */
+    public int getLargestThreadCount() {
+        final Lock guard = this.lock;
+        final int peak;
+        guard.lock();
+        try {
+            peak = largestPoolSize;
+        } finally {
+            guard.unlock();
+        }
+        return peak;
+    }
+
+    /** {@inheritDoc} */
+    public int getRejectedCount() {
+        final Lock guard = this.lock;
+        final int rejected;
+        guard.lock();
+        try {
+            rejected = rejectCount;
+        } finally {
+            guard.unlock();
+        }
+        return rejected;
+    }
+
+    // Run one task on the calling thread.  A null task is ignored, and anything the task throws is logged
+    // rather than propagated.
+    private void runTask(Runnable task) {
+        if (task == null) {
+            return;
+        }
+        try {
+            task.run();
+        } catch (Throwable t) {
+            log.errorf(t, "Task execution failed for task %s", task);
+        }
+    }
+
+    /**
+     * The body of a pool thread: runs the task it was created with, then keeps fetching tasks until told to exit.
+     */
+    private class Worker implements Runnable {
+
+        // initial task; cleared once picked up so the worker does not pin it
+        private volatile Runnable first;
+
+        public Worker(final Runnable command) {
+            first = command;
+        }
+
+        public void run() {
+            final Lock lock = QueueExecutor.this.lock;
+            try {
+                Runnable current = first;
+                // Release reference to task
+                first = null;
+                runTask(current);
+                for (;;) {
+                    // don't hang on to the finished task while we possibly block waiting for the next one
+                    current = null;
+                    lock.lock();
+                    try {
+                        if (stop) {
+                            // stopping: drain the queue without blocking
+                            current = pollTask();
+                            if (current == null) {
+                                return;
+                            }
+                            Thread.currentThread().interrupt();
+                        } else {
+                            // running: take the next task, waiting if necessary
+                            current = takeTask();
+                            if (current == null) {
+                                return;
+                            }
+                        }
+                    } finally {
+                        lock.unlock();
+                    }
+                    runTask(current);
+                    // clear any interrupt status left over from the task
+                    Thread.interrupted();
+                }
+            } finally {
+                // always deregister this thread, however the loop ended
+                lock.lock();
+                try {
+                    workers.remove(Thread.currentThread());
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }
+    }
+}
Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java (from rev 91991, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/QueuelessExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,516 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.Set;
+import java.util.HashSet;
+import org.jboss.logging.Logger;
+import org.jboss.threads.management.BoundedThreadPoolExecutorMBean;
+
+/**
+ * A queueless thread pool. If one or more threads are waiting for work when a task is submitted, it will be used.
+ * Otherwise, if fewer than the maximum threads are started, a new thread is created.
+ */
+public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BoundedThreadPoolExecutorMBean {
+ private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
+
+ private final String name;
+ private final ThreadFactory threadFactory;
+ private final DirectExecutor taskExecutor;
+
+ private final Lock lock = new ReentrantLock(false);
+ private final Condition runnableDequeued = lock.newCondition();
+ private final Condition nextReady = lock.newCondition();
+ private final Condition workerDequeued = lock.newCondition();
+ private final Condition taskEnqueued = lock.newCondition();
+ private final Condition threadDeath = lock.newCondition();
+
+ /**
+ * Protected by {@link #lock}
+ */
+ private final Set<Thread> runningThreads = new HashSet<Thread>(256);
+
+ /**
+ * Protected by {@link #lock}, signal {@link #runnableDequeued} on clear
+ */
+ private Runnable workRunnable;
+
+ /**
+ * Protected by {@link #lock}, signal {@link #workerDequeued} on clear
+ */
+ private Worker waitingWorker;
+
+ /**
+ * Configuration value.
+ * Protected by {@link #lock}
+ */
+ private long idleTimeout;
+
+ /**
+ * Configuration value.
+ * Protected by {@link #lock}
+ */
+ private int maxThreads;
+
+ /**
+ * Specify whether this executor blocks when no threads are available.
+ * Protected by {@link #lock}
+ */
+ private boolean blocking;
+
+ private Executor handoffExecutor;
+
+ private boolean stop;
+
+ //-- statistics --
+ private int largestPoolSize;
+ private int rejectedCount;
+
+ public QueuelessExecutor(final String name, final ThreadFactory threadFactory, final DirectExecutor taskExecutor, final Executor handoffExecutor, final long idleTimeout) {
+ this.name = name;
+ this.threadFactory = threadFactory;
+ this.taskExecutor = taskExecutor;
+ this.handoffExecutor = handoffExecutor;
+ this.idleTimeout = idleTimeout;
+ }
+
+ public long getIdleTimeout() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return idleTimeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setIdleTimeout(final long idleTimeout) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.idleTimeout = idleTimeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getMaxThreads() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return maxThreads;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setMaxThreads(final int maxThreads) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.maxThreads = maxThreads;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getRunningThreads() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return runningThreads.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public int getCorePoolSize() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return maxThreads;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setCorePoolSize(final int newSize) {
+ if (newSize < 1) {
+ throw new IllegalArgumentException("Pool size must be at least 1");
+ }
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ maxThreads = newSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public long getKeepAliveTime() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return idleTimeout;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setKeepAliveTime(final long milliseconds) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ idleTimeout = milliseconds;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getCurrentThreadCount() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return runningThreads.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getLargestThreadCount() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return largestPoolSize;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int getRejectedCount() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return rejectedCount;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isBlocking() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return blocking;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setBlocking(final boolean blocking) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.blocking = blocking;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public Executor getHandoffExecutor() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return handoffExecutor;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void setHandoffExecutor(final Executor handoffExecutor) {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ this.handoffExecutor = handoffExecutor;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void shutdown() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ if (! stop) {
+ for (Thread runningThread : runningThreads) {
+ runningThread.interrupt();
+ }
+ }
+ stop = true;
+ // wake up all waiters
+ runnableDequeued.signalAll();
+ nextReady.signalAll();
+ workerDequeued.signalAll();
+ taskEnqueued.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ if (! stop) {
+ throw new IllegalStateException("Not shut down");
+ }
+ final Date deadline = new Date(clipHigh(unit.toMillis(timeout) + System.currentTimeMillis()));
+ final Condition threadDeath = this.threadDeath;
+ while (! runningThreads.isEmpty() && threadDeath.awaitUntil(deadline));
+ return runningThreads.isEmpty();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public List<Runnable> shutdownNow() {
+ shutdown();
+ // tasks are never queued
+ return Collections.emptyList();
+ }
+
+ public boolean isShutdown() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return stop;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isTerminated() {
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ return stop && runningThreads.isEmpty();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void execute(final Runnable command) {
+ final Executor executor;
+ final Set<Thread> runningThreads = this.runningThreads;
+ final Condition runnableDequeued = this.runnableDequeued;
+ final Lock lock = this.lock;
+ Runnable workRunnable;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor has been shut down");
+ }
+ final Worker waitingWorker;
+ if ((waitingWorker = this.waitingWorker) != null) {
+ // a worker thread was waiting for a task; give it the task and wake it up
+ waitingWorker.setRunnable(command);
+ taskEnqueued.signal();
+ this.waitingWorker = null;
+ return;
+ }
+ // no worker thread was waiting
+ final int currentSize = runningThreads.size();
+ if (currentSize < maxThreads) {
+ // if we haven't reached the thread limit yet, start up another thread
+ final Thread thread = threadFactory.newThread(new Worker(command));
+ if (thread == null) {
+ throw new ThreadCreationException();
+ }
+ if (! runningThreads.add(thread)) {
+ throw new ThreadCreationException("Unable to add new thread to the running set");
+ }
+ if (currentSize >= largestPoolSize) {
+ largestPoolSize = currentSize + 1;
+ }
+ thread.start();
+ return;
+ }
+ if (! blocking) {
+ // not blocking, not accepted
+ executor = handoffExecutor;
+ rejectedCount++;
+ // fall out to execute outside of lock
+ break;
+ }
+ workRunnable = this.workRunnable;
+ if (workRunnable != null) {
+ // someone else is waiting for a worker, so we wait for them
+ try {
+ nextReady.await();
+ continue;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ExecutionInterruptedException();
+ }
+ }
+ this.workRunnable = command;
+ try {
+ runnableDequeued.await();
+ if (this.workRunnable == null) {
+ // task was accepted
+ nextReady.signal();
+ return;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ExecutionInterruptedException();
+ } finally {
+ this.workRunnable = null;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ if (executor != null) {
+ executor.execute(command);
+ }
+ }
+
+ private static long clipHigh(long value) {
+ return value < 0 ? Long.MAX_VALUE : value;
+ }
+
+ private final class Worker implements Runnable {
+
+ private Runnable runnable;
+
+ private Worker(final Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ private void setRunnable(final Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ private boolean awaitTimed(Condition condition, long idleSince) {
+ final long end = clipHigh(System.currentTimeMillis() + idleTimeout);
+ long remaining = end - idleSince;
+ if (remaining < 0L) {
+ return false;
+ }
+ if (stop) return false;
+ try {
+ condition.await(remaining, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ }
+ return ! stop;
+ }
+
+ public void run() {
+ final Lock lock = QueuelessExecutor.this.lock;
+ final Condition workerDequeued = QueuelessExecutor.this.workerDequeued;
+ final Condition runnableDequeued = QueuelessExecutor.this.runnableDequeued;
+ final Condition taskEnqueued = QueuelessExecutor.this.taskEnqueued;
+ final Set<Thread> runningThreads = QueuelessExecutor.this.runningThreads;
+ final DirectExecutor taskExecutor = QueuelessExecutor.this.taskExecutor;
+ final Thread thread = Thread.currentThread();
+ long idleSince = Long.MAX_VALUE;
+ Runnable runnable = this.runnable;
+ this.runnable = null;
+ try {
+ MAIN: for (;;) {
+
+ // Run task
+ try {
+ taskExecutor.execute(runnable);
+ } catch (Throwable t) {
+ log.errorf(t, "Task execution failed for task %s", runnable);
+ }
+ idleSince = System.currentTimeMillis();
+ // Get next task
+ lock.lock();
+ try {
+ if (stop || runningThreads.size() > maxThreads) {
+ if (runningThreads.remove(thread) && runningThreads.isEmpty()) {
+ threadDeath.signalAll();
+ }
+ return;
+ }
+ if ((runnable = workRunnable) != null) {
+ // there's a task, take it and continue on to the top of the loop
+ workRunnable = null;
+ runnableDequeued.signal();
+ } else {
+ // no executors are waiting, so we wait instead for an executor
+ while (waitingWorker != null) {
+ // wait... to wait
+ if (! awaitTimed(workerDequeued, idleSince)) return;
+ if ((runnable = workRunnable) != null) {
+ // sniped an easy one by luck!
+ continue MAIN;
+ }
+ }
+ waitingWorker = this;
+ try {
+ do {
+ // wait for a job
+ if (! awaitTimed(taskEnqueued, idleSince)) return;
+ } while ((runnable = this.runnable) == null);
+ } finally {
+ waitingWorker = null;
+ workerDequeued.signal();
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ } finally {
+ lock.lock();
+ try {
+ if (stop && runningThreads.remove(thread) && runningThreads.isEmpty()) {
+ threadDeath.signalAll();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+ }
+}
Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/RejectionPolicy.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,51 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-/**
- * Specify the way a task rejection should be handled.
- */
-public enum RejectionPolicy {
-
- /**
- * Abort execution with an exception.
- */
- ABORT,
- /**
- * Wait until there is room in the queue, or the calling thread is interrupted.
- */
- BLOCK,
- /**
- * Discard the excess task silently.
- */
- DISCARD,
- /**
- * Discard the oldest task in the queue silently, and enqueue the new task. If the queue has no capacity
- * then the new task will be discarded.
- */
- DISCARD_OLDEST,
- /**
- * Hand off the task to another executor.
- */
- HANDOFF,
-}
Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/SimpleQueueExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,595 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.ExecutorService;
-import java.util.Queue;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-
-/**
- * An executor which uses a regular queue to hold tasks. The executor may be tuned at runtime in many ways.
- */
-public final class SimpleQueueExecutor extends AbstractExecutorService implements ExecutorService, ThreadPoolExecutorMBean {
- private final String name;
- private final Lock lock = new ReentrantLock();
- // signal when a task is written to the queue
- private final Condition enqueueCondition = lock.newCondition();
- // signal when the queue is read
- private final Condition removeCondition = lock.newCondition();
- // signalled when threads terminate
- private final Condition threadExitCondition = lock.newCondition();
- private final ThreadFactory threadFactory;
-
- // all protected by poolLock...
- private int corePoolSize;
- private int maxPoolSize;
- private int largestPoolSize;
- private int rejectCount;
- private boolean allowCoreThreadTimeout;
- private long keepAliveTime;
- private TimeUnit keepAliveTimeUnit;
- private RejectionPolicy rejectionPolicy;
- private Executor handoffExecutor;
-
- private int threadCount;
- private Set<Thread> workers = new HashSet<Thread>();
-
- private boolean stop;
- private boolean interrupt;
-
- private Queue<Runnable> queue;
-
- public SimpleQueueExecutor(final String name, final int corePoolSize, final int maxPoolSize, final long keepAliveTime, final TimeUnit keepAliveTimeUnit, final Queue<Runnable> queue, final ThreadFactory threadFactory, final RejectionPolicy rejectionPolicy, final Executor handoffExecutor) {
- this.name = name;
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory is null");
- }
- if (queue == null) {
- throw new NullPointerException("queue is null");
- }
- if (keepAliveTimeUnit == null) {
- throw new NullPointerException("keepAliveTimeUnit is null");
- }
- if (rejectionPolicy == null) {
- throw new NullPointerException("rejectionPolicy is null");
- }
- if (rejectionPolicy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
- throw new NullPointerException("handoffExecutor is null");
- }
- final Lock lock = this.lock;
- lock.lock();
- try {
- this.threadFactory = threadFactory;
- // configurable...
- this.keepAliveTime = keepAliveTime;
- this.keepAliveTimeUnit = keepAliveTimeUnit;
- this.corePoolSize = corePoolSize;
- this.maxPoolSize = maxPoolSize;
- this.queue = queue;
- this.rejectionPolicy = rejectionPolicy;
- this.handoffExecutor = handoffExecutor;
- } finally {
- lock.unlock();
- }
- }
-
- public void execute(final Runnable task) throws RejectedExecutionException {
- final Lock lock = this.lock;
- try {
- lock.lockInterruptibly();
- try {
- for (;;) {
- if (stop) {
- throw new RejectedExecutionException("Executor is stopped");
- }
- // Try core thread first, then queue, then extra thread
- final int count = threadCount;
- if (count < corePoolSize) {
- startNewThread(task);
- threadCount = count + 1;
- return;
- }
- // next queue...
- final Queue<Runnable> queue = this.queue;
- if (queue.offer(task)) {
- enqueueCondition.signal();
- return;
- }
- // extra threads?
- if (count < maxPoolSize) {
- startNewThread(task);
- threadCount = count + 1;
- return;
- }
- rejectCount++;
- // how to reject the task...
- switch (rejectionPolicy) {
- case ABORT:
- throw new RejectedExecutionException("Executor is busy");
- case BLOCK:
- removeCondition.await();
- break;
- case DISCARD:
- return;
- case DISCARD_OLDEST:
- if (queue.poll() != null) {
- queue.add(task);
- enqueueCondition.signal();
- }
- return;
- case HANDOFF:
- handoffExecutor.execute(task);
- return;
- }
- }
- } finally {
- lock.unlock();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new ExecutionInterruptedException("Thread interrupted");
- }
- }
-
- public void shutdown() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- if (! stop) {
- stop = true;
- // wake up the whole town
- removeCondition.signalAll();
- enqueueCondition.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
-
- public List<Runnable> shutdownNow() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- stop = true;
- interrupt = true;
- removeCondition.signalAll();
- enqueueCondition.signalAll();
- for (Thread worker : workers) {
- worker.interrupt();
- }
- final Queue<Runnable> queue = this.queue;
- final ArrayList<Runnable> list = new ArrayList<Runnable>(queue);
- queue.clear();
- return list;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isShutdown() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return stop;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isTerminated() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return stop && threadCount == 0;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean awaitTermination(final long timeout, final TimeUnit unit) throws InterruptedException {
- final Lock lock = this.lock;
- lock.lockInterruptibly();
- try {
- if (workers.contains(Thread.currentThread())) {
- throw new IllegalStateException("Cannot await termination of a thread pool from one of its threads");
- }
- final long start = System.currentTimeMillis();
- long elapsed = 0L;
- while (! stop && threadCount > 0) {
- final long remaining = timeout - elapsed;
- if (remaining <= 0) {
- return false;
- }
- threadExitCondition.await(remaining, unit);
- elapsed = unit.convert(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
- }
- return true;
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isAllowCoreThreadTimeout() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return allowCoreThreadTimeout;
- } finally {
- lock.unlock();
- }
- }
-
- public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
- final Lock lock = this.lock;
- lock.lock();
- try {
- this.allowCoreThreadTimeout = allowCoreThreadTimeout;
- if (allowCoreThreadTimeout) {
- // wake everyone up so core threads can time out
- enqueueCondition.signalAll();
- }
- } finally {
- lock.unlock();
- }
- }
-
- public int getCorePoolSize() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return corePoolSize;
- } finally {
- lock.unlock();
- }
- }
-
- public void setCorePoolSize(final int corePoolSize) {
- final Lock lock = this.lock;
- lock.lock();
- try {
- final int oldLimit = this.corePoolSize;
- if (maxPoolSize < corePoolSize) {
- // don't let the max thread limit be less than the core thread limit.
- // the called method will signal as needed
- setMaxPoolSize(corePoolSize);
- } else if (oldLimit < corePoolSize) {
- // we're growing the number of core threads
- // therefore signal anyone waiting to add tasks; there might be more threads to add
- removeCondition.signalAll();
- } else if (oldLimit > corePoolSize) {
- // we're shrinking the number of core threads
- // therefore signal anyone waiting to remove tasks so the pool can shrink properly
- enqueueCondition.signalAll();
- } else {
- // we aren't changing anything...
- return;
- }
- this.corePoolSize = corePoolSize;
- } finally {
- lock.unlock();
- }
- }
-
- public int getMaxPoolSize() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return maxPoolSize;
- } finally {
- lock.unlock();
- }
- }
-
- public void setMaxPoolSize(final int maxPoolSize) {
- final Lock lock = this.lock;
- lock.lock();
- try {
- final int oldLimit = this.maxPoolSize;
- if (maxPoolSize < corePoolSize) {
- // don't let the max thread limit be less than the core thread limit.
- // the called method will signal as needed
- setCorePoolSize(maxPoolSize);
- } else if (oldLimit < maxPoolSize) {
- // we're growing the number of extra threads
- // therefore signal anyone waiting to add tasks; there might be more threads to add
- removeCondition.signalAll();
- } else if (oldLimit > maxPoolSize) {
- // we're shrinking the number of extra threads
- // therefore signal anyone waiting to remove tasks so the pool can shrink properly
- enqueueCondition.signalAll();
- } else {
- // we aren't changing anything...
- return;
- }
- this.maxPoolSize = maxPoolSize;
- } finally {
- lock.unlock();
- }
- }
-
- public long getKeepAliveTime() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return keepAliveTime;
- } finally {
- lock.unlock();
- }
- }
-
- public void setKeepAliveTime(final long keepAliveTime, final TimeUnit keepAliveTimeUnit) {
- if (keepAliveTimeUnit == null) {
- throw new NullPointerException("keepAliveTimeUnit is null");
- }
- if (keepAliveTime < 0L) {
- throw new IllegalArgumentException("keepAliveTime is less than zero");
- }
- final Lock lock = this.lock;
- lock.lock();
- try {
- this.keepAliveTime = keepAliveTimeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
- } finally {
- lock.unlock();
- }
- }
-
- public void setKeepAliveTime(final long milliseconds) {
- setKeepAliveTime(milliseconds, TimeUnit.MILLISECONDS);
- }
-
- public RejectionPolicy getRejectionPolicy() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return rejectionPolicy;
- } finally {
- lock.unlock();
- }
- }
-
- public void setRejectionPolicy(final RejectionPolicy newPolicy, final Executor handoffExecutor) {
- if (newPolicy == null) {
- throw new NullPointerException("rejectionPolicy is null");
- }
- if (newPolicy == RejectionPolicy.HANDOFF && handoffExecutor == null) {
- throw new NullPointerException("handoffExecutor is null");
- }
- final Lock lock = this.lock;
- lock.lock();
- try {
- if (rejectionPolicy == RejectionPolicy.BLOCK && newPolicy != RejectionPolicy.BLOCK) {
- // there could be blocking .execute() calls out there; give them a nudge
- removeCondition.signalAll();
- }
- rejectionPolicy = newPolicy;
- this.handoffExecutor = handoffExecutor;
- } finally {
- lock.unlock();
- }
- }
-
- // container lifecycle methods
- public void stop() {
- shutdown();
- try {
- awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- // todo log if fails?
- } catch (InterruptedException e) {
- // todo log it?
- Thread.currentThread().interrupt();
- }
- }
-
- public void destroy() {
- // todo is this the right behavior?
- shutdownNow();
- try {
- awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
- // todo log if fails?
- } catch (InterruptedException e) {
- // todo log it?
- Thread.currentThread().interrupt();
- }
- }
-
- // call with lock held!
- private void startNewThread(final Runnable task) {
- final Thread thread = threadFactory.newThread(new Worker(task));
- workers.add(thread);
- final int size = workers.size();
- if (size > largestPoolSize) {
- largestPoolSize = size;
- }
- thread.start();
- }
-
- // call with lock held!
- private Runnable pollTask() {
- final Runnable task = queue.poll();
- if (task != null) {
- removeCondition.signal();
- return task;
- } else {
- if (-- threadCount == 0) {
- threadExitCondition.signalAll();
- }
- return null;
- }
- }
-
- // call with lock held!
- private Runnable takeTask() {
- final Condition removeCondition = this.removeCondition;
- Runnable task = queue.poll();
- if (task != null) {
- removeCondition.signal();
- return task;
- } else {
- final Condition enqueueCondition = this.enqueueCondition;
- final long start = System.currentTimeMillis();
- boolean intr = Thread.interrupted();
- try {
- long elapsed = 0L;
- for (;;) {
- // these parameters may change on each iteration
- final int threadCount = this.threadCount;
- final int coreThreadLimit = corePoolSize;
- final boolean allowCoreThreadTimeout = this.allowCoreThreadTimeout;
- if (stop || threadCount > maxPoolSize) {
- // too many threads. Handle a task if there is one, otherwise exit
- return pollTask();
- } else if (!allowCoreThreadTimeout && threadCount < coreThreadLimit) {
- // ignore timeout until we are not a core thread or until core threads are allowed to time out
- try {
- enqueueCondition.await();
- } catch (InterruptedException e) {
- intr = true;
- }
- } else {
- final TimeUnit timeUnit = keepAliveTimeUnit;
- final long time = keepAliveTime;
- final long remaining = time - timeUnit.convert(elapsed, TimeUnit.MILLISECONDS);
- if (remaining <= 0L && (allowCoreThreadTimeout || threadCount > coreThreadLimit)) {
- // the timeout has expired
- return pollTask();
- }
- try {
- enqueueCondition.await(remaining, timeUnit);
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- task = queue.poll();
- if (task != null) {
- removeCondition.signal();
- return task;
- }
- elapsed = System.currentTimeMillis() - start;
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- public String getName() {
- return name;
- }
-
- public int getCurrentPoolSize() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return workers.size();
- } finally {
- lock.unlock();
- }
- }
-
- public int getLargestPoolSize() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return largestPoolSize;
- } finally {
- lock.unlock();
- }
- }
-
- public int getRejectedCount() {
- final Lock lock = this.lock;
- lock.lock();
- try {
- return rejectCount;
- } finally {
- lock.unlock();
- }
- }
-
- private class Worker implements Runnable {
-
- private Runnable first;
-
- public Worker(final Runnable command) {
- first = command;
- }
-
- public void run() {
- final Lock lock = SimpleQueueExecutor.this.lock;
- Runnable task = first;
- // Release reference to task
- first = null;
- lock.lock();
- try {
- for (;;) {
- if (task != null) {
- try {
- if (interrupt) {
- Thread.currentThread().interrupt();
- }
- lock.unlock();
- try {
- task.run();
- } finally {
- // this is OK because it's in the finally block after lock.unlock()
- //noinspection LockAcquiredButNotSafelyReleased
- lock.lock();
- }
- } catch (Throwable t) {
- // todo - log the exception perhaps
- }
- // don't hang on to task while we possibly block down below
- task = null;
- }
- if (stop) {
- // drain queue
- if ((task = pollTask()) == null) {
- return;
- }
- } else {
- // get next task
- if ((task = takeTask()) == null) {
- return;
- }
- }
- }
- } finally {
- workers.remove(Thread.currentThread());
- lock.unlock();
- }
- }
- }
-}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/StoppedExecutorException.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,70 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when a task is submitted to an executor which is shutting down or has already completed shutting down.
+ */
+public final class StoppedExecutorException extends RejectedExecutionException {
+
+ private static final long serialVersionUID = 4815103522815471074L;
+
+ /**
+ * Constructs a {@code StoppedExecutorException} with no detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
+ public StoppedExecutorException() {
+ }
+
+ /**
+ * Constructs a {@code StoppedExecutorException} with the specified detail message. The cause is not initialized, and
+ * may subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public StoppedExecutorException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructs a {@code StoppedExecutorException} with the specified cause. The detail message is set to:
+ * <pre>(cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of {@code cause}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public StoppedExecutorException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a {@code StoppedExecutorException} with the specified detail message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public StoppedExecutorException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadCreationException.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when a thread factory refuses to create a thread for a thread pool, for example by returning {@code null}.
+ */
+public final class ThreadCreationException extends RejectedExecutionException {
+
+ private static final long serialVersionUID = 5666802385744283783L;
+
+ /**
+ * Constructs a {@code ThreadCreationException} with no detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
+ public ThreadCreationException() {
+ }
+
+ /**
+ * Constructs a {@code ThreadCreationException} with the specified detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public ThreadCreationException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructs a {@code ThreadCreationException} with the specified cause. The detail message is set to:
+ * <pre>(cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of {@code cause}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ThreadCreationException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a {@code ThreadCreationException} with the specified detail message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ThreadCreationException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
Modified: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -23,25 +23,134 @@
package org.jboss.threads;
import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.jboss.threads.management.ThreadExecutorMBean;
-class ThreadFactoryExecutor implements Executor {
+class ThreadFactoryExecutor implements Executor, ThreadExecutorMBean {
private final ThreadFactory factory;
+ private final Semaphore limitSemaphore;
- ThreadFactoryExecutor(final ThreadFactory factory) {
+ private final String name;
+ private final Object lock = new Object();
+ private int maxThreads;
+ private int largestThreadCount;
+ private int currentThreadCount;
+ private final AtomicInteger rejected = new AtomicInteger();
+ private volatile boolean blocking;
+
+ /**
+ * Creates an executor that runs each task in its own thread obtained from {@code factory}.
+ *
+ * @param name the name reported by {@link #getName()}
+ * @param factory the thread factory asked for a new thread on every {@link #execute(Runnable)} call
+ * @param maxThreads the initial thread limit, also used as the initial permit count of the limit semaphore
+ * (not range-checked here, unlike {@link #setMaxThreads(int)})
+ * @param blocking {@code true} if {@code execute} should wait for a free permit rather than reject the task
+ */
+ ThreadFactoryExecutor(final String name, final ThreadFactory factory, int maxThreads, boolean blocking) {
+ this.name = name;
this.factory = factory;
+ this.maxThreads = maxThreads;
+ this.blocking = blocking;
+ limitSemaphore = new Semaphore(maxThreads);
}
+ /**
+ * Returns the current thread limit, read while holding {@code lock}.
+ *
+ * @return the limit last set by the constructor or by {@link #setMaxThreads(int)}
+ */
+ public int getMaxThreads() {
+ synchronized (lock) {
+ return maxThreads;
+ }
+ }
+
+ /**
+ * Changes the thread limit and adjusts the limit semaphore by the difference between the old and new values.
+ *
+ * @param maxThreads the new limit; must not be negative
+ * @throws IllegalArgumentException if {@code maxThreads} is negative, or if the limit is being lowered and the
+ * permits to be withdrawn are not all available at the moment of the call
+ */
+ public void setMaxThreads(final int maxThreads) {
+ if (maxThreads < 0) {
+ throw new IllegalArgumentException("Max threads must not be negative");
+ }
+ synchronized (lock) {
+ final int old = this.maxThreads;
+ final int diff = old - maxThreads;
+ if (diff < 0) {
+ // limit is growing: hand out the additional permits
+ limitSemaphore.release(-diff);
+ } else if (diff > 0) {
+ // limit is shrinking: withdraw permits without blocking; fails if they are currently held
+ if (! limitSemaphore.tryAcquire(diff)) {
+ throw new IllegalArgumentException("Cannot reduce maximum threads below current number of running threads");
+ }
+ }
+ // only reached when the semaphore was adjusted successfully (or nothing changed)
+ this.maxThreads = maxThreads;
+ }
+ }
+
public void execute(final Runnable command) {
- final Thread thread = factory.newThread(command);
- if (thread == null) {
- throw new RejectedExecutionException("No threads can be created");
+ try {
+ final Semaphore semaphore = limitSemaphore;
+ if (blocking) {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new ExecutionInterruptedException();
+ }
+ } else {
+ if (! semaphore.tryAcquire()) {
+ throw new RejectedExecutionException("Task limit reached");
+ }
+ }
+ boolean ok = false;
+ try {
+ final Thread thread = factory.newThread(new Runnable() {
+ public void run() {
+ try {
+ synchronized (lock) {
+ int t = ++currentThreadCount;
+ if (t > largestThreadCount) {
+ largestThreadCount = t;
+ }
+ }
+ command.run();
+ synchronized (lock) {
+ currentThreadCount--;
+ }
+ } finally {
+ limitSemaphore.release();
+ }
+ }
+ });
+ if (thread == null) {
+ throw new ThreadCreationException("No threads can be created");
+ }
+ thread.start();
+ ok = true;
+ } finally {
+ if (! ok) semaphore.release();
+ }
+ } catch (RejectedExecutionException e) {
+ rejected.getAndIncrement();
+ throw e;
}
- thread.start();
}
+ /**
+ * Returns whether {@link #execute(Runnable)} waits for a free permit ({@code true}) or rejects the task when
+ * the limit is reached ({@code false}).
+ *
+ * @return the current value of the blocking flag
+ */
+ public boolean isBlocking() {
+ return blocking;
+ }
+
+ /**
+ * Sets the blocking flag consulted by {@link #execute(Runnable)} each time it is called.
+ *
+ * @param blocking {@code true} to make {@code execute} wait for a permit, {@code false} to make it reject
+ */
+ public void setBlocking(final boolean blocking) {
+ this.blocking = blocking;
+ }
+
+ /**
+ * Returns the highest value the running-thread counter has reached, as recorded by task threads when they start.
+ *
+ * @return the largest thread count observed so far, read while holding {@code lock}
+ */
+ public int getLargestThreadCount() {
+ synchronized (lock) {
+ return largestThreadCount;
+ }
+ }
+
+ /**
+ * Returns the running-thread counter, which a task thread increments before running its task and decrements
+ * after the task's {@code run()} returns.
+ *
+ * @return the current thread count, read while holding {@code lock}
+ */
+ public int getCurrentThreadCount() {
+ synchronized (lock) {
+ return currentThreadCount;
+ }
+ }
+
+ /**
+ * Returns the name supplied to the constructor.
+ *
+ * @return the executor name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns how many {@link #execute(Runnable)} calls have failed with a {@code RejectedExecutionException}
+ * (subclasses such as {@code ThreadCreationException} included).
+ *
+ * @return the number of rejected submissions
+ */
+ public int getRejectedCount() {
+ return rejected.get();
+ }
+
+ // Renders the inherited toString() value followed by the thread factory in parentheses.
public String toString() {
return String.format("%s (%s)", super.toString(), factory);
}
Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPool.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,207 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.locks.Condition;
-
-/**
- * A queueless thread pool. If one or more threads are waiting for work when a task is submitted, it will be used.
- * Otherwise, if fewer than the maximum threads are started, a new thread is created.
- */
-public final class ThreadPool {
-
- private final ThreadFactory threadFactory;
-
- private volatile long idleTimeout;
- private int maxThreads;
- private int runningThreads;
-
- private Task parkedTask;
- private final Lock poolLock = new ReentrantLock();
- // signal when parking a task
- private final Condition park = poolLock.newCondition();
- // signal when unparking a task
- private final Condition unpark = poolLock.newCondition();
-
- private State state = State.RUNNING;
-
- private enum State {
- RUNNING,
- KILL,
- }
-
- public ThreadPool(final ThreadFactory threadFactory) {
- this.threadFactory = threadFactory;
- idleTimeout = 30000L;
- maxThreads = 20;
- runningThreads = 0;
- }
-
- public void executeBlocking(final DirectExecutor taskExecutor, final Runnable runnable) throws InterruptedException {
- if (runnable == null) {
- throw new NullPointerException("runnable is null");
- }
- final Lock poolLock = this.poolLock;
- final Condition park = this.park;
- final Condition unpark = this.unpark;
- Task task;
- poolLock.lockInterruptibly();
- try {
- for (;;) {
- if (state == State.KILL) {
- throw new RejectedExecutionException("Thread pool is shut down");
- } else if ((task = parkedTask) != null) {
- parkedTask = null;
- unpark.signal();
- break;
- } else {
- final int runningThreads = this.runningThreads;
- if (runningThreads < maxThreads) {
- task = new Task(runnable, taskExecutor);
- final ThreadFactory threadFactory = this.threadFactory;
- final Thread newThread = threadFactory.newThread(task);
- if (newThread == null) {
- throw new RejectedExecutionException("Thread factory " + threadFactory + " will not create a thread");
- }
- newThread.start();
- this.runningThreads = runningThreads + 1;
- return;
- } else {
- park.await();
- }
- }
- }
- } finally {
- poolLock.unlock();
- }
- synchronized (task) {
- task.runnable = runnable;
- task.taskExecutor = taskExecutor;
- task.notify();
- }
- }
-
- private final class Task implements Runnable {
-
- /**
- * Protected by {@code this}.
- */
- private Runnable runnable;
- private DirectExecutor taskExecutor;
-
- Task(final Runnable runnable, final DirectExecutor taskExecutor) {
- synchronized (this) {
- this.runnable = runnable;
- this.taskExecutor = taskExecutor;
- }
- }
-
- public void run() {
- final Lock poolLock = ThreadPool.this.poolLock;
- Runnable runnable;
- DirectExecutor taskExecutor;
- try {
- synchronized (this) {
- runnable = this.runnable;
- taskExecutor = this.taskExecutor;
- this.runnable = null;
- this.taskExecutor = null;
- }
- long idleSince = runnable == null ? System.currentTimeMillis() : Long.MAX_VALUE;
- for (;;) {
- while (runnable == null) {
- // no task; park
- poolLock.lock();
- try {
- if (state == State.KILL) {
- return;
- }
- // wait for the spot to open
- Task task;
- while ((task = parkedTask) != null && task != this) {
- try {
- final long remaining = idleTimeout - (System.currentTimeMillis() - idleSince);
- if (remaining < 0L) {
- // idle timeout; don't bother finishing parking
- return;
- }
- unpark.await(remaining, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- if (state == State.KILL) {
- return;
- }
- }
- }
- parkedTask = this;
- } finally {
- poolLock.unlock();
- }
- // parked! Now just wait for a task to show up
- synchronized (this) {
- while ((runnable = this.runnable) == null) {
- try {
- final long remaining = idleTimeout - (System.currentTimeMillis() - idleSince);
- if (remaining < 0L) {
- // idle timeout; unpark and return
- return;
- }
- wait(remaining);
- } catch (InterruptedException e) {
- break;
- }
- }
- taskExecutor = this.taskExecutor;
- this.runnable = null;
- this.taskExecutor = null;
- }
- }
- (taskExecutor == null ? JBossExecutors.directExecutor() : taskExecutor).execute(runnable);
- synchronized (this) {
- runnable = this.runnable;
- taskExecutor = this.taskExecutor;
- this.runnable = null;
- this.taskExecutor = null;
- }
- if (runnable == null) idleSince = System.currentTimeMillis();
- }
- } finally {
- poolLock.lock();
- try {
- // If this task is parked, unpark it so someone else can get in there
- if (parkedTask == this) {
- parkedTask = null;
- unpark.signal();
- }
- runningThreads--;
- } finally {
- poolLock.unlock();
- }
- }
- }
- }
-}
Deleted: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2009, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.threads;
-
-/**
- *
- */
-public interface ThreadPoolExecutorMBean {
- String getName();
-
- boolean isAllowCoreThreadTimeout();
-
- void setAllowCoreThreadTimeout(boolean allow);
-
- int getCorePoolSize();
-
- void setCorePoolSize(int newSize);
-
- int getMaxPoolSize();
-
- void setMaxPoolSize(int newSize);
-
- long getKeepAliveTime();
-
- void setKeepAliveTime(long milliseconds);
-
- int getCurrentPoolSize();
-
- int getLargestPoolSize();
-
- int getRejectedCount();
-}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/UninterruptibleExecutor.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Executor wrapper that submits every task through
+ * {@code JBossExecutors.executeUninterruptibly(Executor, Runnable)} using the wrapped delegate.
+ * NOTE(review): the exact interruption handling is defined by that helper, which is not part of this file.
+ */
+class UninterruptibleExecutor implements Executor {
+
+ private final Executor delegate;
+
+ /**
+ * @param delegate the executor that ultimately receives the tasks; stored as given, without a null check
+ */
+ UninterruptibleExecutor(final Executor delegate) {
+ this.delegate = delegate;
+ }
+
+ /**
+ * Passes {@code command} and the delegate to {@code JBossExecutors.executeUninterruptibly}.
+ *
+ * @param command the task to submit
+ */
+ public void execute(final Runnable command) {
+ JBossExecutors.executeUninterruptibly(delegate, command);
+ }
+}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedQueueThreadPoolExecutorMBean.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ * Management interface that adds the {@code allowCoreThreadTimeout} and {@code maxPoolSize} attributes to {@link BoundedThreadPoolExecutorMBean}.
+ */
+public interface BoundedQueueThreadPoolExecutorMBean extends BoundedThreadPoolExecutorMBean {
+ boolean isAllowCoreThreadTimeout();
+
+ void setAllowCoreThreadTimeout(boolean allow);
+
+ int getMaxPoolSize();
+
+ void setMaxPoolSize(int newSize);
+
+
+}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/BoundedThreadPoolExecutorMBean.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ * Management interface that adds a read-write {@code blocking} attribute to {@link ThreadPoolExecutorMBean}.
+ */
+public interface BoundedThreadPoolExecutorMBean extends ThreadPoolExecutorMBean {
+
+ boolean isBlocking();
+
+ void setBlocking(boolean blocking);
+
+}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadExecutorMBean.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ * Read-only management interface for a thread-based executor: its name, rejected count, and current and largest thread counts.
+ */
+public interface ThreadExecutorMBean {
+ String getName();
+
+ int getRejectedCount();
+
+ int getCurrentThreadCount();
+
+ int getLargestThreadCount();
+}
Copied: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java (from rev 92149, projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/ThreadPoolExecutorMBean.java)
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/management/ThreadPoolExecutorMBean.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -0,0 +1,36 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads.management;
+
+/**
+ * Extends {@link ThreadExecutorMBean} with a readable and writable core pool size and keep-alive time; the setter's parameter name indicates the keep-alive time is expressed in milliseconds.
+ */
+public interface ThreadPoolExecutorMBean extends ThreadExecutorMBean {
+ int getCorePoolSize();
+
+ void setCorePoolSize(int newSize);
+
+ long getKeepAliveTime();
+
+ void setKeepAliveTime(long milliseconds);
+}
Modified: projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java
===================================================================
--- projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java 2010-01-11 22:40:02 UTC (rev 99252)
+++ projects/jboss-threads/trunk/main/src/test/java/org/jboss/threads/ThreadPoolTestCase.java 2010-01-11 23:14:52 UTC (rev 99253)
@@ -62,7 +62,7 @@
final int cnt = 100;
final CountDownLatch taskUnfreezer = new CountDownLatch(1);
final CountDownLatch taskFinishLine = new CountDownLatch(cnt);
- final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, RejectionPolicy.BLOCK, null);
+ final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, true, null);
for (int i = 0; i < cnt; i ++) {
simpleQueueExecutor.execute(new SimpleTask(taskUnfreezer, taskFinishLine));
}
@@ -87,7 +87,7 @@
final AtomicBoolean ran = new AtomicBoolean();
final CountDownLatch startLatch = new CountDownLatch(1);
- final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, RejectionPolicy.BLOCK, null);
+ final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", 5, 5, 500L, TimeUnit.MILLISECONDS, new LinkedList<Runnable>(), threadFactory, true, null);
simpleQueueExecutor.execute(new Runnable() {
public void run() {
try {
@@ -121,7 +121,7 @@
final int cnt = queueSize + coreThreads + extraThreads;
final CountDownLatch taskUnfreezer = new CountDownLatch(1);
final CountDownLatch taskFinishLine = new CountDownLatch(cnt);
- final ExecutorService simpleQueueExecutor = new SimpleQueueExecutor("test-pool", coreThreads, coreThreads + extraThreads, 500L, TimeUnit.MILLISECONDS, new ArrayQueue<Runnable>(queueSize), threadFactory, RejectionPolicy.BLOCK, null);
+ final ExecutorService simpleQueueExecutor = new QueueExecutor("test-pool", coreThreads, coreThreads + extraThreads, 500L, TimeUnit.MILLISECONDS, new ArrayQueue<Runnable>(queueSize), threadFactory, true, null);
for (int i = 0; i < cnt; i ++) {
simpleQueueExecutor.execute(new SimpleTask(taskUnfreezer, taskFinishLine));
}
More information about the jboss-cvs-commits
mailing list