[infinispan-commits] Infinispan SVN: r328 - in trunk/core/src/main/java/org/infinispan: distribution and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed May 20 11:18:23 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-20 11:18:23 -0400 (Wed, 20 May 2009)
New Revision: 328

Added:
   trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
   trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
Removed:
   trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
   trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
Modified:
   trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java
   trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java
   trunk/core/src/main/java/org/infinispan/AdvancedCache.java
   trunk/core/src/main/java/org/infinispan/Cache.java
   trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
   trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
   trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Log:
Async map code

Modified: trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -8,11 +8,11 @@
 import org.infinispan.factories.ComponentRegistry;
 import org.infinispan.interceptors.base.CommandInterceptor;
 import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -130,51 +130,51 @@
       return cache.replace(k, oV, nV, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
    }
 
-   public Future<V> putAsync(K key, V value, Flag... flags) {
+   public NotifyingFuture<V> putAsync(K key, V value, Flag... flags) {
       return cache.putAsync(key, value, flags);
    }
 
-   public Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+   public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
       return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
    }
 
-   public Future<V> putIfAbsentAsync(K key, V value, Flag... flags) {
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, Flag... flags) {
       return cache.putIfAbsentAsync(key, value, flags);
    }
 
-   public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
       return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
    }
 
-   public Future<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags) {
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags) {
       return cache.putAllAsync(map, flags);
    }
 
-   public Future<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
       return cache.putAllAsync(map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
    }
 
-   public Future<V> removeAsync(Object key, Flag... flags) {
+   public NotifyingFuture<V> removeAsync(Object key, Flag... flags) {
       return cache.removeAsync(key, flags);
    }
 
-   public Future<Void> clearAsync(Flag... flags) {
+   public NotifyingFuture<Void> clearAsync(Flag... flags) {
       return cache.clearAsync(flags);
    }
 
-   public Future<V> replaceAsync(K k, V v, Flag... flags) {
+   public NotifyingFuture<V> replaceAsync(K k, V v, Flag... flags) {
       return cache.replaceAsync(k, v, flags);
    }
 
-   public Future<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags) {
+   public NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags) {
       return cache.replaceAsync(k, oV, nV, flags);
    }
 
-   public Future<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
+   public NotifyingFuture<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
       return cache.replaceAsync(k, v, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
    }
 
-   public Future<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
+   public NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
       return cache.replaceAsync(k, oV, nV, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
    }
 
@@ -186,78 +186,6 @@
       return cache.get(key, flags);
    }
 
-   public Future<V> putAsync(K key, V value) {
-      return cache.putAsync(key, value);
-   }
-
-   public Future<V> putAsync(K key, V value, long lifespan, TimeUnit unit) {
-      return cache.putAsync(key, value, lifespan, unit);
-   }
-
-   public Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
-   }
-
-   public Future<Void> putAllAsync(Map<? extends K, ? extends V> data) {
-      return cache.putAllAsync(data);
-   }
-
-   public Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit) {
-      return cache.putAllAsync(data, lifespan, unit);
-   }
-
-   public Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return cache.putAllAsync(data, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
-   }
-
-   public Future<Void> clearAsync() {
-      return cache.clearAsync();
-   }
-
-   public Future<V> putIfAbsentAsync(K key, V value) {
-      return cache.putIfAbsentAsync(key, value);
-   }
-
-   public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit) {
-      return cache.putIfAbsentAsync(key, value, lifespan, unit);
-   }
-
-   public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
-   }
-
-   public Future<V> removeAsync(Object key) {
-      return cache.removeAsync(key);
-   }
-
-   public Future<Boolean> removeAsync(Object key, Object value) {
-      return cache.removeAsync(key, value);
-   }
-
-   public Future<V> replaceAsync(K key, V value) {
-      return cache.replaceAsync(key, value);
-   }
-
-   public Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit) {
-      return cache.replaceAsync(key, value, lifespan, unit);
-   }
-
-   public Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return cache.replaceAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
-   }
-
-   public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
-      return cache.replaceAsync(key, oldValue, newValue);
-   }
-
-   public Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit) {
-      return cache.replaceAsync(key, oldValue, newValue, lifespan, unit);
-   }
-
-   public Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
-      return cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
-   }
-
    public void lock(K key) {
       cache.lock(key);
    }

Modified: trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -3,22 +3,23 @@
 import org.infinispan.config.Configuration;
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.manager.CacheManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
 
 import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-/**                                                                 
- * This is a convenient base class for implementing a cache delegate. The only constructor takes a {@link Cache} argument, to
- * which each method call is delegated. One can extend this class and override the method sub-set it is interested in.
- * There is also an similar implmentation for {@link org.infinispan.AdvancedCache}:
- * {@link org.infinispan.AbstractDelegatingAdvancedCache}.
+/**
+ * This is a convenient base class for implementing a cache delegate. The only constructor takes a {@link Cache}
+ * argument, to which each method call is delegated. One can extend this class and override the method sub-set it is
+ * interested in. There is also a similar implementation for {@link org.infinispan.AdvancedCache}: {@link
+ * org.infinispan.AbstractDelegatingAdvancedCache}.
  *
+ * @author Mircea.Markus at jboss.com
  * @see org.infinispan.AbstractDelegatingAdvancedCache
- * @author Mircea.Markus at jboss.com
  */
-public abstract class AbstractDelegatingCache<K, V> implements Cache<K, V> {
+public class AbstractDelegatingCache<K, V> implements Cache<K, V> {
 
    private Cache<K, V> cache;
 
@@ -98,6 +99,78 @@
       return cache.replace(key, oldValue, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
    }
 
+   public NotifyingFuture<V> putAsync(K key, V value) {
+      return cache.putAsync(key, value);
+   }
+
+   public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit unit) {
+      return cache.putAsync(key, value, lifespan, unit);
+   }
+
+   public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+   }
+
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data) {
+      return cache.putAllAsync(data);
+   }
+
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit) {
+      return cache.putAllAsync(data, lifespan, unit);
+   }
+
+   public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return cache.putAllAsync(data, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+   }
+
+   public NotifyingFuture<Void> clearAsync() {
+      return cache.clearAsync();
+   }
+
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value) {
+      return cache.putIfAbsentAsync(key, value);
+   }
+
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit) {
+      return cache.putIfAbsentAsync(key, value, lifespan, unit);
+   }
+
+   public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+   }
+
+   public NotifyingFuture<V> removeAsync(Object key) {
+      return cache.removeAsync(key);
+   }
+
+   public NotifyingFuture<Boolean> removeAsync(Object key, Object value) {
+      return cache.removeAsync(key, value);
+   }
+
+   public NotifyingFuture<V> replaceAsync(K key, V value) {
+      return cache.replaceAsync(key, value);
+   }
+
+   public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit) {
+      return cache.replaceAsync(key, value, lifespan, unit);
+   }
+
+   public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return cache.replaceAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+   }
+
+   public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
+      return cache.replaceAsync(key, oldValue, newValue);
+   }
+
+   public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit) {
+      return cache.replaceAsync(key, oldValue, newValue, lifespan, unit);
+   }
+
+   public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+      return cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+   }
+
    public AdvancedCache<K, V> getAdvancedCache() {
       return cache.getAdvancedCache();
    }

Modified: trunk/core/src/main/java/org/infinispan/AdvancedCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AdvancedCache.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AdvancedCache.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -8,11 +8,11 @@
 import org.infinispan.factories.ComponentRegistry;
 import org.infinispan.interceptors.base.CommandInterceptor;
 import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -123,29 +123,29 @@
 
 
    // -- async methods --
-   Future<V> putAsync(K key, V value, Flag... flags);
+   NotifyingFuture<V> putAsync(K key, V value, Flag... flags);
 
-   Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+   NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
 
-   Future<V> putIfAbsentAsync(K key, V value, Flag... flags);
+   NotifyingFuture<V> putIfAbsentAsync(K key, V value, Flag... flags);
 
-   Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+   NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
 
-   Future<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags);
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags);
 
-   Future<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
 
-   Future<V> removeAsync(Object key, Flag... flags);
+   NotifyingFuture<V> removeAsync(Object key, Flag... flags);
 
-   Future<Void> clearAsync(Flag... flags);
+   NotifyingFuture<Void> clearAsync(Flag... flags);
 
-   Future<V> replaceAsync(K k, V v, Flag... flags);
+   NotifyingFuture<V> replaceAsync(K k, V v, Flag... flags);
 
-   Future<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags);
+   NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags);
 
-   Future<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
+   NotifyingFuture<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
 
-   Future<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
+   NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
 
    boolean containsKey(Object key, Flag... flags);
 

Deleted: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -1,44 +0,0 @@
-package org.infinispan;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Wraps up return values for the asunc API
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class AsyncReturnValue implements Future<Object> {
-   final Future<Object> networkCallFuture;
-   final Object actualReturnValue;
-
-   public AsyncReturnValue(Future<Object> networkCallFuture, Object actualReturnValue) {
-      this.networkCallFuture = networkCallFuture;
-      this.actualReturnValue = actualReturnValue;
-   }
-
-   public boolean cancel(boolean mayInterruptIfRunning) {
-      return networkCallFuture.cancel(mayInterruptIfRunning);
-   }
-
-   public boolean isCancelled() {
-      return networkCallFuture.isCancelled();
-   }
-
-   public boolean isDone() {
-      return networkCallFuture.isDone();
-   }
-
-   public Object get() throws InterruptedException, ExecutionException {
-      networkCallFuture.get();
-      return actualReturnValue;
-   }
-
-   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-      networkCallFuture.get(timeout, unit);
-      return actualReturnValue;
-   }
-}

Modified: trunk/core/src/main/java/org/infinispan/Cache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/Cache.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/Cache.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -28,10 +28,10 @@
 import org.infinispan.manager.CacheManager;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.notifications.Listenable;
+import org.infinispan.util.concurrent.NotifyingFuture;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -54,15 +54,16 @@
  * <h3>Asynchronous operations</h3> Cache also supports the use of "async" remote operations.  Note that these methods
  * only really make sense if you are using a clustered cache.  I.e., when used in LOCAL mode, these "async" operations
  * offer no benefit whatsoever.  These methods, such as {@link #putAsync(Object, Object)} offer the best of both worlds
- * between a fully synchronous and a fully asynchronous cache in that a {@link Future} is returned.  The <tt>Future</tt>
- * can then be ignored or thrown away for typical asynchronous behaviour, or queried for synchronous behaviour, which
- * would block until any remote calls complete.  Note that all remote calls are, as far as the transport is concerned,
- * synchronous.  This allows you the guarantees that remote calls succeed, while not blocking your application thread
- * unnecessarily.  For example, usage such as the following could benefit from the async operations:
+ * between a fully synchronous and a fully asynchronous cache in that a {@link NotifyingFuture} is returned.  The
+ * <tt>NotifyingFuture</tt> can then be ignored or thrown away for typical asynchronous behaviour, or queried for
+ * synchronous behaviour, which would block until any remote calls complete.  Note that all remote calls are, as far as
+ * the transport is concerned, synchronous.  This allows you the guarantees that remote calls succeed, while not
+ * blocking your application thread unnecessarily.  For example, usage such as the following could benefit from the
+ * async operations:
  * <pre>
- *   Future f1 = cache.putAsync("key1", "value1");
- *   Future f2 = cache.putAsync("key2", "value2");
- *   Future f3 = cache.putAsync("key3", "value3");
+ *   NotifyingFuture f1 = cache.putAsync("key1", "value1");
+ *   NotifyingFuture f2 = cache.putAsync("key2", "value2");
+ *   NotifyingFuture f3 = cache.putAsync("key3", "value3");
  *   f1.get();
  *   f2.get();
  *   f3.get();
@@ -72,8 +73,8 @@
  * especially advantageous if the cache uses distribution and the three keys map to different cache instances in the
  * cluster.
  * <p/>
- * Also, the use of async operations when within a transaction return your local value only, as expected.  A Future is
- * still returned though for API consistency.
+ * Also, the use of async operations when within a transaction returns your local value only, as expected.  A
+ * NotifyingFuture is still returned though for API consistency.
  * <p/>
  * <h3>Constructing a Cache</h3> An instance of the Cache is usually obtained by using a {@link CacheManager}.
  * <pre>
@@ -310,7 +311,7 @@
     * @param value value to store
     * @return a future containing the old value replaced.
     */
-   Future<V> putAsync(K key, V value);
+   NotifyingFuture<V> putAsync(K key, V value);
 
    /**
     * Asynchronous version of {@link #put(Object, Object, long, TimeUnit)} .  This method does not block on remote
@@ -323,7 +324,7 @@
     * @param unit     time unit for lifespan
     * @return a future containing the old value replaced
     */
-   Future<V> putAsync(K key, V value, long lifespan, TimeUnit unit);
+   NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit unit);
 
    /**
     * Asynchronous version of {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}.  This method does not block
@@ -339,7 +340,7 @@
     * @param maxIdleUnit  time unit for max idle time
     * @return a future containing the old value replaced
     */
-   Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+   NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
 
    /**
     * Asynchronous version of {@link #putAll(Map)}.  This method does not block on remote calls, even if your cache mode
@@ -348,7 +349,7 @@
     * @param data to store
     * @return a future containing a void return type
     */
-   Future<Void> putAllAsync(Map<? extends K, ? extends V> data);
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data);
 
    /**
     * Asynchronous version of {@link #putAll(Map, long, TimeUnit)}.  This method does not block on remote calls, even if
@@ -359,7 +360,7 @@
     * @param unit     time unit for lifespan
     * @return a future containing a void return type
     */
-   Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
 
    /**
     * Asynchronous version of {@link #putAll(Map, long, TimeUnit, long, TimeUnit)}.  This method does not block on
@@ -374,7 +375,7 @@
     * @param maxIdleUnit  time unit for max idle time
     * @return a future containing a void return type
     */
-   Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+   NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
 
    /**
     * Asynchronous version of {@link #clear()}.  This method does not block on remote calls, even if your cache mode is
@@ -382,7 +383,7 @@
     *
     * @return a future containing a void return type
     */
-   Future<Void> clearAsync();
+   NotifyingFuture<Void> clearAsync();
 
    /**
     * Asynchronous version of {@link #putIfAbsent(Object, Object)}.  This method does not block on remote calls, even if
@@ -393,7 +394,7 @@
     * @param value value to store
     * @return a future containing the old value replaced.
     */
-   Future<V> putIfAbsentAsync(K key, V value);
+   NotifyingFuture<V> putIfAbsentAsync(K key, V value);
 
    /**
     * Asynchronous version of {@link #putIfAbsent(Object, Object, long, TimeUnit)} .  This method does not block on
@@ -406,7 +407,7 @@
     * @param unit     time unit for lifespan
     * @return a future containing the old value replaced
     */
-   Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit);
+   NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit);
 
    /**
     * Asynchronous version of {@link #putIfAbsent(Object, Object, long, TimeUnit, long, TimeUnit)}.  This method does
@@ -422,7 +423,7 @@
     * @param maxIdleUnit  time unit for max idle time
     * @return a future containing the old value replaced
     */
-   Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+   NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
 
    /**
     * Asynchronous version of {@link #remove(Object)}.  This method does not block on remote calls, even if your cache
@@ -431,7 +432,7 @@
     * @param key key to remove
     * @return a future containing the value removed
     */
-   Future<V> removeAsync(Object key);
+   NotifyingFuture<V> removeAsync(Object key);
 
    /**
     * Asynchronous version of {@link #remove(Object, Object)}.  This method does not block on remote calls, even if your
@@ -441,7 +442,7 @@
     * @param value value to match on
     * @return a future containing a boolean, indicating whether the entry was removed or not
     */
-   Future<Boolean> removeAsync(Object key, Object value);
+   NotifyingFuture<Boolean> removeAsync(Object key, Object value);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object)}.  This method does not block on remote calls, even if
@@ -451,7 +452,7 @@
     * @param value value to store
     * @return a future containing the previous value overwritten
     */
-   Future<V> replaceAsync(K key, V value);
+   NotifyingFuture<V> replaceAsync(K key, V value);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object, long, TimeUnit)}.  This method does not block on remote
@@ -464,7 +465,7 @@
     * @param unit     time unit for lifespan
     * @return a future containing the previous value overwritten
     */
-   Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit);
+   NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object, long, TimeUnit, long, TimeUnit)}.  This method does not
@@ -480,7 +481,7 @@
     * @param maxIdleUnit  time unit for max idle time
     * @return a future containing the previous value overwritten
     */
-   Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+   NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object, Object)}.  This method does not block on remote calls,
@@ -492,7 +493,7 @@
     * @param newValue value to store
     * @return a future containing a boolean, indicating whether the entry was replaced or not
     */
-   Future<Boolean> replaceAsync(K key, V oldValue, V newValue);
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object, Object, long, TimeUnit)}.  This method does not block on
@@ -506,7 +507,7 @@
     * @param unit     time unit for lifespan
     * @return a future containing a boolean, indicating whether the entry was replaced or not
     */
-   Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
 
    /**
     * Asynchronous version of {@link #replace(Object, Object, Object, long, TimeUnit, long, TimeUnit)}.  This method
@@ -523,7 +524,7 @@
     * @param maxIdleUnit  time unit for max idle time
     * @return a future containing a boolean, indicating whether the entry was replaced or not
     */
-   Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+   NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
 
 
    AdvancedCache<K, V> getAdvancedCache();

Deleted: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -1,49 +0,0 @@
-package org.infinispan.distribution;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A version of the async return values for dist
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class DistAsyncReturnValue implements Future<Object> {
-   final Future<Object> invalFuture, replFuture;
-   final Object returnValue;
-
-   public DistAsyncReturnValue(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
-      this.invalFuture = invalFuture;
-      this.replFuture = replFuture;
-      this.returnValue = returnValue;
-   }
-
-   public boolean cancel(boolean mayInterruptIfRunning) {
-      boolean invalCancelled = true;
-      if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
-      return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
-   }
-
-   public boolean isCancelled() {
-      return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
-   }
-
-   public boolean isDone() {
-      return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
-   }
-
-   public Object get() throws InterruptedException, ExecutionException {
-      if (invalFuture != null) invalFuture.get();
-      replFuture.get();
-      return returnValue;
-   }
-
-   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-      if (invalFuture != null) invalFuture.get(timeout, unit);
-      replFuture.get(timeout, unit);
-      return returnValue;
-   }
-}

Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -17,13 +17,15 @@
 import org.infinispan.context.Flag;
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.distribution.DistAsyncReturnValue;
 import org.infinispan.distribution.DistributionManager;
 import org.infinispan.factories.annotations.Inject;
 import org.infinispan.factories.annotations.Start;
 import org.infinispan.interceptors.base.BaseRpcInterceptor;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.util.Immutables;
+import org.infinispan.util.concurrent.AggregatingNotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -31,7 +33,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.Future;
 
 /**
  * The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -233,39 +234,37 @@
             if (ctx.isOriginLocal()) {
                List<Address> rec = recipientGenerator.generateRecipients();
                if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
-               Future<Object> f1 = null, f2;
                boolean useFuture = ctx.isUseFutureReturnType();
                boolean sync = isSynchronous(ctx);
-
+               NotifyingNotifiableFuture<Object> future = null;
                // if L1 caching is used make sure we broadcast an invalidate message
                if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
                   InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
-                  f1 = submitRpc(null, ic, sync, useFuture);
+                  if (useFuture) {
+                     future = new AggregatingNotifyingFutureImpl(returnValue, 2);
+                     rpcManager.broadcastRpcCommandInFuture(ic, future);
+                  } else {
+                     rpcManager.broadcastRpcCommand(ic, sync);
+                  }
                }
-               f2 = submitRpc(rec, command, sync, useFuture);
 
-               if (f2 != null) return new DistAsyncReturnValue(f1, f2, returnValue);
+               if (useFuture) {
+                  if (future == null) future = new NotifyingFutureImpl(returnValue);
+                  rpcManager.anycastRpcCommandInFuture(rec, command, future);
+                  return future;
+               } else {
+                  rpcManager.anycastRpcCommand(rec, command, sync);
+               }
             }
          } else {
             if (!localModeForced) {
                ((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
-            } else {
-               // add to list of participants
             }
          }
       }
       return returnValue;
    }
 
-   private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
-      if (useFuture) {
-         return rpcManager.anycastRpcCommandInFuture(recipients, cmd);
-      } else {
-         rpcManager.anycastRpcCommand(recipients, cmd, sync);
-         return null;
-      }
-   }
-
    interface RecipientGenerator {
       List<Address> generateRecipients();
 

Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -21,7 +21,6 @@
  */
 package org.infinispan.interceptors;
 
-import org.infinispan.AsyncReturnValue;
 import org.infinispan.commands.AbstractVisitor;
 import org.infinispan.commands.CommandsFactory;
 import org.infinispan.commands.VisitableCommand;
@@ -41,6 +40,8 @@
 import org.infinispan.jmx.annotations.ManagedAttribute;
 import org.infinispan.jmx.annotations.ManagedOperation;
 import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
@@ -195,7 +196,9 @@
             log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
          // voila, invalidated!
          if (useFuture) {
-            return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), retvalForFuture);
+            NotifyingNotifiableFuture<Object> future = new NotifyingFutureImpl(retvalForFuture);
+            rpcManager.broadcastRpcCommandInFuture(command, future);
+            return future;
          } else {
             rpcManager.broadcastRpcCommand(command, synchronous);
          }

Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -21,7 +21,6 @@
  */
 package org.infinispan.interceptors;
 
-import org.infinispan.AsyncReturnValue;
 import org.infinispan.commands.LockControlCommand;
 import org.infinispan.commands.tx.CommitCommand;
 import org.infinispan.commands.tx.PrepareCommand;
@@ -36,6 +35,8 @@
 import org.infinispan.context.InvocationContext;
 import org.infinispan.context.impl.TxInvocationContext;
 import org.infinispan.interceptors.base.BaseRpcInterceptor;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
 /**
  * Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
@@ -116,7 +117,9 @@
       final Object returnValue = invokeNextInterceptor(ctx, command);
       if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
          if (ctx.isUseFutureReturnType()) {
-            return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), returnValue);
+            NotifyingNotifiableFuture<Object> future = new NotifyingFutureImpl(returnValue);
+            rpcManager.broadcastRpcCommandInFuture(command, future);
+            return future;
          } else {
             rpcManager.broadcastRpcCommand(command, isSynchronous(ctx));
          }

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -27,9 +27,9 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 
 import java.util.List;
-import java.util.concurrent.Future;
 
 /**
  * Provides a mechanism for communicating with other caches in the cluster, by formatting and passing requests down to
@@ -119,10 +119,10 @@
     * is passed to the transport executor and a Future is returned.  The transport always deals with this
     * synchronously.
     *
-    * @param rpc command to execute remotely
-    * @return a future
+    * @param rpc    command to execute remotely
+    * @param future the future which will be passed back to the user
     */
-   Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc);
+   void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future);
 
    /**
     * The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean, boolean)} except that
@@ -131,9 +131,9 @@
     *
     * @param rpc              command to execute remotely
     * @param usePriorityQueue if true, a priority queue is used
-    * @return a future
+    * @param future           the future which will be passed back to the user
     */
-   Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue);
+   void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future);
 
    /**
     * Broadcasts an RPC command to a specified set of recipients
@@ -164,9 +164,9 @@
     *
     * @param recipients recipients to invoke remote call on
     * @param rpc        command to execute remotely
-    * @return a future
+    * @param future     the future which will be passed back to the user
     */
-   Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc);
+   void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future);
 
    /**
     * The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
@@ -176,9 +176,9 @@
     * @param recipients       recipients to invoke remote call on
     * @param rpc              command to execute remotely
     * @param usePriorityQueue if true, a priority queue is used
-    * @return a future
+    * @param future           the future which will be passed back to the user
     */
-   Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue);
+   void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future);
 
    /**
     * @return a reference to the underlying transport.

Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -18,6 +18,7 @@
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
 import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -27,7 +28,6 @@
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -180,12 +180,12 @@
       }
    }
 
-   public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc) {
-      return broadcastRpcCommandInFuture(rpc, false);
+   public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
+      broadcastRpcCommandInFuture(rpc, false, l);
    }
 
-   public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue) {
-      return anycastRpcCommandInFuture(null, rpc, usePriorityQueue);
+   public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> l) {
+      anycastRpcCommandInFuture(null, rpc, usePriorityQueue, l);
    }
 
    public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException {
@@ -219,18 +219,19 @@
       }
    }
 
-   public final Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc) {
-      return anycastRpcCommandInFuture(recipients, rpc, false);
+   public final void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
+      anycastRpcCommandInFuture(recipients, rpc, false, l);
    }
 
-   public final Future<Object> anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue) {
+   public final void anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l) {
       Callable<Object> c = new Callable<Object>() {
          public Object call() {
             anycastRpcCommand(recipients, rpc, true, usePriorityQueue);
+            l.notifyDone();
             return null;
          }
       };
-      return asyncExecutor.submit(c);
+      l.setNetworkFuture(asyncExecutor.submit(c));
    }
 
    public Transport getTransport() {

Added: trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,69 @@
+package org.infinispan.util.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * For use with more than one underlying network future.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class AggregatingNotifyingFutureImpl extends NotifyingFutureImpl {
+   final List<Future<Object>> futures;
+   final AtomicInteger awaitingCompletions = new AtomicInteger();
+
+   public AggregatingNotifyingFutureImpl(Object actualReturnValue, int maxFutures) {
+      super(actualReturnValue);
+      futures = new ArrayList<Future<Object>>(maxFutures);
+      awaitingCompletions.set(maxFutures);
+   }
+
+   @Override
+   public void setNetworkFuture(Future<Object> future) {
+      futures.add(future);
+   }
+
+   @Override
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      boolean aggregateValue = false;
+      for (Future<Object> f : futures) aggregateValue = f.cancel(mayInterruptIfRunning) && aggregateValue;
+      return aggregateValue;
+   }
+
+   @Override
+   public boolean isCancelled() {
+      for (Future<Object> f : futures) if (f.isCancelled()) return true;
+      return false;
+   }
+
+   @Override
+   public boolean isDone() {
+      for (Future<Object> f : futures) if (!f.isDone()) return false;
+      return true;
+   }
+
+   @Override
+   public Object get() throws InterruptedException, ExecutionException {
+      for (Future<Object> f : futures) f.get();
+      return actualReturnValue;
+   }
+
+   @Override
+   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+      for (Future<Object> f : futures) f.get(timeout, unit);
+      return actualReturnValue;
+   }
+
+   @Override
+   public void notifyDone() {
+      if (awaitingCompletions.decrementAndGet() == 0) {
+         callCompleted = true;
+         for (FutureListener<Object> l : listeners) l.futureDone(this);
+      }
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,18 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * A listener that is called back when a future is done.  FutureListener instances are attached to {@link
+ * NotifyingFuture}s by passing them in to {@link NotifyingFuture#attachListener(FutureListener)}
+ * <p/>
+ * Note that the {@link #futureDone(Future)} callback is invoked when the future completes, regardless of how the future
+ * completes (i.e., normally, due to an exception, or cancelled).  As such, implementations should check the future
+ * passed in by calling <tt>future.get()</tt>.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface FutureListener<T> {
+   void futureDone(Future<T> future);
+}


Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,29 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * A sub-interface of a Future, that allows for listeners to be attached so that observers can be notified of when the
+ * future completes.
+ * <p/>
+ * See {@link FutureListener} for more details.
+ * <p/>
+ * {@link #attachListener(FutureListener)} returns the same future instance, which is useful for 'building' a future.
+ * E.g.,
+ * <p/>
+ * <code> Future&lt;Void&gt; f = cache.clearAsync().attachListener(new MyCustomListener()); </code>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface NotifyingFuture<T> extends Future<T> {
+
+   /**
+    * Attaches a listener and returns the same future instance, to allow for 'building'.
+    *
+    * @param listener listener to attach
+    * @return the same future instance
+    */
+   NotifyingFuture<T> attachListener(FutureListener<T> listener);
+
+}


Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,67 @@
+package org.infinispan.util.concurrent;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Constructs an instance of a {@link org.infinispan.util.concurrent.NotifyingFuture}.
+ * <p/>
+ * Typical usage:
+ * <p/>
+ * <code> Object retval = ....; // do some work here <br/> NotifyingNotifiableFuture nf = new NotifyingFutureImpl(retval); <br/>
+ * rpcManager.broadcastRpcCommandInFuture(command, nf); <br/> return nf; </code>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class NotifyingFutureImpl implements NotifyingNotifiableFuture<Object> {
+
+   final Object actualReturnValue;
+   volatile Future<Object> ioFuture;
+   volatile boolean callCompleted = false;
+   final Set<FutureListener<Object>> listeners = new CopyOnWriteArraySet<FutureListener<Object>>();
+
+   public NotifyingFutureImpl(Object actualReturnValue) {
+      this.actualReturnValue = actualReturnValue;
+   }
+
+   public void setNetworkFuture(Future<Object> future) {
+      this.ioFuture = future;
+   }
+
+   public boolean cancel(boolean mayInterruptIfRunning) {
+      return ioFuture.cancel(mayInterruptIfRunning);
+   }
+
+   public boolean isCancelled() {
+      return ioFuture.isCancelled();
+   }
+
+   public boolean isDone() {
+      return ioFuture.isDone();
+   }
+
+   public Object get() throws InterruptedException, ExecutionException {
+      ioFuture.get();
+      return actualReturnValue;
+   }
+
+   public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+      ioFuture.get(timeout, unit);
+      return actualReturnValue;
+   }
+
+   public void notifyDone() {
+      callCompleted = true;
+      for (FutureListener<Object> l : listeners) l.futureDone(this);
+   }
+
+   public NotifyingFuture<Object> attachListener(FutureListener<Object> objectFutureListener) {
+      if (!callCompleted) listeners.add(objectFutureListener);
+      if (callCompleted) objectFutureListener.futureDone(this);
+      return this;
+   }
+}
\ No newline at end of file


Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java	2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,15 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * An internal interface which adds the ability to inform the future of completion.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface NotifyingNotifiableFuture<Object> extends NotifyingFuture<Object> {
+   void notifyDone();
+
+   void setNetworkFuture(Future<java.lang.Object> future);
+}


Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF




More information about the infinispan-commits mailing list