[jboss-cvs] JBossAS SVN: r86762 - projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Apr 3 14:56:12 EDT 2009
Author: david.lloyd at jboss.com
Date: 2009-04-03 14:56:12 -0400 (Fri, 03 Apr 2009)
New Revision: 86762
Added:
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java
projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java
Log:
Atomic array utility methods; simple load-balancing executor
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java 2009-04-03 18:56:12 UTC (rev 86762)
@@ -0,0 +1,377 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Utility for snapshot/copy-on-write arrays. To use these methods, two things are required: an immutable array
+ * stored on a volatile field, and an instance of
+ * {@link java.util.concurrent.atomic.AtomicReferenceFieldUpdater AtomicReferenceFieldUpdater}
+ * which corresponds to that field. Some of these methods perform multi-step operations; if the array field value is
+ * changed in the middle of such an operation, the operation is retried. To avoid spinning, in some situations it
+ * may be advisable to hold a write lock to prevent multiple concurrent updates.
+ *
+ * @param <T> the type which contains the target field
+ * @param <V> the array value type
+ */
+public final class AtomicArray<T, V> {
+
+ private final AtomicReferenceFieldUpdater<T, V[]> updater;
+ private final Class<V> componentType;
+ private final V[] emptyArray;
+
+ /**
+ * Construct an instance.
+ *
+ * @param updater the field updater
+ * @param componentType the component class
+ */
+ public AtomicArray(AtomicReferenceFieldUpdater<T, V[]> updater, Class<V> componentType) {
+ this.updater = updater;
+ this.componentType = componentType;
+ emptyArray = newInstance(componentType, 0);
+ }
+
+ /**
+ * Convenience method to create an instance.
+ *
+ * @param updater the field updater
+ * @param componentType the component class
+ * @param <T> the type which contains the target field
+ * @param <V> the array value type
+ * @return the new instance
+ */
+ public static <T, V> AtomicArray<T, V> create(AtomicReferenceFieldUpdater<T, V[]> updater, Class<V> componentType) {
+ return new AtomicArray<T,V>(updater, componentType);
+ }
+
+ /**
+ * Convenience method to set the field value to the empty array. Empty array instances are shared.
+ *
+ * @param instance the instance holding the field
+ */
+ public void clear(T instance) {
+ updater.set(instance, emptyArray);
+ }
+
+ /**
+ * Update the value of this array.
+ *
+ * @param instance the instance holding the field
+ * @param value the new value
+ */
+ public void set(T instance, V[] value) {
+ updater.set(instance, value);
+ }
+
+ /**
+ * Atomically get and update the value of this array.
+ *
+ * @param instance the instance holding the field
+ * @param value the new value
+ */
+ public V[] getAndSet(T instance, V[] value) {
+ return updater.getAndSet(instance, value);
+ }
+
+ /**
+ * Atomically replace the array with a new array which is one element longer, and which includes the given value.
+ *
+ * @param instance the instance holding the field
+ * @param value the updated value
+ */
+ public void add(T instance, V value) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ final V[] newVal = Arrays.copyOf(oldVal, oldLen + 1);
+ newVal[oldLen] = value;
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Atomically replace the array with a new array which is one element longer, and which includes the given value,
+ * if the value is not already present within the array. This method does a linear search for the target value.
+ *
+ * @param instance the instance holding the field
+ * @param value the updated value
+ * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+ * @return {@code true} if the value was added, or {@code false} if it was already present
+ */
+ public boolean addIfAbsent(T instance, V value, boolean identity) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ return false;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ return false;
+ }
+ }
+ }
+ final V[] newVal = Arrays.copyOf(oldVal, oldLen + 1);
+ newVal[oldLen] = value;
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Atomically replace the array with a new array which does not include the first occurrance of the given value, if
+ * the value is present in the array.
+ *
+ * @param instance the instance holding the field
+ * @param value the updated value
+ * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+ * @return {@code true} if the value was removed, or {@code false} if it was not present
+ */
+ public boolean remove(T instance, V value, boolean identity) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ if (oldLen == 0) {
+ return false;
+ } else {
+ int index = -1;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ index = i;
+ break;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ index = i;
+ break;
+ }
+ }
+ }
+ if (index == -1) {
+ return false;
+ }
+ final V[] newVal = newInstance(componentType, oldLen - 1);
+ System.arraycopy(oldVal, 0, newVal, 0, index);
+ System.arraycopy(oldVal, index + 1, newVal, index, oldLen - index - 1);
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Atomically replace the array with a new array which does not include any occurrances of the given value, if
+ * the value is present in the array.
+ *
+ * @param instance the instance holding the field
+ * @param value the updated value
+ * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+ * @return the number of values removed
+ */
+ public int removeAll(T instance, V value, boolean identity) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ if (oldLen == 0) {
+ return 0;
+ } else {
+ final boolean[] removeSlots = new boolean[oldLen];
+ int removeCount = 0;
+ if (identity || value == null) {
+ for (int i = 0; i < oldLen; i++) {
+ if (oldVal[i] == value) {
+ removeSlots[i] = true;
+ removeCount++;
+ }
+ }
+ } else {
+ for (int i = 0; i < oldLen; i++) {
+ if (value.equals(oldVal[i])) {
+ removeSlots[i] = true;
+ removeCount++;
+ }
+ }
+ }
+ if (removeCount == 0) {
+ return 0;
+ }
+ final int newLen = oldLen - removeCount;
+ final V[] newVal;
+ if (newLen == 0) {
+ newVal = emptyArray;
+ } else {
+ newVal = newInstance(componentType, newLen);
+ for (int i = 0, j = 0; i < oldLen; i ++) {
+ if (! removeSlots[i]) {
+ newVal[j++] = oldVal[i];
+ }
+ }
+ }
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return removeCount;
+ }
+ }
+ }
+ }
+
+ private static final Comparator<Comparable> comparator = new Comparator<Comparable>() {
+ @SuppressWarnings({ "unchecked" })
+ public int compare(final Comparable o1, final Comparable o2) {
+ return o1.compareTo(o2);
+ }
+ };
+
+ @SuppressWarnings({ "unchecked" })
+ public static <V extends Comparable<? super V>> Comparator<V> naturalOrder() {
+ return (Comparator<V>) comparator;
+ }
+
+ /**
+ * Add a value to a sorted array. Does not check for duplicates.
+ *
+ * @param instance the instance holding the field
+ * @param value the value to add
+ * @param comparator a comparator, or {@code null} to use natural ordering
+ */
+ public void add(T instance, V value, Comparator<? super V> comparator) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ final int pos = insertionPoint(Arrays.binarySearch(oldVal, value, comparator));
+ final V[] newVal = newInstance(componentType, oldLen + 1);
+ System.arraycopy(oldVal, 0, newVal, 0, pos);
+ newVal[pos] = value;
+ System.arraycopy(oldVal, pos, newVal, pos + 1, oldLen - pos);
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Add a value to a sorted array if it is not already present. Does not check for duplicates.
+ *
+ * @param instance the instance holding the field
+ * @param value the value to add
+ * @param comparator a comparator, or {@code null} to use natural ordering
+ */
+ public boolean addIfAbsent(T instance, V value, Comparator<? super V> comparator) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ final int pos = Arrays.binarySearch(oldVal, value, comparator);
+ if (pos < 0) {
+ return false;
+ }
+ final V[] newVal = newInstance(componentType, oldLen + 1);
+ System.arraycopy(oldVal, 0, newVal, 0, pos);
+ newVal[pos] = value;
+ System.arraycopy(oldVal, pos, newVal, pos + 1, oldLen - pos);
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Remove a value to a sorted array. Does not check for duplicates. If there are multiple occurrances of a value,
+ * there is no guarantee as to which one is removed.
+ *
+ * @param instance the instance holding the field
+ * @param value the value to remove
+ * @param comparator a comparator, or {@code null} to use natural ordering
+ */
+ public boolean remove(T instance, V value, Comparator<? super V> comparator) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ final int oldLen = oldVal.length;
+ if (oldLen == 0) {
+ return false;
+ } else {
+ final int pos = Arrays.binarySearch(oldVal, value, comparator);
+ if (pos < 0) {
+ return false;
+ }
+ final V[] newVal = newInstance(componentType, oldLen - 1);
+ System.arraycopy(oldVal, 0, newVal, 0, pos);
+ System.arraycopy(oldVal, pos + 1, newVal, pos, oldLen - pos - 1);
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Sort an array.
+ *
+ * @param instance the instance holding the field
+ * @param comparator a comparator, or {@code null} to use natural ordering
+ */
+ public void sort(T instance, Comparator<? super V> comparator) {
+ final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+ for (;;) {
+ final V[] oldVal = updater.get(instance);
+ if (oldVal.length == 0) {
+ return;
+ }
+ final V[] newVal = oldVal.clone();
+ Arrays.sort(newVal, comparator);
+ if (updater.compareAndSet(instance, oldVal, newVal)) {
+ return;
+ }
+ }
+ }
+
+ private static int insertionPoint(int searchResult) {
+ return searchResult > 0 ? searchResult : - (searchResult + 1);
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private static <V> V[] newInstance(Class<V> componentType, int length) {
+ return (V[]) Array.newInstance(componentType, length);
+ }
+}
Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java 2009-04-03 18:56:12 UTC (rev 86762)
@@ -0,0 +1,128 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
+
+/**
+ * A simple load-balancing executor.  If no delegate executors are defined, then tasks are rejected.  Executors are
+ * chosen in a round-robin fashion.
+ * <p>
+ * The delegate list is held as an array on a volatile field which is replaced, never modified, when delegates are
+ * added or removed; {@link #execute} works on whichever array it reads at the time of the call.
+ */
+public class BalancingExecutor implements Executor {
+
+    // Read directly by execute(); written only through executorsUpdater, which looks the field up by name.
+    private volatile Executor[] executors = null;
+    private final AtomicInteger seq = new AtomicInteger();
+    private final Lock writeLock = new ReentrantLock();
+
+    private static final AtomicArray<BalancingExecutor, Executor> executorsUpdater = AtomicArray.create(newUpdater(BalancingExecutor.class, Executor[].class, "executors"), Executor.class);
+
+    /**
+     * Construct a new instance with no delegate executors.
+     */
+    public BalancingExecutor() {
+        executorsUpdater.clear(this);
+    }
+
+    /**
+     * Construct a new instance.  The given array is copied; a {@code null} or empty array results in an instance
+     * with no delegate executors.
+     *
+     * @param executors the initial list of executors to delegate to
+     * @throws NullPointerException if any element of the array is {@code null}
+     */
+    public BalancingExecutor(Executor... executors) {
+        if (executors != null && executors.length > 0) {
+            final Executor[] clone = executors.clone();
+            for (int i = 0; i < clone.length; i++) {
+                if (clone[i] == null) {
+                    throw new NullPointerException("executor at index " + i + " is null");
+                }
+            }
+            executorsUpdater.set(this, clone);
+        } else {
+            executorsUpdater.clear(this);
+        }
+    }
+
+    /**
+     * Execute a task on the next delegate executor in sequence.
+     *
+     * @param command the task to execute
+     * @throws RejectedExecutionException if no executors are available to run the task
+     */
+    public void execute(final Runnable command) throws RejectedExecutionException {
+        final Executor[] executors = this.executors;
+        final int len = executors.length;
+        if (len == 0) {
+            throw new RejectedExecutionException("No executors available to run task");
+        }
+        // The counter wraps to negative values after Integer.MAX_VALUE increments, and % keeps the sign of its
+        // left operand; clearing the sign bit keeps the index within [0, len).
+        final int index = (seq.getAndIncrement() & Integer.MAX_VALUE) % len;
+        executors[index].execute(command);
+    }
+
+    /**
+     * Clear out all delegate executors at once.  Tasks will be rejected until another delegate executor is added.
+     * This method does not acquire the write lock; it simply replaces the field value with the empty array.
+     */
+    public void clear() {
+        executorsUpdater.clear(this);
+    }
+
+    /**
+     * Add a delegate executor.  The update is performed while holding the write lock.
+     *
+     * @param executor the executor to add
+     * @throws NullPointerException if {@code executor} is {@code null}
+     */
+    public void addExecutor(final Executor executor) {
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        final Lock lock = writeLock;
+        lock.lock();
+        try {
+            executorsUpdater.add(this, executor);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Remove a delegate executor.  The first entry which is the same object as the given executor is removed; a
+     * {@code null} argument is ignored.  The update is performed while holding the write lock.
+     *
+     * @param executor the executor to remove
+     */
+    public void removeExecutor(final Executor executor) {
+        if (executor == null) {
+            return;
+        }
+        final Lock lock = writeLock;
+        lock.lock();
+        try {
+            executorsUpdater.remove(this, executor, true);
+        } finally {
+            lock.unlock();
+        }
+    }
+}
More information about the jboss-cvs-commits
mailing list