[infinispan-commits] Infinispan SVN: r1396 - in trunk/core/src: test/java/org/infinispan/loaders/decorators and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Tue Jan 19 10:49:32 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-01-19 10:49:32 -0500 (Tue, 19 Jan 2010)
New Revision: 1396
Modified:
trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
Log:
Updated and fixed faulty test
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-19 14:59:37 UTC (rev 1395)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-19 15:49:32 UTC (rev 1396)
@@ -1,19 +1,18 @@
package org.infinispan.loaders.decorators;
import net.jcip.annotations.GuardedBy;
-
+import org.infinispan.Cache;
import org.infinispan.CacheException;
-import org.infinispan.Cache;
-import org.infinispan.marshall.Marshaller;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
-import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.PurgeExpired;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.Marshaller;
import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -38,6 +37,8 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.infinispan.loaders.modifications.Modification.Type.*;
+
/**
* The AsyncStore is a delegating CacheStore that extends AbstractDelegatingStore, overriding methods to that should not
* just delegate the operation to the underlying store.
@@ -57,7 +58,7 @@
* <p/>
*
* @author Manik Surtani
- * @author Galder Zamarreño
+ * @author Galder Zamarreño
* @since 4.0
*/
public class AsyncStore extends AbstractDelegatingStore {
@@ -66,21 +67,24 @@
private static final AtomicInteger threadId = new AtomicInteger(0);
private final AtomicBoolean stopped = new AtomicBoolean(true);
private final AsyncStoreConfig asyncStoreConfig;
-
- /** Approximate count of number of modified keys. At points, it could contain negative values. */
+
+ /**
+ * Approximate count of number of modified keys. At points, it could contain negative values.
+ */
private final AtomicInteger count = new AtomicInteger(0);
private final ReentrantLock lock = new ReentrantLock();
- private final Condition notEmpty = lock.newCondition();
+ private final Condition notEmpty = lock.newCondition();
- private ExecutorService executor;
+ ExecutorService executor;
private List<Future> processorFutures;
private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
private final Lock read = mapLock.readLock();
private final Lock write = mapLock.writeLock();
private int concurrencyLevel;
- @GuardedBy("mapLock") protected ConcurrentMap<Object, Modification> state;
+ @GuardedBy("mapLock")
+ protected ConcurrentMap<Object, Modification> state;
private ReleaseAllLockContainer lockContainer;
-
+
public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
super(delegate);
this.asyncStoreConfig = asyncStoreConfig;
@@ -115,7 +119,7 @@
PurgeExpired purge = new PurgeExpired();
enqueue(purge, purge);
}
-
+
@Override
public void start() throws CacheLoaderException {
state = newStateMap();
@@ -152,7 +156,7 @@
executor = null;
super.stop();
}
-
+
protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
Set<Map.Entry<Object, Modification>> entries = mods.entrySet();
for (Map.Entry<Object, Modification> entry : entries) {
@@ -186,7 +190,7 @@
if (trace) log.trace("Enqueuing modification {0}", mod);
Modification prev = null;
int c = -1;
- boolean unlock = false;
+ boolean unlock = false;
try {
acquireLock(read);
unlock = true;
@@ -197,7 +201,7 @@
/* Increment can happen outside the lock cos worst case scenario a false not empty would
* be sent if the swap and decrement happened between the put and the increment. In this
* case, the corresponding processor would see the map empty and would wait again. This
- * means that we're allowing count to potentially go negative but that's not a problem. */
+ * means that we're allowing count to potentially go negative but that's not a problem. */
if (prev == null) c = count.getAndIncrement();
if (c == 0) signalNotEmpty();
} catch (Exception e) {
@@ -214,39 +218,39 @@
Thread.currentThread().interrupt();
}
}
-
+
private void signalNotEmpty() {
lock.lock();
try {
- notEmpty.signal();
+ notEmpty.signal();
} finally {
- lock.unlock();
+ lock.unlock();
}
}
-
+
private void awaitNotEmpty() throws InterruptedException {
lock.lockInterruptibly();
try {
try {
while (count.get() == 0)
- notEmpty.await();
+ notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
- }
+ }
} finally {
lock.unlock();
}
}
-
+
private int decrementAndGet(int delta) {
- for (;;) {
+ for (; ;) {
int current = count.get();
int next = current - delta;
if (count.compareAndSet(current, next)) return next;
- }
+ }
}
-
+
/**
* Processes modifications taking the latest updates from a state map.
*/
@@ -273,11 +277,11 @@
if (trace) log.trace("Remaining interrupted");
}
}
-
+
void run0() throws InterruptedException {
if (trace) log.trace("Checking for modifications");
boolean unlock = false;
-
+
try {
acquireLock(write);
unlock = true;
@@ -303,29 +307,32 @@
try {
int size = swap.size();
- if (size == 0)
+ if (swap.isEmpty()) {
awaitNotEmpty();
- else
+ } else {
decrementAndGet(size);
- if (trace) log.trace("Apply {0} modifications", size);
- int maxRetries = 3;
- int attemptNumber = 0;
- boolean successful;
- do {
- if (attemptNumber > 0 && log.isDebugEnabled()) log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
- successful = put(swap);
- attemptNumber++;
- } while (!successful && attemptNumber <= maxRetries);
+ if (trace) log.trace("Apply {0} modifications", size);
+ int maxRetries = 3;
+ int attemptNumber = 0;
+ boolean successful;
+ do {
+ if (attemptNumber > 0 && log.isDebugEnabled())
+ log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
+ successful = put(swap);
+ attemptNumber++;
+ } while (!successful && attemptNumber <= maxRetries);
- if (!successful) log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
+ if (!successful)
+ log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
+ }
} finally {
lockContainer.releaseLocks(lockedKeys);
lockedKeys.clear();
}
}
-
+
boolean put(ConcurrentMap<Object, Modification> mods) {
try {
AsyncStore.this.applyModificationsSync(mods);
Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-01-19 14:59:37 UTC (rev 1395)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-01-19 15:49:32 UTC (rev 1396)
@@ -11,7 +11,9 @@
import org.infinispan.test.TestingUtil;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -21,7 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-@Test(groups = "unit", testName = "loaders.decorators.AsyncTest", enabled = false)
+@Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
public class AsyncTest extends AbstractInfinispanTest {
private static final Log log = LogFactory.getLog(AsyncTest.class);
AsyncStore store;
@@ -30,7 +32,7 @@
AsyncStoreConfig asyncConfig;
DummyInMemoryCacheStore.Cfg dummyCfg;
- @BeforeTest
+ @BeforeMethod
public void setUp() throws CacheLoaderException {
underlying = new DummyInMemoryCacheStore();
asyncConfig = new AsyncStoreConfig();
@@ -43,7 +45,7 @@
asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
}
- @AfterTest
+ @AfterMethod
public void tearDown() throws CacheLoaderException {
if (store != null) store.stop();
}
@@ -55,7 +57,7 @@
doTestPut(number, key, value);
doTestRemove(number, key);
}
-
+
public void testPutClearPut() throws Exception {
final int number = 1000;
String key = "testPutClearPut-k-";
@@ -64,7 +66,7 @@
doTestClear(number, key);
value = "testPutClearPut-v[2]-";
doTestPut(number, key, value);
-
+
doTestRemove(number, key);
}
@@ -83,7 +85,7 @@
String key = "testRestrictionOnAddingToAsyncQueue-k";
String value = "testRestrictionOnAddingToAsyncQueue-v-";
doTestPut(number, key, value);
-
+
// stop the cache store
store.stop();
try {
@@ -99,35 +101,41 @@
}
public void testThreadSafetyWritingDiffValuesForKey(Method m) throws Exception {
- final String key = "k1";
- final CountDownLatch v1Latch = new CountDownLatch(1);
- final CountDownLatch v2Latch = new CountDownLatch(1);
- final CountDownLatch endLatch = new CountDownLatch(1);
- DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
- store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig);
- dummyCfg = new DummyInMemoryCacheStore.Cfg();
- dummyCfg.setStore(m.getName());
- store.init(dummyCfg, null, null);
- store.start();
-
- store.store(InternalEntryFactory.create(key, "v1"));
- v2Latch.await();
- store.store(InternalEntryFactory.create(key, "v2"));
- endLatch.await();
+ try {
+ final String key = "k1";
+ final CountDownLatch v1Latch = new CountDownLatch(1);
+ final CountDownLatch v2Latch = new CountDownLatch(1);
+ final CountDownLatch endLatch = new CountDownLatch(1);
+ DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+ store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig);
+ dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.setStore(m.getName());
+ store.init(dummyCfg, null, null);
+ store.start();
- assert store.load(key).getValue().equals("v2");
+ store.store(InternalEntryFactory.create(key, "v1"));
+ v2Latch.await();
+ store.store(InternalEntryFactory.create(key, "v2"));
+ endLatch.await();
+
+ assert store.load(key).getValue().equals("v2");
+ } finally {
+ store.delegate.clear();
+ store.stop();
+ store = null;
+ }
}
private void doTestPut(int number, String key, String value) throws Exception {
for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
-
+
TestingUtil.sleepRandom(1000);
InternalCacheEntry[] entries = new InternalCacheEntry[number];
for (int i = 0; i < number; i++) {
entries[i] = store.load(key + i);
}
-
+
for (int i = 0; i < number; i++) {
InternalCacheEntry entry = entries[i];
if (entry != null) {
@@ -144,17 +152,18 @@
}
}
}
-
+
private void doTestSameKeyPut(int number, String key, String value) throws Exception {
for (int i = 0; i < number; i++)
store.store(InternalEntryFactory.create(key, value + i));
+ TestingUtil.sleepThread(5000);
InternalCacheEntry entry;
boolean success = false;
for (int i = 0; i < 120; i++) {
TestingUtil.sleepRandom(1000);
entry = store.load(key);
- success = entry.getValue().equals(value + (number-1));
+ success = entry.getValue().equals(value + (number - 1));
if (success) break;
}
assert success;
@@ -162,14 +171,14 @@
private void doTestRemove(int number, String key) throws Exception {
for (int i = 0; i < number; i++) store.remove(key + i);
-
+
TestingUtil.sleepRandom(1000);
InternalCacheEntry[] entries = new InternalCacheEntry[number];
for (int i = 0; i < number; i++) {
entries[i] = store.load(key + i);
}
-
+
for (int i = 0; i < number; i++) {
InternalCacheEntry entry = entries[i];
while (entry != null) {
@@ -179,7 +188,7 @@
}
}
}
-
+
private void doTestSameKeyRemove(String key) throws Exception {
store.remove(key);
InternalCacheEntry entry;
@@ -197,7 +206,7 @@
for (int i = 0; i < number; i++) {
entries[i] = store.load(key + i);
}
-
+
for (int i = 0; i < number; i++) {
InternalCacheEntry entry = entries[i];
while (entry != null) {
@@ -215,8 +224,8 @@
final CountDownLatch endLatch;
final Object key;
- MockAsyncStore(Object key, CountDownLatch v1Latch, CountDownLatch v2Latch, CountDownLatch endLatch,
- CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
+ MockAsyncStore(Object key, CountDownLatch v1Latch, CountDownLatch v2Latch, CountDownLatch endLatch,
+ CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
super(delegate, asyncStoreConfig);
this.v1Latch = v1Latch;
this.v2Latch = v2Latch;
@@ -242,6 +251,8 @@
endLatch.countDown();
}
}
-
- };
+
+ }
+
+ ;
}
More information about the infinispan-commits
mailing list