[infinispan-commits] Infinispan SVN: r1366 - trunk/core/src/main/java/org/infinispan/loaders/decorators.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Jan 12 15:09:32 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-12 15:09:31 -0500 (Tue, 12 Jan 2010)
New Revision: 1366
Modified:
trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
Log:
[ISPN-327] (Allow AsyncStore to retry and be extensible)
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java 2010-01-12 16:04:05 UTC (rev 1365)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java 2010-01-12 20:09:31 UTC (rev 1366)
@@ -78,7 +78,7 @@
delegate.prepare(list, tx, isOnePhase);
}
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+ public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
delegate.init(config, cache, m);
}
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-12 16:04:05 UTC (rev 1365)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-12 20:09:31 UTC (rev 1366)
@@ -78,7 +78,7 @@
private final Lock read = mapLock.readLock();
private final Lock write = mapLock.writeLock();
private int concurrencyLevel;
- @GuardedBy("mapLock") private ConcurrentMap<Object, Modification> state;
+ @GuardedBy("mapLock") protected ConcurrentMap<Object, Modification> state;
private ReleaseAllLockContainer lockContainer;
public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
@@ -87,7 +87,7 @@
}
@Override
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) throws CacheLoaderException {
+ public void init(CacheLoaderConfig config, Cache<?, ?> cache, Marshaller m) throws CacheLoaderException {
super.init(config, cache, m);
concurrencyLevel = cache == null || cache.getConfiguration() == null ? 16 : cache.getConfiguration().getConcurrencyLevel();
lockContainer = new ReleaseAllLockContainer(concurrencyLevel);
@@ -309,19 +309,30 @@
decrementAndGet(size);
if (trace) log.trace("Apply {0} modifications", size);
- put(swap);
+ int maxRetries = 3;
+ int attemptNumber = 0;
+ boolean successful;
+ do {
+ if (attemptNumber > 0 && log.isDebugEnabled()) log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
+ successful = put(swap);
+ attemptNumber++;
+ } while (!successful && attemptNumber <= maxRetries);
+
+ if (!successful) log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
+
} finally {
lockContainer.releaseLocks(lockedKeys);
lockedKeys.clear();
}
}
- void put(ConcurrentMap<Object, Modification> mods) {
+ boolean put(ConcurrentMap<Object, Modification> mods) {
try {
AsyncStore.this.applyModificationsSync(mods);
+ return true;
} catch (Exception e) {
- if (log.isWarnEnabled()) log.warn("Failed to process async modifications", e);
- if (log.isDebugEnabled()) log.debug("Exception: ", e);
+ if (log.isDebugEnabled()) log.debug("Failed to process async modifications", e);
+ return false;
}
}
}
More information about the infinispan-commits mailing list