[jboss-cvs] JBossAS SVN: r86762 - projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Apr 3 14:56:12 EDT 2009


Author: david.lloyd at jboss.com
Date: 2009-04-03 14:56:12 -0400 (Fri, 03 Apr 2009)
New Revision: 86762

Added:
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java
   projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java
Log:
Atomic array utility methods; simple load-balancing executor

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/AtomicArray.java	2009-04-03 18:56:12 UTC (rev 86762)
@@ -0,0 +1,377 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.lang.reflect.Array;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+
+/**
+ * Utility for snapshot/copy-on-write arrays.  To use these methods, two things are required: an immutable array
+ * stored on a volatile field, and an instance of
+ * {@link java.util.concurrent.atomic.AtomicReferenceFieldUpdater AtomicReferenceFieldUpdater}
+ * which corresponds to that field.  Some of these methods perform multi-step operations; if the array field value is
+ * changed in the middle of such an operation, the operation is retried.  To avoid spinning, in some situations it
+ * may be advisable to hold a write lock to prevent multiple concurrent updates.
+ *
+ * @param <T> the type which contains the target field
+ * @param <V> the array value type
+ */
+public final class AtomicArray<T, V> {
+
+    private final AtomicReferenceFieldUpdater<T, V[]> updater;
+    private final Class<V> componentType;
+    private final V[] emptyArray;
+
+    /**
+     * Construct an instance.
+     *
+     * @param updater the field updater
+     * @param componentType the component class
+     */
+    public AtomicArray(AtomicReferenceFieldUpdater<T, V[]> updater, Class<V> componentType) {
+        this.updater = updater;
+        this.componentType = componentType;
+        emptyArray = newInstance(componentType, 0);
+    }
+
+    /**
+     * Convenience method to create an instance.
+     *
+     * @param updater the field updater
+     * @param componentType the component class
+     * @param <T> the type which contains the target field
+     * @param <V> the array value type
+     * @return the new instance
+     */
+    public static <T, V> AtomicArray<T, V> create(AtomicReferenceFieldUpdater<T, V[]> updater, Class<V> componentType) {
+        return new AtomicArray<T,V>(updater, componentType);
+    }
+
+    /**
+     * Convenience method to set the field value to the empty array.  Empty array instances are shared.
+     *
+     * @param instance the instance holding the field
+     */
+    public void clear(T instance) {
+        updater.set(instance, emptyArray);
+    }
+
+    /**
+     * Update the value of this array.
+     *
+     * @param instance the instance holding the field
+     * @param value the new value
+     */
+    public void set(T instance, V[] value) {
+        updater.set(instance, value);
+    }
+
+    /**
+     * Atomically get and update the value of this array.
+     *
+     * @param instance the instance holding the field
+     * @param value the new value
+     */
+    public V[] getAndSet(T instance, V[] value) {
+        return updater.getAndSet(instance, value);
+    }
+
+    /**
+     * Atomically replace the array with a new array which is one element longer, and which includes the given value.
+     *
+     * @param instance the instance holding the field
+     * @param value the updated value
+     */
+    public void add(T instance, V value) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            final V[] newVal = Arrays.copyOf(oldVal, oldLen + 1);
+            newVal[oldLen] = value;
+            if (updater.compareAndSet(instance, oldVal, newVal)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Atomically replace the array with a new array which is one element longer, and which includes the given value,
+     * if the value is not already present within the array.  This method does a linear search for the target value.
+     *
+     * @param instance the instance holding the field
+     * @param value the updated value
+     * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+     * @return {@code true} if the value was added, or {@code false} if it was already present
+     */
+    public boolean addIfAbsent(T instance, V value, boolean identity) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            if (identity || value == null) {
+                for (int i = 0; i < oldLen; i++) {
+                    if (oldVal[i] == value) {
+                        return false;
+                    }
+                }
+            } else {
+                for (int i = 0; i < oldLen; i++) {
+                    if (value.equals(oldVal[i])) {
+                        return false;
+                    }
+                }
+            }
+            final V[] newVal = Arrays.copyOf(oldVal, oldLen + 1);
+            newVal[oldLen] = value;
+            if (updater.compareAndSet(instance, oldVal, newVal)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Atomically replace the array with a new array which does not include the first occurrance of the given value, if
+     * the value is present in the array.
+     *
+     * @param instance the instance holding the field
+     * @param value the updated value
+     * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+     * @return {@code true} if the value was removed, or {@code false} if it was not present
+     */
+    public boolean remove(T instance, V value, boolean identity) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            if (oldLen == 0) {
+                return false;
+            } else {
+                int index = -1;
+                if (identity || value == null) {
+                    for (int i = 0; i < oldLen; i++) {
+                        if (oldVal[i] == value) {
+                            index = i;
+                            break;
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < oldLen; i++) {
+                        if (value.equals(oldVal[i])) {
+                            index = i;
+                            break;
+                        }
+                    }
+                }
+                if (index == -1) {
+                    return false;
+                }
+                final V[] newVal = newInstance(componentType, oldLen - 1);
+                System.arraycopy(oldVal, 0, newVal, 0, index);
+                System.arraycopy(oldVal, index + 1, newVal, index, oldLen - index - 1);
+                if (updater.compareAndSet(instance, oldVal, newVal)) {
+                    return true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Atomically replace the array with a new array which does not include any occurrances of the given value, if
+     * the value is present in the array.
+     *
+     * @param instance the instance holding the field
+     * @param value the updated value
+     * @param identity {@code true} if comparisons should be done using reference identity, or {@code false} to use the {@code equals()} method
+     * @return the number of values removed
+     */
+    public int removeAll(T instance, V value, boolean identity) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            if (oldLen == 0) {
+                return 0;
+            } else {
+                final boolean[] removeSlots = new boolean[oldLen];
+                int removeCount = 0;
+                if (identity || value == null) {
+                    for (int i = 0; i < oldLen; i++) {
+                        if (oldVal[i] == value) {
+                            removeSlots[i] = true;
+                            removeCount++;
+                        }
+                    }
+                } else {
+                    for (int i = 0; i < oldLen; i++) {
+                        if (value.equals(oldVal[i])) {
+                            removeSlots[i] = true;
+                            removeCount++;
+                        }
+                    }
+                }
+                if (removeCount == 0) {
+                    return 0;
+                }
+                final int newLen = oldLen - removeCount;
+                final V[] newVal;
+                if (newLen == 0) {
+                    newVal = emptyArray;
+                } else {
+                    newVal = newInstance(componentType, newLen);
+                    for (int i = 0, j = 0; i < oldLen; i ++) {
+                        if (! removeSlots[i]) {
+                            newVal[j++] = oldVal[i];
+                        }
+                    }
+                }
+                if (updater.compareAndSet(instance, oldVal, newVal)) {
+                    return removeCount;
+                }
+            }
+        }
+    }
+
+    private static final Comparator<Comparable> comparator = new Comparator<Comparable>() {
+        @SuppressWarnings({ "unchecked" })
+        public int compare(final Comparable o1, final Comparable o2) {
+            return o1.compareTo(o2);
+        }
+    };
+
+    @SuppressWarnings({ "unchecked" })
+    public static <V extends Comparable<? super V>> Comparator<V> naturalOrder() {
+        return (Comparator<V>) comparator;
+    }
+
+    /**
+     * Add a value to a sorted array.  Does not check for duplicates.
+     *
+     * @param instance the instance holding the field
+     * @param value the value to add
+     * @param comparator a comparator, or {@code null} to use natural ordering
+     */
+    public void add(T instance, V value, Comparator<? super V> comparator) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            final int pos = insertionPoint(Arrays.binarySearch(oldVal, value, comparator));
+            final V[] newVal = newInstance(componentType, oldLen + 1);
+            System.arraycopy(oldVal, 0, newVal, 0, pos);
+            newVal[pos] = value;
+            System.arraycopy(oldVal, pos, newVal, pos + 1, oldLen - pos);
+            if (updater.compareAndSet(instance, oldVal, newVal)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Add a value to a sorted array if it is not already present.  Does not check for duplicates.
+     *
+     * @param instance the instance holding the field
+     * @param value the value to add
+     * @param comparator a comparator, or {@code null} to use natural ordering
+     */
+    public boolean addIfAbsent(T instance, V value, Comparator<? super V> comparator) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            final int pos = Arrays.binarySearch(oldVal, value, comparator);
+            if (pos < 0) {
+                return false;
+            }
+            final V[] newVal = newInstance(componentType, oldLen + 1);
+            System.arraycopy(oldVal, 0, newVal, 0, pos);
+            newVal[pos] = value;
+            System.arraycopy(oldVal, pos, newVal, pos + 1, oldLen - pos);
+            if (updater.compareAndSet(instance, oldVal, newVal)) {
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Remove a value to a sorted array.  Does not check for duplicates.  If there are multiple occurrances of a value,
+     * there is no guarantee as to which one is removed.
+     *
+     * @param instance the instance holding the field
+     * @param value the value to remove
+     * @param comparator a comparator, or {@code null} to use natural ordering
+     */
+    public boolean remove(T instance, V value, Comparator<? super V> comparator) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            final int oldLen = oldVal.length;
+            if (oldLen == 0) {
+                return false;
+            } else {
+                final int pos = Arrays.binarySearch(oldVal, value, comparator);
+                if (pos < 0) {
+                    return false;
+                }
+                final V[] newVal = newInstance(componentType, oldLen - 1);
+                System.arraycopy(oldVal, 0, newVal, 0, pos);
+                System.arraycopy(oldVal, pos + 1, newVal, pos, oldLen - pos - 1);
+                if (updater.compareAndSet(instance, oldVal, newVal)) {
+                    return true;
+                }
+            }
+        }
+    }
+
+    /**
+     * Sort an array.
+     *
+     * @param instance the instance holding the field
+     * @param comparator a comparator, or {@code null} to use natural ordering
+     */
+    public void sort(T instance, Comparator<? super V> comparator) {
+        final AtomicReferenceFieldUpdater<T, V[]> updater = this.updater;
+        for (;;) {
+            final V[] oldVal = updater.get(instance);
+            if (oldVal.length == 0) {
+                return;
+            }
+            final V[] newVal = oldVal.clone();
+            Arrays.sort(newVal, comparator);
+            if (updater.compareAndSet(instance, oldVal, newVal)) {
+                return;
+            }
+        }
+    }
+
+    private static int insertionPoint(int searchResult) {
+        return searchResult > 0 ? searchResult : - (searchResult + 1);
+    }
+
+    @SuppressWarnings({ "unchecked" })
+    private static <V> V[] newInstance(Class<V> componentType, int length) {
+        return (V[]) Array.newInstance(componentType, length);
+    }
+}

Added: projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java
===================================================================
--- projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java	                        (rev 0)
+++ projects/jboss-threads/trunk/main/src/main/java/org/jboss/threads/BalancingExecutor.java	2009-04-03 18:56:12 UTC (rev 86762)
@@ -0,0 +1,128 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.threads;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
+
+/**
+ * A simple load-balancing executor.  If no delegate executors are defined, then tasks are rejected.  Executors are
+ * chosen in a round-robin fashion.
+ */
+public class BalancingExecutor implements Executor {
+
+    private volatile Executor[] executors = null;
+    private final AtomicInteger seq = new AtomicInteger();
+    private final Lock writeLock = new ReentrantLock();
+
+    private static final AtomicArray<BalancingExecutor, Executor> executorsUpdater = AtomicArray.create(newUpdater(BalancingExecutor.class, Executor[].class, "executors"), Executor.class);
+
+    /**
+     * Construct a new instance.
+     */
+    public BalancingExecutor() {
+        executorsUpdater.clear(this);
+    }
+
+    /**
+     * Construct a new instance.
+     *
+     * @param executors the initial list of executors to delegate to
+     */
+    public BalancingExecutor(Executor... executors) {
+        if (executors != null && executors.length > 0) {
+            final Executor[] clone = executors.clone();
+            for (int i = 0; i < clone.length; i++) {
+                if (clone[i] == null) {
+                    throw new NullPointerException("executor at index " + i + " is null");
+                }
+            }
+            executorsUpdater.set(this, clone);
+        } else {
+            executorsUpdater.clear(this);
+        }
+    }
+
+    /**
+     * Execute a task.
+     *
+     * @param command the task to execute
+     * @throws RejectedExecutionException if no executors are available to run the task
+     */
+    public void execute(final Runnable command) throws RejectedExecutionException {
+        final Executor[] executors = this.executors;
+        final int len = executors.length;
+        if (len == 0) {
+            throw new RejectedExecutionException("No executors available to run task");
+        }
+        executors[seq.getAndIncrement() % len].execute(command);
+    }
+
+    /**
+     * Clear out all delegate executors at once.  Tasks will be rejected until another delegate executor is added.
+     */
+    public void clear() {
+        executorsUpdater.clear(this);
+    }
+
+    /**
+     * Add a delegate executor.
+     *
+     * @param executor the executor to add
+     */
+    public void addExecutor(final Executor executor) {
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        final Lock lock = writeLock;
+        lock.lock();
+        try {
+            executorsUpdater.add(this, executor);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Remove a delegate executor.
+     *
+     * @param executor the executor to remove
+     */
+    public void removeExecutor(final Executor executor) {
+        if (executor == null) {
+            return;
+        }
+        final Lock lock = writeLock;
+        lock.lock();
+        try {
+            executorsUpdater.remove(this, executor, true);
+        } finally {
+            lock.unlock();
+        }
+    }
+}




More information about the jboss-cvs-commits mailing list