[jboss-cvs] JBossAS SVN: r99842 - projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 22 16:55:39 EST 2010
Author: david.lloyd at jboss.com
Date: 2010-01-22 16:55:38 -0500 (Fri, 22 Jan 2010)
New Revision: 99842
Added:
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java
Modified:
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java
projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
Log:
JBTHR-10: add a blocking executor type for JCA WorkManager
Added: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/BlockingExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,87 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor which can optionally block or not block on task submission.
+ */
+public interface BlockingExecutor extends Executor {
+
+ /**
+ * Executes the given command at some time in the future. The command may execute in a new thread, in a pooled thread,
+ * or in the calling thread, at the discretion of the <tt>Executor</tt> implementation. The call may block
+ * or not block, depending on the configuration of the executor.
+ *
+ * @param command the task to submit
+ *
+ * @throws ExecutionInterruptedException if the executor is configured to block, and the thread was interrupted while waiting
+ * for the task to be accepted
+ * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws ThreadCreationException if a thread could not be created for some reason
+ * @throws RejectedExecutionException if execution is rejected for some other reason
+ * @throws NullPointerException if command is {@code null}
+ */
+ void execute(Runnable command);
+
+ /**
+ * Execute a task, blocking until it can be accepted, or until the calling thread is interrupted.
+ *
+ * @param task the task to submit
+ * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws ThreadCreationException if a thread could not be created for some reason
+ * @throws RejectedExecutionException if execution is rejected for some other reason
+ * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+ * @throws NullPointerException if command is {@code null}
+ */
+ void executeBlocking(Runnable task) throws RejectedExecutionException, InterruptedException;
+
+ /**
+ * Execute a task, blocking until it can be accepted, a timeout elapses, or the calling thread is interrupted.
+ *
+ * @param task the task to submit
+ * @param timeout the amount of time to wait
+ * @param unit the unit of time
+ * @throws ExecutionTimedOutException if the timeout elapsed before a task could be accepted
+ * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws ThreadCreationException if a thread could not be created for some reason
+ * @throws RejectedExecutionException if execution is rejected for some other reason
+ * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+ * @throws NullPointerException if command is {@code null}
+ */
+ void executeBlocking(Runnable task, long timeout, TimeUnit unit) throws RejectedExecutionException, InterruptedException;
+
+ /**
+ * Execute a task, without blocking.
+ *
+ * @param task the task to submit
+ * @throws StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws ThreadCreationException if a thread could not be created for some reason
+ * @throws RejectedExecutionException if execution is rejected for some other reason
+ * @throws NullPointerException if command is {@code null}
+ */
+ void executeNonBlocking(Runnable task) throws RejectedExecutionException;
+}
Copied: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java (from rev 99685, projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingExecutor.java)
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/DelegatingBlockingExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor that simply delegates to another executor. Use instances of this class to hide extra methods on
+ * another executor.
+ */
+class DelegatingBlockingExecutor implements BlockingExecutor {
+ private final BlockingExecutor delegate;
+
+ DelegatingBlockingExecutor(final BlockingExecutor delegate) {
+ this.delegate = delegate;
+ }
+
+ protected BlockingExecutor getDelegate() {
+ return delegate;
+ }
+
+ public void execute(final Runnable command) {
+ delegate.execute(command);
+ }
+
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ delegate.executeBlocking(task);
+ }
+
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ delegate.executeBlocking(task, timeout, unit);
+ }
+
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ delegate.executeNonBlocking(task);
+ }
+
+ public String toString() {
+ return String.format("%s -> %s", super.toString(), delegate);
+ }
+}
\ No newline at end of file
Added: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java (rev 0)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ExecutionTimedOutException.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Thrown when an execute-with-timeout method is called and the timeout elapsed before a task could be <b>accepted</b>.
+ */
+public class ExecutionTimedOutException extends RejectedExecutionException {
+
+ private static final long serialVersionUID = 6577491781534695133L;
+
+ /**
+ * Constructs a {@code ExecutionTimedOutException} with no detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
+ public ExecutionTimedOutException() {
+ }
+
+ /**
+ * Constructs a {@code ExecutionTimedOutException} with the specified detail message. The cause is not initialized, and
+ * may subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public ExecutionTimedOutException(final String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructs a {@code ExecutionTimedOutException} with the specified cause. The detail message is set to:
+ * <pre>(cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of {@code cause}).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ExecutionTimedOutException(final Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a {@code ExecutionTimedOutException} with the specified detail message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ExecutionTimedOutException(final String msg, final Throwable cause) {
+ super(msg, cause);
+ }
+}
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossExecutors.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -324,7 +324,7 @@
* @param factory the thread factory to use
* @return the executor
*/
- public static Executor threadFactoryExecutor(final ThreadFactory factory) {
+ public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory) {
return new ThreadFactoryExecutor(factory, Integer.MAX_VALUE, false, directExecutor());
}
@@ -336,7 +336,7 @@
* @param maxThreads the maximum number of allowed threads
* @return the executor
*/
- public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
+ public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads) {
return new ThreadFactoryExecutor(factory, maxThreads, false, directExecutor());
}
@@ -350,7 +350,7 @@
* @param blocking {@code true} if the submitter should block when the maximum number of threads has been reached
* @return the executor
*/
- public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
+ public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking) {
return new ThreadFactoryExecutor(factory, maxThreads, blocking, directExecutor());
}
@@ -365,7 +365,7 @@
* @param taskExecutor the executor which should run each task
* @return the executor
*/
- public static Executor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking, final DirectExecutor taskExecutor) {
+ public static BlockingExecutor threadFactoryExecutor(final ThreadFactory factory, final int maxThreads, final boolean blocking, final DirectExecutor taskExecutor) {
return new ThreadFactoryExecutor(factory, maxThreads, blocking, taskExecutor);
}
@@ -433,6 +433,10 @@
// PROTECTED EXECUTOR SERVICE WRAPPERS
// ==================================================
+ public static BlockingExecutor protectedBlockingExecutor(final BlockingExecutor target) {
+ return new DelegatingBlockingExecutor(target);
+ }
+
/**
* Wrap an executor with an {@code ExecutorService} instance which supports all the features of {@code ExecutorService}
* except for shutting down the executor.
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/JBossThreadPoolExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -22,6 +22,7 @@
package org.jboss.threads;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.BlockingQueue;
@@ -33,7 +34,7 @@
/**
*
*/
-public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BoundedQueueThreadPoolExecutorMBean {
+public final class JBossThreadPoolExecutor extends ThreadPoolExecutor implements BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
private final AtomicInteger rejectCount = new AtomicInteger();
@@ -57,10 +58,22 @@
setRejectedExecutionHandler(handler);
}
- public void execute(final Runnable command) {
- super.execute(command);
+ public void execute(final Runnable task) {
+ super.execute(task);
}
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ super.execute(task);
+ }
+
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ super.execute(task);
+ }
+
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ super.execute(task);
+ }
+
public int getLargestThreadCount() {
return super.getLargestPoolSize();
}
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/OrderedExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -23,9 +23,10 @@
package org.jboss.threads;
import java.util.ArrayDeque;
-import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
@@ -36,7 +37,7 @@
* More specifically, if a FIFO queue type is used, any call B to the {@link #execute(Runnable)} method that
* happens-after another call A to the same method, will result in B's task running after A's.
*/
-public final class OrderedExecutor implements Executor {
+public final class OrderedExecutor implements BlockingExecutor {
private final Executor parent;
private final Runnable runner = new Runner();
@@ -125,31 +126,62 @@
this(parent, new ArrayQueue<Runnable>(queueLength), blocking, handoffExecutor);
}
-
-
/**
* Run a task.
*
- * @param command the task to run.
+ * @param task the task to run.
*/
- public void execute(Runnable command) {
+ public void execute(Runnable task) {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
Executor executor;
OUT: for (;;) {
lock.lock();
try {
- while (! queue.offer(command)) {
+ if (! running) {
+ running = true;
+ boolean ok = false;
+ try {
+ parent.execute(runner);
+ ok = true;
+ } finally {
+ if (! ok) {
+ running = false;
+ }
+ }
+ }
+ if (! queue.offer(task)) {
if (blocking) {
try {
removeCondition.await();
+ continue;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ExecutionInterruptedException();
}
} else {
executor = handoffExecutor;
- break OUT;
+ break;
}
}
+ return;
+ } finally {
+ lock.unlock();
+ }
+ }
+ if (executor != null) {
+ executor.execute(task);
+ }
+ }
+
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ OUT: for (;;) {
+ lock.lock();
+ try {
if (! running) {
running = true;
boolean ok = false;
@@ -162,13 +194,90 @@
}
}
}
+ if (! queue.offer(task)) {
+ removeCondition.await();
+ continue;
+ }
return;
} finally {
lock.unlock();
}
}
+ }
+
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ long now = System.currentTimeMillis();
+ final long deadline = now + unit.toMillis(timeout);
+ if (deadline < 0L) {
+ executeBlocking(task);
+ return;
+ }
+ OUT: for (;;) {
+ lock.lock();
+ try {
+ if (! running) {
+ running = true;
+ boolean ok = false;
+ try {
+ parent.execute(runner);
+ ok = true;
+ } finally {
+ if (! ok) {
+ running = false;
+ }
+ }
+ }
+ if (! queue.offer(task)) {
+ final long remaining = deadline - now;
+ if (remaining <= 0L) {
+ throw new ExecutionTimedOutException();
+ }
+ removeCondition.await(remaining, TimeUnit.MILLISECONDS);
+ now = System.currentTimeMillis();
+ continue;
+ }
+ return;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ }
+
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ Executor executor;
+ OUT: for (;;) {
+ lock.lock();
+ try {
+ if (! running) {
+ running = true;
+ boolean ok = false;
+ try {
+ parent.execute(runner);
+ ok = true;
+ } finally {
+ if (! ok) {
+ running = false;
+ }
+ }
+ }
+ if (! queue.offer(task)) {
+ executor = handoffExecutor;
+ break;
+ }
+ return;
+ } finally {
+ lock.unlock();
+ }
+ }
if (executor != null) {
- executor.execute(command);
+ executor.execute(task);
}
}
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueueExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -42,7 +42,7 @@
/**
* An executor which uses a regular queue to hold tasks. The executor may be tuned at runtime in many ways.
*/
-public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BoundedQueueThreadPoolExecutorMBean {
+public final class QueueExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedQueueThreadPoolExecutorMBean {
private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
private final Lock lock = new ReentrantLock();
@@ -155,6 +155,9 @@
* @throws ExecutionInterruptedException when blocking is enabled and the current thread is interrupted before a task could be accepted
*/
public void execute(final Runnable task) throws RejectedExecutionException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
final Executor executor;
final Lock lock = this.lock;
lock.lock();
@@ -205,6 +208,168 @@
return;
}
+ /**
+ * Execute a task, blocking until it can be accepted, or until the calling thread is interrupted.
+ *
+ * @param task the task to submit
+ *
+ * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+ * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+ * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+ * @throws NullPointerException if command is {@code null}
+ */
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor is stopped");
+ }
+ // Try core thread first, then queue, then extra thread
+ final int count = threadCount;
+ if (count < coreThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ // next queue...
+ final Queue<Runnable> queue = this.queue;
+ if (queue.offer(task)) {
+ enqueueCondition.signal();
+ return;
+ }
+ // extra threads?
+ if (count < maxThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ removeCondition.await();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Execute a task, blocking until it can be accepted, a timeout elapses, or the calling thread is interrupted.
+ *
+ * @param task the task to submit
+ * @param timeout the amount of time to wait
+ * @param unit the unit of time
+ *
+ * @throws org.jboss.threads.ExecutionTimedOutException if the timeout elapsed before a task could be accepted
+ * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+ * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+ * @throws InterruptedException if the current thread was interrupted before the task could be accepted
+ * @throws NullPointerException if command is {@code null}
+ */
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ long now = System.currentTimeMillis();
+ final long deadline = now + unit.toMillis(timeout);
+ if (deadline < 0L) {
+ executeBlocking(task);
+ return;
+ }
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor is stopped");
+ }
+ // Try core thread first, then queue, then extra thread
+ final int count = threadCount;
+ if (count < coreThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ // next queue...
+ final Queue<Runnable> queue = this.queue;
+ if (queue.offer(task)) {
+ enqueueCondition.signal();
+ return;
+ }
+ // extra threads?
+ if (count < maxThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ final long remaining = deadline - now;
+ if (remaining <= 0L) {
+ throw new ExecutionTimedOutException();
+ }
+ removeCondition.await(remaining, TimeUnit.MILLISECONDS);
+ now = System.currentTimeMillis();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Execute a task, without blocking.
+ *
+ * @param task the task to submit
+ *
+ * @throws org.jboss.threads.StoppedExecutorException if the executor was shut down before the task was accepted
+ * @throws org.jboss.threads.ThreadCreationException if a thread could not be created for some reason
+ * @throws java.util.concurrent.RejectedExecutionException if execution is rejected for some other reason
+ * @throws NullPointerException if command is {@code null}
+ */
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ final Executor executor;
+ final Lock lock = this.lock;
+ lock.lock();
+ try {
+ if (stop) {
+ throw new StoppedExecutorException("Executor is stopped");
+ }
+ // Try core thread first, then queue, then extra thread
+ final int count = threadCount;
+ if (count < coreThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ // next queue...
+ final Queue<Runnable> queue = this.queue;
+ if (queue.offer(task)) {
+ enqueueCondition.signal();
+ return;
+ }
+ // extra threads?
+ if (count < maxThreads) {
+ startNewThread(task);
+ threadCount = count + 1;
+ return;
+ }
+ // delegate the task outside of the lock.
+ rejectCount++;
+ executor = handoffExecutor;
+ } finally {
+ lock.unlock();
+ }
+ if (executor != null) {
+ executor.execute(task);
+ }
+ return;
+ }
+
/** {@inheritDoc} */
public void shutdown() {
final Lock lock = this.lock;
@@ -495,6 +660,9 @@
// call with lock held!
private void startNewThread(final Runnable task) {
final Thread thread = threadFactory.newThread(new Worker(task));
+ if (thread == null) {
+ throw new ThreadCreationException();
+ }
workers.add(thread);
final int size = workers.size();
if (size > largestPoolSize) {
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/QueuelessExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -43,7 +43,7 @@
* A queueless thread pool. If one or more threads are waiting for work when a task is submitted, it will be used.
* Otherwise, if fewer than the maximum threads are started, a new thread is created.
*/
-public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BoundedThreadPoolExecutorMBean {
+public final class QueuelessExecutor extends AbstractExecutorService implements ExecutorService, BlockingExecutor, BoundedThreadPoolExecutorMBean {
private static final Logger log = Logger.getLogger("org.jboss.threads.executor");
private final ThreadFactory threadFactory;
@@ -279,7 +279,10 @@
}
}
- public void execute(final Runnable command) {
+ public void execute(final Runnable task) {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
final Executor executor;
final Set<Thread> runningThreads = this.runningThreads;
final Condition runnableDequeued = this.runnableDequeued;
@@ -294,7 +297,7 @@
final Worker waitingWorker;
if ((waitingWorker = this.waitingWorker) != null) {
// a worker thread was waiting for a task; give it the task and wake it up
- waitingWorker.setRunnable(command);
+ waitingWorker.setRunnable(task);
taskEnqueued.signal();
this.waitingWorker = null;
return;
@@ -303,7 +306,7 @@
final int currentSize = runningThreads.size();
if (currentSize < maxThreads) {
// if we haven't reached the thread limit yet, start up another thread
- final Thread thread = threadFactory.newThread(new Worker(command));
+ final Thread thread = threadFactory.newThread(new Worker(task));
if (thread == null) {
throw new ThreadCreationException();
}
@@ -334,7 +337,7 @@
throw new ExecutionInterruptedException();
}
}
- this.workRunnable = command;
+ this.workRunnable = task;
try {
runnableDequeued.await();
if (this.workRunnable == null) {
@@ -353,12 +356,150 @@
lock.unlock();
}
if (executor != null) {
- executor.execute(command);
+ executor.execute(task);
} else {
throw new RejectedExecutionException();
}
}
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ final Set<Thread> runningThreads = this.runningThreads;
+ final Condition runnableDequeued = this.runnableDequeued;
+ final Lock lock = this.lock;
+ Runnable workRunnable;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor has been shut down");
+ }
+ final Worker waitingWorker;
+ if ((waitingWorker = this.waitingWorker) != null) {
+ // a worker thread was waiting for a task; give it the task and wake it up
+ waitingWorker.setRunnable(task);
+ taskEnqueued.signal();
+ this.waitingWorker = null;
+ return;
+ }
+ // no worker thread was waiting
+ final int currentSize = runningThreads.size();
+ if (currentSize < maxThreads) {
+ // if we haven't reached the thread limit yet, start up another thread
+ final Thread thread = threadFactory.newThread(new Worker(task));
+ if (thread == null) {
+ throw new ThreadCreationException();
+ }
+ if (! runningThreads.add(thread)) {
+ throw new ThreadCreationException("Unable to add new thread to the running set");
+ }
+ if (currentSize >= largestPoolSize) {
+ largestPoolSize = currentSize + 1;
+ }
+ thread.start();
+ return;
+ }
+ workRunnable = this.workRunnable;
+ if (workRunnable != null) {
+ // someone else is waiting for a worker, so we wait for them
+ nextReady.await();
+ continue;
+ }
+ this.workRunnable = task;
+ try {
+ runnableDequeued.await();
+ if (this.workRunnable == null) {
+ // task was accepted
+ nextReady.signal();
+ return;
+ }
+ } finally {
+ this.workRunnable = null;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ long now = System.currentTimeMillis();
+ final long deadline = now + unit.toMillis(timeout);
+ if (deadline < 0L) {
+ executeBlocking(task);
+ return;
+ }
+ final Set<Thread> runningThreads = this.runningThreads;
+ final Condition runnableDequeued = this.runnableDequeued;
+ final Lock lock = this.lock;
+ Runnable workRunnable;
+ lock.lock();
+ try {
+ for (;;) {
+ if (stop) {
+ throw new StoppedExecutorException("Executor has been shut down");
+ }
+ final Worker waitingWorker;
+ if ((waitingWorker = this.waitingWorker) != null) {
+ // a worker thread was waiting for a task; give it the task and wake it up
+ waitingWorker.setRunnable(task);
+ taskEnqueued.signal();
+ this.waitingWorker = null;
+ return;
+ }
+ // no worker thread was waiting
+ final int currentSize = runningThreads.size();
+ if (currentSize < maxThreads) {
+ // if we haven't reached the thread limit yet, start up another thread
+ final Thread thread = threadFactory.newThread(new Worker(task));
+ if (thread == null) {
+ throw new ThreadCreationException();
+ }
+ if (! runningThreads.add(thread)) {
+ throw new ThreadCreationException("Unable to add new thread to the running set");
+ }
+ if (currentSize >= largestPoolSize) {
+ largestPoolSize = currentSize + 1;
+ }
+ thread.start();
+ return;
+ }
+ workRunnable = this.workRunnable;
+ if (workRunnable != null) {
+ // someone else is waiting for a worker, so we wait for them
+ nextReady.await();
+ continue;
+ }
+ this.workRunnable = task;
+ try {
+ final long remaining = deadline - now;
+ if (remaining <= 0L) {
+ throw new ExecutionTimedOutException();
+ }
+ runnableDequeued.await(remaining, TimeUnit.MILLISECONDS);
+ now = System.currentTimeMillis();
+ if (this.workRunnable == null) {
+ // task was accepted
+ nextReady.signal();
+ return;
+ }
+ } finally {
+ this.workRunnable = null;
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ }
+
private static long clipHigh(long value) {
return value < 0 ? Long.MAX_VALUE : value;
}
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/StoppedExecutorException.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -26,7 +26,7 @@
/**
* Thrown when a task is submitted to an executor which is in the process of, or has completed shutting down.
*/
-public final class StoppedExecutorException extends RejectedExecutionException {
+public class StoppedExecutorException extends RejectedExecutionException {
private static final long serialVersionUID = 4815103522815471074L;
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadCreationException.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -27,7 +27,7 @@
/**
* Thrown when a thread factory refuses to create a thread for a thread pool.
*/
-public final class ThreadCreationException extends RejectedExecutionException {
+public class ThreadCreationException extends RejectedExecutionException {
private static final long serialVersionUID = 5666802385744283783L;
Modified: projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java
===================================================================
--- projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java 2010-01-22 20:30:21 UTC (rev 99841)
+++ projects/jboss-threads/trunk/jboss-threads/src/main/java/org/jboss/threads/ThreadFactoryExecutor.java 2010-01-22 21:55:38 UTC (rev 99842)
@@ -22,14 +22,14 @@
package org.jboss.threads;
-import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.threads.management.BoundedThreadPoolExecutorMBean;
-class ThreadFactoryExecutor implements Executor, BoundedThreadPoolExecutorMBean {
+class ThreadFactoryExecutor implements BlockingExecutor, BoundedThreadPoolExecutorMBean {
private final ThreadFactory factory;
private final Semaphore limitSemaphore;
@@ -74,7 +74,10 @@
}
}
- public void execute(final Runnable command) {
+ public void execute(final Runnable task) {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
try {
final Semaphore semaphore = limitSemaphore;
if (blocking) {
@@ -100,7 +103,7 @@
largestThreadCount = t;
}
}
- taskExecutor.execute(command);
+ taskExecutor.execute(task);
synchronized (lock) {
currentThreadCount--;
}
@@ -123,6 +126,133 @@
}
}
+ public void executeBlocking(final Runnable task) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ try {
+ final Semaphore semaphore = limitSemaphore;
+ semaphore.acquire();
+ boolean ok = false;
+ try {
+ final Thread thread = factory.newThread(new Runnable() {
+ public void run() {
+ try {
+ synchronized (lock) {
+ int t = ++currentThreadCount;
+ if (t > largestThreadCount) {
+ largestThreadCount = t;
+ }
+ }
+ taskExecutor.execute(task);
+ synchronized (lock) {
+ currentThreadCount--;
+ }
+ } finally {
+ limitSemaphore.release();
+ }
+ }
+ });
+ if (thread == null) {
+ throw new ThreadCreationException("No threads can be created");
+ }
+ thread.start();
+ ok = true;
+ } finally {
+ if (! ok) semaphore.release();
+ }
+ } catch (RejectedExecutionException e) {
+ rejected.getAndIncrement();
+ throw e;
+ }
+ }
+
+ public void executeBlocking(final Runnable task, final long timeout, final TimeUnit unit) throws RejectedExecutionException, InterruptedException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ try {
+ final Semaphore semaphore = limitSemaphore;
+ if (! semaphore.tryAcquire(timeout, unit)) {
+ throw new ExecutionTimedOutException();
+ }
+ boolean ok = false;
+ try {
+ final Thread thread = factory.newThread(new Runnable() {
+ public void run() {
+ try {
+ synchronized (lock) {
+ int t = ++currentThreadCount;
+ if (t > largestThreadCount) {
+ largestThreadCount = t;
+ }
+ }
+ taskExecutor.execute(task);
+ synchronized (lock) {
+ currentThreadCount--;
+ }
+ } finally {
+ limitSemaphore.release();
+ }
+ }
+ });
+ if (thread == null) {
+ throw new ThreadCreationException("No threads can be created");
+ }
+ thread.start();
+ ok = true;
+ } finally {
+ if (! ok) semaphore.release();
+ }
+ } catch (RejectedExecutionException e) {
+ rejected.getAndIncrement();
+ throw e;
+ }
+ }
+
+ public void executeNonBlocking(final Runnable task) throws RejectedExecutionException {
+ if (task == null) {
+ throw new NullPointerException("task is null");
+ }
+ try {
+ final Semaphore semaphore = limitSemaphore;
+ if (! semaphore.tryAcquire()) {
+ throw new RejectedExecutionException("Task limit reached");
+ }
+ boolean ok = false;
+ try {
+ final Thread thread = factory.newThread(new Runnable() {
+ public void run() {
+ try {
+ synchronized (lock) {
+ int t = ++currentThreadCount;
+ if (t > largestThreadCount) {
+ largestThreadCount = t;
+ }
+ }
+ taskExecutor.execute(task);
+ synchronized (lock) {
+ currentThreadCount--;
+ }
+ } finally {
+ limitSemaphore.release();
+ }
+ }
+ });
+ if (thread == null) {
+ throw new ThreadCreationException("No threads can be created");
+ }
+ thread.start();
+ ok = true;
+ } finally {
+ if (! ok) semaphore.release();
+ }
+ } catch (RejectedExecutionException e) {
+ rejected.getAndIncrement();
+ throw e;
+ }
+ }
+
public boolean isBlocking() {
return blocking;
}
More information about the jboss-cvs-commits
mailing list