[infinispan-commits] Infinispan SVN: r328 - in trunk/core/src/main/java/org/infinispan: distribution and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 20 11:18:23 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-20 11:18:23 -0400 (Wed, 20 May 2009)
New Revision: 328
Added:
trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
Removed:
trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
Modified:
trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java
trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java
trunk/core/src/main/java/org/infinispan/AdvancedCache.java
trunk/core/src/main/java/org/infinispan/Cache.java
trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Log:
Async map code
Modified: trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AbstractDelegatingAdvancedCache.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -8,11 +8,11 @@
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -130,51 +130,51 @@
return cache.replace(k, oV, nV, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
}
- public Future<V> putAsync(K key, V value, Flag... flags) {
+ public NotifyingFuture<V> putAsync(K key, V value, Flag... flags) {
return cache.putAsync(key, value, flags);
}
- public Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+ public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
}
- public Future<V> putIfAbsentAsync(K key, V value, Flag... flags) {
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value, Flag... flags) {
return cache.putIfAbsentAsync(key, value, flags);
}
- public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
}
- public Future<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags) {
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags) {
return cache.putAllAsync(map, flags);
}
- public Future<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags) {
return cache.putAllAsync(map, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit, flags);
}
- public Future<V> removeAsync(Object key, Flag... flags) {
+ public NotifyingFuture<V> removeAsync(Object key, Flag... flags) {
return cache.removeAsync(key, flags);
}
- public Future<Void> clearAsync(Flag... flags) {
+ public NotifyingFuture<Void> clearAsync(Flag... flags) {
return cache.clearAsync(flags);
}
- public Future<V> replaceAsync(K k, V v, Flag... flags) {
+ public NotifyingFuture<V> replaceAsync(K k, V v, Flag... flags) {
return cache.replaceAsync(k, v, flags);
}
- public Future<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags) {
+ public NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags) {
return cache.replaceAsync(k, oV, nV, flags);
}
- public Future<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
+ public NotifyingFuture<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
return cache.replaceAsync(k, v, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
}
- public Future<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
+ public NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags) {
return cache.replaceAsync(k, oV, nV, lifespan, lifespanUnit, maxIdle, maxIdleUnit, flags);
}
@@ -186,78 +186,6 @@
return cache.get(key, flags);
}
- public Future<V> putAsync(K key, V value) {
- return cache.putAsync(key, value);
- }
-
- public Future<V> putAsync(K key, V value, long lifespan, TimeUnit unit) {
- return cache.putAsync(key, value, lifespan, unit);
- }
-
- public Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
- return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
- }
-
- public Future<Void> putAllAsync(Map<? extends K, ? extends V> data) {
- return cache.putAllAsync(data);
- }
-
- public Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit) {
- return cache.putAllAsync(data, lifespan, unit);
- }
-
- public Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
- return cache.putAllAsync(data, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
- }
-
- public Future<Void> clearAsync() {
- return cache.clearAsync();
- }
-
- public Future<V> putIfAbsentAsync(K key, V value) {
- return cache.putIfAbsentAsync(key, value);
- }
-
- public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit) {
- return cache.putIfAbsentAsync(key, value, lifespan, unit);
- }
-
- public Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
- return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
- }
-
- public Future<V> removeAsync(Object key) {
- return cache.removeAsync(key);
- }
-
- public Future<Boolean> removeAsync(Object key, Object value) {
- return cache.removeAsync(key, value);
- }
-
- public Future<V> replaceAsync(K key, V value) {
- return cache.replaceAsync(key, value);
- }
-
- public Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit) {
- return cache.replaceAsync(key, value, lifespan, unit);
- }
-
- public Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
- return cache.replaceAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
- }
-
- public Future<Boolean> replaceAsync(K key, V oldValue, V newValue) {
- return cache.replaceAsync(key, oldValue, newValue);
- }
-
- public Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit) {
- return cache.replaceAsync(key, oldValue, newValue, lifespan, unit);
- }
-
- public Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
- return cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
- }
-
public void lock(K key) {
cache.lock(key);
}
Modified: trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AbstractDelegatingCache.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -3,22 +3,23 @@
import org.infinispan.config.Configuration;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.manager.CacheManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-/**
- * This is a convenient base class for implementing a cache delegate. The only constructor takes a {@link Cache} argument, to
- * which each method call is delegated. One can extend this class and override the method sub-set it is interested in.
- * There is also an similar implmentation for {@link org.infinispan.AdvancedCache}:
- * {@link org.infinispan.AbstractDelegatingAdvancedCache}.
+/**
+ * This is a convenient base class for implementing a cache delegate. The only constructor takes a {@link Cache}
+ * argument, to which each method call is delegated. One can extend this class and override the method sub-set it is
+ * interested in. There is also an similar implmentation for {@link org.infinispan.AdvancedCache}: {@link
+ * org.infinispan.AbstractDelegatingAdvancedCache}.
*
+ * @author Mircea.Markus at jboss.com
* @see org.infinispan.AbstractDelegatingAdvancedCache
- * @author Mircea.Markus at jboss.com
*/
-public abstract class AbstractDelegatingCache<K, V> implements Cache<K, V> {
+public class AbstractDelegatingCache<K, V> implements Cache<K, V> {
private Cache<K, V> cache;
@@ -98,6 +99,78 @@
return cache.replace(key, oldValue, value, lifespan, lifespanUnit, maxIdleTime, maxIdleTimeUnit);
}
+ public NotifyingFuture<V> putAsync(K key, V value) {
+ return cache.putAsync(key, value);
+ }
+
+ public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit unit) {
+ return cache.putAsync(key, value, lifespan, unit);
+ }
+
+ public NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return cache.putAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+ }
+
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data) {
+ return cache.putAllAsync(data);
+ }
+
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit) {
+ return cache.putAllAsync(data, lifespan, unit);
+ }
+
+ public NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return cache.putAllAsync(data, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+ }
+
+ public NotifyingFuture<Void> clearAsync() {
+ return cache.clearAsync();
+ }
+
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value) {
+ return cache.putIfAbsentAsync(key, value);
+ }
+
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit) {
+ return cache.putIfAbsentAsync(key, value, lifespan, unit);
+ }
+
+ public NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return cache.putIfAbsentAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+ }
+
+ public NotifyingFuture<V> removeAsync(Object key) {
+ return cache.removeAsync(key);
+ }
+
+ public NotifyingFuture<Boolean> removeAsync(Object key, Object value) {
+ return cache.removeAsync(key, value);
+ }
+
+ public NotifyingFuture<V> replaceAsync(K key, V value) {
+ return cache.replaceAsync(key, value);
+ }
+
+ public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit) {
+ return cache.replaceAsync(key, value, lifespan, unit);
+ }
+
+ public NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return cache.replaceAsync(key, value, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+ }
+
+ public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue) {
+ return cache.replaceAsync(key, oldValue, newValue);
+ }
+
+ public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit) {
+ return cache.replaceAsync(key, oldValue, newValue, lifespan, unit);
+ }
+
+ public NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit) {
+ return cache.replaceAsync(key, oldValue, newValue, lifespan, lifespanUnit, maxIdle, maxIdleUnit);
+ }
+
public AdvancedCache<K, V> getAdvancedCache() {
return cache.getAdvancedCache();
}
Modified: trunk/core/src/main/java/org/infinispan/AdvancedCache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AdvancedCache.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AdvancedCache.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -8,11 +8,11 @@
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.remoting.rpc.RpcManager;
+import org.infinispan.util.concurrent.NotifyingFuture;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -123,29 +123,29 @@
// -- async methods --
- Future<V> putAsync(K key, V value, Flag... flags);
+ NotifyingFuture<V> putAsync(K key, V value, Flag... flags);
- Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+ NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
- Future<V> putIfAbsentAsync(K key, V value, Flag... flags);
+ NotifyingFuture<V> putIfAbsentAsync(K key, V value, Flag... flags);
- Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+ NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
- Future<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags);
+ NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, Flag... flags);
- Future<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
+ NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> map, long lifespan, TimeUnit lifespanUnit, long maxIdleTime, TimeUnit maxIdleTimeUnit, Flag... flags);
- Future<V> removeAsync(Object key, Flag... flags);
+ NotifyingFuture<V> removeAsync(Object key, Flag... flags);
- Future<Void> clearAsync(Flag... flags);
+ NotifyingFuture<Void> clearAsync(Flag... flags);
- Future<V> replaceAsync(K k, V v, Flag... flags);
+ NotifyingFuture<V> replaceAsync(K k, V v, Flag... flags);
- Future<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags);
+ NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, Flag... flags);
- Future<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
+ NotifyingFuture<V> replaceAsync(K k, V v, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
- Future<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
+ NotifyingFuture<Boolean> replaceAsync(K k, V oV, V nV, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit, Flag... flags);
boolean containsKey(Object key, Flag... flags);
Deleted: trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/AsyncReturnValue.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -1,44 +0,0 @@
-package org.infinispan;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Wraps up return values for the asunc API
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class AsyncReturnValue implements Future<Object> {
- final Future<Object> networkCallFuture;
- final Object actualReturnValue;
-
- public AsyncReturnValue(Future<Object> networkCallFuture, Object actualReturnValue) {
- this.networkCallFuture = networkCallFuture;
- this.actualReturnValue = actualReturnValue;
- }
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- return networkCallFuture.cancel(mayInterruptIfRunning);
- }
-
- public boolean isCancelled() {
- return networkCallFuture.isCancelled();
- }
-
- public boolean isDone() {
- return networkCallFuture.isDone();
- }
-
- public Object get() throws InterruptedException, ExecutionException {
- networkCallFuture.get();
- return actualReturnValue;
- }
-
- public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- networkCallFuture.get(timeout, unit);
- return actualReturnValue;
- }
-}
Modified: trunk/core/src/main/java/org/infinispan/Cache.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/Cache.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/Cache.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -28,10 +28,10 @@
import org.infinispan.manager.CacheManager;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.notifications.Listenable;
+import org.infinispan.util.concurrent.NotifyingFuture;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@@ -54,15 +54,16 @@
* <h3>Asynchronous operations</h3> Cache also supports the use of "async" remote operations. Note that these methods
* only really make sense if you are using a clustered cache. I.e., when used in LOCAL mode, these "async" operations
* offer no benefit whatsoever. These methods, such as {@link #putAsync(Object, Object)} offer the best of both worlds
- * between a fully synchronous and a fully asynchronous cache in that a {@link Future} is returned. The <tt>Future</tt>
- * can then be ignored or thrown away for typical asynchronous behaviour, or queried for synchronous behaviour, which
- * would block until any remote calls complete. Note that all remote calls are, as far as the transport is concerned,
- * synchronous. This allows you the guarantees that remote calls succeed, while not blocking your application thread
- * unnecessarily. For example, usage such as the following could benefit from the async operations:
+ * between a fully synchronous and a fully asynchronous cache in that a {@link NotifyingFuture} is returned. The
+ * <tt>NotifyingFuture</tt> can then be ignored or thrown away for typical asynchronous behaviour, or queried for
+ * synchronous behaviour, which would block until any remote calls complete. Note that all remote calls are, as far as
+ * the transport is concerned, synchronous. This allows you the guarantees that remote calls succeed, while not
+ * blocking your application thread unnecessarily. For example, usage such as the following could benefit from the
+ * async operations:
* <pre>
- * Future f1 = cache.putAsync("key1", "value1");
- * Future f2 = cache.putAsync("key2", "value2");
- * Future f3 = cache.putAsync("key3", "value3");
+ * NotifyingFuture f1 = cache.putAsync("key1", "value1");
+ * NotifyingFuture f2 = cache.putAsync("key2", "value2");
+ * NotifyingFuture f3 = cache.putAsync("key3", "value3");
* f1.get();
* f2.get();
* f3.get();
@@ -72,8 +73,8 @@
* especially advantageous if the cache uses distribution and the three keys map to different cache instances in the
* cluster.
* <p/>
- * Also, the use of async operations when within a transaction return your local value only, as expected. A Future is
- * still returned though for API consistency.
+ * Also, the use of async operations when within a transaction return your local value only, as expected. A
+ * NotifyingFuture is still returned though for API consistency.
* <p/>
* <h3>Constructing a Cache</h3> An instance of the Cache is usually obtained by using a {@link CacheManager}.
* <pre>
@@ -310,7 +311,7 @@
* @param value value to store
* @return a future containing the old value replaced.
*/
- Future<V> putAsync(K key, V value);
+ NotifyingFuture<V> putAsync(K key, V value);
/**
* Asynchronous version of {@link #put(Object, Object, long, TimeUnit)} . This method does not block on remote
@@ -323,7 +324,7 @@
* @param unit time unit for lifespan
* @return a future containing the old value replaced
*/
- Future<V> putAsync(K key, V value, long lifespan, TimeUnit unit);
+ NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit unit);
/**
* Asynchronous version of {@link #put(Object, Object, long, TimeUnit, long, TimeUnit)}. This method does not block
@@ -339,7 +340,7 @@
* @param maxIdleUnit time unit for max idle time
* @return a future containing the old value replaced
*/
- Future<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+ NotifyingFuture<V> putAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
/**
* Asynchronous version of {@link #putAll(Map)}. This method does not block on remote calls, even if your cache mode
@@ -348,7 +349,7 @@
* @param data to store
* @return a future containing a void return type
*/
- Future<Void> putAllAsync(Map<? extends K, ? extends V> data);
+ NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data);
/**
* Asynchronous version of {@link #putAll(Map, long, TimeUnit)}. This method does not block on remote calls, even if
@@ -359,7 +360,7 @@
* @param unit time unit for lifespan
* @return a future containing a void return type
*/
- Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
+ NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit unit);
/**
* Asynchronous version of {@link #putAll(Map, long, TimeUnit, long, TimeUnit)}. This method does not block on
@@ -374,7 +375,7 @@
* @param maxIdleUnit time unit for max idle time
* @return a future containing a void return type
*/
- Future<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+ NotifyingFuture<Void> putAllAsync(Map<? extends K, ? extends V> data, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
/**
* Asynchronous version of {@link #clear()}. This method does not block on remote calls, even if your cache mode is
@@ -382,7 +383,7 @@
*
* @return a future containing a void return type
*/
- Future<Void> clearAsync();
+ NotifyingFuture<Void> clearAsync();
/**
* Asynchronous version of {@link #putIfAbsent(Object, Object)}. This method does not block on remote calls, even if
@@ -393,7 +394,7 @@
* @param value value to store
* @return a future containing the old value replaced.
*/
- Future<V> putIfAbsentAsync(K key, V value);
+ NotifyingFuture<V> putIfAbsentAsync(K key, V value);
/**
* Asynchronous version of {@link #putIfAbsent(Object, Object, long, TimeUnit)} . This method does not block on
@@ -406,7 +407,7 @@
* @param unit time unit for lifespan
* @return a future containing the old value replaced
*/
- Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit);
+ NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit unit);
/**
* Asynchronous version of {@link #putIfAbsent(Object, Object, long, TimeUnit, long, TimeUnit)}. This method does
@@ -422,7 +423,7 @@
* @param maxIdleUnit time unit for max idle time
* @return a future containing the old value replaced
*/
- Future<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+ NotifyingFuture<V> putIfAbsentAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
/**
* Asynchronous version of {@link #remove(Object)}. This method does not block on remote calls, even if your cache
@@ -431,7 +432,7 @@
* @param key key to remove
* @return a future containing the value removed
*/
- Future<V> removeAsync(Object key);
+ NotifyingFuture<V> removeAsync(Object key);
/**
* Asynchronous version of {@link #remove(Object, Object)}. This method does not block on remote calls, even if your
@@ -441,7 +442,7 @@
* @param value value to match on
* @return a future containing a boolean, indicating whether the entry was removed or not
*/
- Future<Boolean> removeAsync(Object key, Object value);
+ NotifyingFuture<Boolean> removeAsync(Object key, Object value);
/**
* Asynchronous version of {@link #replace(Object, Object)}. This method does not block on remote calls, even if
@@ -451,7 +452,7 @@
* @param value value to store
* @return a future containing the previous value overwritten
*/
- Future<V> replaceAsync(K key, V value);
+ NotifyingFuture<V> replaceAsync(K key, V value);
/**
* Asynchronous version of {@link #replace(Object, Object, long, TimeUnit)}. This method does not block on remote
@@ -464,7 +465,7 @@
* @param unit time unit for lifespan
* @return a future containing the previous value overwritten
*/
- Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit);
+ NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit unit);
/**
* Asynchronous version of {@link #replace(Object, Object, long, TimeUnit, long, TimeUnit)}. This method does not
@@ -480,7 +481,7 @@
* @param maxIdleUnit time unit for max idle time
* @return a future containing the previous value overwritten
*/
- Future<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+ NotifyingFuture<V> replaceAsync(K key, V value, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
/**
* Asynchronous version of {@link #replace(Object, Object, Object)}. This method does not block on remote calls,
@@ -492,7 +493,7 @@
* @param newValue value to store
* @return a future containing a boolean, indicating whether the entry was replaced or not
*/
- Future<Boolean> replaceAsync(K key, V oldValue, V newValue);
+ NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue);
/**
* Asynchronous version of {@link #replace(Object, Object, Object, long, TimeUnit)}. This method does not block on
@@ -506,7 +507,7 @@
* @param unit time unit for lifespan
* @return a future containing a boolean, indicating whether the entry was replaced or not
*/
- Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
+ NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit unit);
/**
* Asynchronous version of {@link #replace(Object, Object, Object, long, TimeUnit, long, TimeUnit)}. This method
@@ -523,7 +524,7 @@
* @param maxIdleUnit time unit for max idle time
* @return a future containing a boolean, indicating whether the entry was replaced or not
*/
- Future<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
+ NotifyingFuture<Boolean> replaceAsync(K key, V oldValue, V newValue, long lifespan, TimeUnit lifespanUnit, long maxIdle, TimeUnit maxIdleUnit);
AdvancedCache<K, V> getAdvancedCache();
Deleted: trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistAsyncReturnValue.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -1,49 +0,0 @@
-package org.infinispan.distribution;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A version of the async return values for dist
- *
- * @author Manik Surtani
- * @since 4.0
- */
-public class DistAsyncReturnValue implements Future<Object> {
- final Future<Object> invalFuture, replFuture;
- final Object returnValue;
-
- public DistAsyncReturnValue(Future<Object> invalFuture, Future<Object> replFuture, Object returnValue) {
- this.invalFuture = invalFuture;
- this.replFuture = replFuture;
- this.returnValue = returnValue;
- }
-
- public boolean cancel(boolean mayInterruptIfRunning) {
- boolean invalCancelled = true;
- if (invalFuture != null) invalCancelled = invalFuture.cancel(mayInterruptIfRunning);
- return replFuture.cancel(mayInterruptIfRunning) && invalCancelled;
- }
-
- public boolean isCancelled() {
- return replFuture.isCancelled() && (invalFuture == null || invalFuture.isCancelled());
- }
-
- public boolean isDone() {
- return replFuture.isDone() && (invalFuture == null || invalFuture.isDone());
- }
-
- public Object get() throws InterruptedException, ExecutionException {
- if (invalFuture != null) invalFuture.get();
- replFuture.get();
- return returnValue;
- }
-
- public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- if (invalFuture != null) invalFuture.get(timeout, unit);
- replFuture.get(timeout, unit);
- return returnValue;
- }
-}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/DistributionInterceptor.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -17,13 +17,15 @@
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
-import org.infinispan.distribution.DistAsyncReturnValue;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.Immutables;
+import org.infinispan.util.concurrent.AggregatingNotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import java.util.ArrayList;
import java.util.Collection;
@@ -31,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Future;
/**
* The interceptor that handles distribution of entries across a cluster, as well as transparent lookup
@@ -233,39 +234,37 @@
if (ctx.isOriginLocal()) {
List<Address> rec = recipientGenerator.generateRecipients();
if (trace) log.trace("Invoking command {0} on hosts {1}", command, rec);
- Future<Object> f1 = null, f2;
boolean useFuture = ctx.isUseFutureReturnType();
boolean sync = isSynchronous(ctx);
-
+ NotifyingNotifiableFuture<Object> future = null;
// if L1 caching is used make sure we broadcast an invalidate message
if (isL1CacheEnabled && rec != null && rpcManager.getTransport().getMembers().size() > rec.size()) {
InvalidateCommand ic = cf.buildInvalidateFromL1Command(recipientGenerator.getKeys());
- f1 = submitRpc(null, ic, sync, useFuture);
+ if (useFuture) {
+ future = new AggregatingNotifyingFutureImpl(returnValue, 2);
+ rpcManager.broadcastRpcCommandInFuture(ic, future);
+ } else {
+ rpcManager.broadcastRpcCommand(ic, sync);
+ }
}
- f2 = submitRpc(rec, command, sync, useFuture);
- if (f2 != null) return new DistAsyncReturnValue(f1, f2, returnValue);
+ if (useFuture) {
+ if (future == null) future = new NotifyingFutureImpl(returnValue);
+ rpcManager.anycastRpcCommandInFuture(rec, command, future);
+ return future;
+ } else {
+ rpcManager.anycastRpcCommand(rec, command, sync);
+ }
}
} else {
if (!localModeForced) {
((TxInvocationContext) ctx).addTransactionParticipants(recipientGenerator.generateRecipients());
- } else {
- // add to list of participants
}
}
}
return returnValue;
}
- private Future<Object> submitRpc(final List<Address> recipients, final WriteCommand cmd, final boolean sync, boolean useFuture) {
- if (useFuture) {
- return rpcManager.anycastRpcCommandInFuture(recipients, cmd);
- } else {
- rpcManager.anycastRpcCommand(recipients, cmd, sync);
- return null;
- }
- }
-
interface RecipientGenerator {
List<Address> generateRecipients();
Modified: trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/InvalidationInterceptor.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -21,7 +21,6 @@
*/
package org.infinispan.interceptors;
-import org.infinispan.AsyncReturnValue;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
@@ -41,6 +40,8 @@
import org.infinispan.jmx.annotations.ManagedAttribute;
import org.infinispan.jmx.annotations.ManagedOperation;
import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
@@ -195,7 +196,9 @@
log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
// voila, invalidated!
if (useFuture) {
- return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), retvalForFuture);
+ NotifyingNotifiableFuture<Object> future = new NotifyingFutureImpl(retvalForFuture);
+ rpcManager.broadcastRpcCommandInFuture(command, future);
+ return future;
} else {
rpcManager.broadcastRpcCommand(command, synchronous);
}
Modified: trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/interceptors/ReplicationInterceptor.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -21,7 +21,6 @@
*/
package org.infinispan.interceptors;
-import org.infinispan.AsyncReturnValue;
import org.infinispan.commands.LockControlCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
@@ -36,6 +35,8 @@
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
+import org.infinispan.util.concurrent.NotifyingFutureImpl;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
/**
* Takes care of replicating modifications to other caches in a cluster. Also listens for prepare(), commit() and
@@ -116,7 +117,9 @@
final Object returnValue = invokeNextInterceptor(ctx, command);
if (!isLocalModeForced(ctx) && command.isSuccessful() && ctx.isOriginLocal() && !ctx.isInTxScope()) {
if (ctx.isUseFutureReturnType()) {
- return new AsyncReturnValue(rpcManager.broadcastRpcCommandInFuture(command), returnValue);
+ NotifyingNotifiableFuture<Object> future = new NotifyingFutureImpl(returnValue);
+ rpcManager.broadcastRpcCommandInFuture(command, future);
+ return future;
} else {
rpcManager.broadcastRpcCommand(command, isSynchronous(ctx));
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManager.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -27,9 +27,9 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import java.util.List;
-import java.util.concurrent.Future;
/**
* Provides a mechanism for communicating with other caches in the cluster, by formatting and passing requests down to
@@ -119,10 +119,10 @@
* is passed to the transport executor and a Future is returned. The transport always deals with this
* synchronously.
*
- * @param rpc command to execute remotely
- * @return a future
+ * @param rpc command to execute remotely
+ * @param future the future which will be passed back to the user
*/
- Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc);
+ void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future);
/**
* The same as {@link #broadcastRpcCommand(org.infinispan.commands.ReplicableCommand, boolean, boolean)} except that
@@ -131,9 +131,9 @@
*
* @param rpc command to execute remotely
* @param usePriorityQueue if true, a priority queue is used
- * @return a future
+ * @param future the future which will be passed back to the user
*/
- Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue);
+ void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future);
/**
* Broadcasts an RPC command to a specified set of recipients
@@ -164,9 +164,9 @@
*
* @param recipients recipients to invoke remote call on
* @param rpc command to execute remotely
- * @return a future
+ * @param future the future which will be passed back to the user
*/
- Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc);
+ void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> future);
/**
* The same as {@link #anycastRpcCommand(java.util.List, org.infinispan.commands.ReplicableCommand, boolean)} except
@@ -176,9 +176,9 @@
* @param recipients recipients to invoke remote call on
* @param rpc command to execute remotely
* @param usePriorityQueue if true, a priority queue is used
- * @return a future
+ * @param future the future which will be passed back to the user
*/
- Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue);
+ void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> future);
/**
* @return a reference to the underlying transport.
Modified: trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-05-20 14:59:54 UTC (rev 327)
+++ trunk/core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -18,6 +18,7 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.StateTransferException;
+import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -27,7 +28,6 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -180,12 +180,12 @@
}
}
- public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc) {
- return broadcastRpcCommandInFuture(rpc, false);
+ public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
+ broadcastRpcCommandInFuture(rpc, false, l);
}
- public final Future<Object> broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue) {
- return anycastRpcCommandInFuture(null, rpc, usePriorityQueue);
+ public final void broadcastRpcCommandInFuture(ReplicableCommand rpc, boolean usePriorityQueue, NotifyingNotifiableFuture<Object> l) {
+ anycastRpcCommandInFuture(null, rpc, usePriorityQueue, l);
}
public final void anycastRpcCommand(List<Address> recipients, ReplicableCommand rpc, boolean sync) throws ReplicationException {
@@ -219,18 +219,19 @@
}
}
- public final Future<Object> anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc) {
- return anycastRpcCommandInFuture(recipients, rpc, false);
+ public final void anycastRpcCommandInFuture(List<Address> recipients, ReplicableCommand rpc, NotifyingNotifiableFuture<Object> l) {
+ anycastRpcCommandInFuture(recipients, rpc, false, l);
}
- public final Future<Object> anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue) {
+ public final void anycastRpcCommandInFuture(final List<Address> recipients, final ReplicableCommand rpc, final boolean usePriorityQueue, final NotifyingNotifiableFuture<Object> l) {
Callable<Object> c = new Callable<Object>() {
public Object call() {
anycastRpcCommand(recipients, rpc, true, usePriorityQueue);
+ l.notifyDone();
return null;
}
};
- return asyncExecutor.submit(c);
+ l.setNetworkFuture(asyncExecutor.submit(c));
}
public Transport getTransport() {
Added: trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,69 @@
+package org.infinispan.util.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * For use with > 1 underlying network future
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class AggregatingNotifyingFutureImpl extends NotifyingFutureImpl {
+ final List<Future<Object>> futures;
+ final AtomicInteger awaitingCompletions = new AtomicInteger();
+
+ public AggregatingNotifyingFutureImpl(Object actualReturnValue, int maxFutures) {
+ super(actualReturnValue);
+ futures = new ArrayList<Future<Object>>(maxFutures);
+ awaitingCompletions.set(maxFutures);
+ }
+
+ @Override
+ public void setNetworkFuture(Future<Object> future) {
+ futures.add(future);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean aggregateValue = false;
+ for (Future<Object> f : futures) aggregateValue = f.cancel(mayInterruptIfRunning) && aggregateValue;
+ return aggregateValue;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ for (Future<Object> f : futures) if (f.isCancelled()) return true;
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ for (Future<Object> f : futures) if (!f.isDone()) return false;
+ return true;
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ for (Future<Object> f : futures) f.get();
+ return actualReturnValue;
+ }
+
+ @Override
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+ for (Future<Object> f : futures) f.get(timeout, unit);
+ return actualReturnValue;
+ }
+
+ @Override
+ public void notifyDone() {
+ if (awaitingCompletions.decrementAndGet() == 0) {
+ callCompleted = true;
+ for (FutureListener<Object> l : listeners) l.futureDone(this);
+ }
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/AggregatingNotifyingFutureImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,18 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * A listener that is called back when a future is done. FutureListener instances are attached to {@link
+ * NotifyingFuture}s by passing them in to {@link NotifyingFuture#attachListener(FutureListener)}
+ * <p/>
+ * Note that the {@link #futureDone(Future)} callback is invoked when the future completes, regardless of how the future
+ * completes (i.e., normally, due to an exception, or cancelled}. As such, implementations should check the future
+ * passed in by calling <tt>future.get()</tt>.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface FutureListener<T> {
+ void futureDone(Future<T> future);
+}
Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/FutureListener.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,29 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * A sub-interface of a Future, that allows for listeners to be attached so that observers can be notified of when the
+ * future completes.
+ * <p/>
+ * See {@link FutureListener} for more details.
+ * <p/>
+ * {@link #attachListener(FutureListener)} returns the same future instance, which is useful for 'building' a future.
+ * E.g.,
+ * <p/>
+ * <code> Future<Void> f = cache.clearAsync().attachListener(new MyCustomListener()); </code>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface NotifyingFuture<T> extends Future<T> {
+
+ /**
+ * Attaches a listener and returns the same future instance, to allow for 'building'.
+ *
+ * @param listener listener to attach
+ * @return the same future instance
+ */
+ NotifyingFuture<T> attachListener(FutureListener<T> listener);
+
+}
Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFuture.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,67 @@
+package org.infinispan.util.concurrent;
+
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Constructs an instance of a {@link org.infinispan.util.concurrent.NotifyingFuture}.
+ * <p/>
+ * Typical usage:
+ * <p/>
+ * <code> Object retval = .... // do some work here NotifyingFuture nf = new NotifyingFutureImpl(retval);
+ * rpcManager.broadcastRpcCommandInFuture(nf, command); return nf; </code>
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public class NotifyingFutureImpl implements NotifyingNotifiableFuture<Object> {
+
+ final Object actualReturnValue;
+ volatile Future<Object> ioFuture;
+ volatile boolean callCompleted = false;
+ final Set<FutureListener<Object>> listeners = new CopyOnWriteArraySet<FutureListener<Object>>();
+
+ public NotifyingFutureImpl(Object actualReturnValue) {
+ this.actualReturnValue = actualReturnValue;
+ }
+
+ public void setNetworkFuture(Future<Object> future) {
+ this.ioFuture = future;
+ }
+
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return ioFuture.cancel(mayInterruptIfRunning);
+ }
+
+ public boolean isCancelled() {
+ return ioFuture.isCancelled();
+ }
+
+ public boolean isDone() {
+ return ioFuture.isDone();
+ }
+
+ public Object get() throws InterruptedException, ExecutionException {
+ ioFuture.get();
+ return actualReturnValue;
+ }
+
+ public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, java.util.concurrent.TimeoutException {
+ ioFuture.get(timeout, unit);
+ return actualReturnValue;
+ }
+
+ public void notifyDone() {
+ callCompleted = true;
+ for (FutureListener<Object> l : listeners) l.futureDone(this);
+ }
+
+ public NotifyingFuture<Object> attachListener(FutureListener<Object> objectFutureListener) {
+ if (!callCompleted) listeners.add(objectFutureListener);
+ if (callCompleted) objectFutureListener.futureDone(this);
+ return this;
+ }
+}
\ No newline at end of file
Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingFutureImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java 2009-05-20 15:18:23 UTC (rev 328)
@@ -0,0 +1,15 @@
+package org.infinispan.util.concurrent;
+
+import java.util.concurrent.Future;
+
+/**
+ * An internal interface which adds the ability to inform the future of completion.
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+public interface NotifyingNotifiableFuture<Object> extends NotifyingFuture<Object> {
+ void notifyDone();
+
+ void setNetworkFuture(Future<java.lang.Object> future);
+}
Property changes on: trunk/core/src/main/java/org/infinispan/util/concurrent/NotifyingNotifiableFuture.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list