[infinispan-commits] Infinispan SVN: r2229 - in branches/4.1.x/core/src: test/java/org/infinispan/loaders and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Aug 16 11:12:42 EDT 2010
Author: mircea.markus
Date: 2010-08-16 11:12:42 -0400 (Mon, 16 Aug 2010)
New Revision: 2229
Added:
branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
Modified:
branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
Log:
[ISPN-580] - neverending AsyncStore.awaitNotEmpty during stop() of AsyncStore
- applying Sanne's fix
Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-08-16 13:07:14 UTC (rev 2228)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-08-16 15:12:42 UTC (rev 2229)
@@ -276,12 +276,17 @@
}
}
- private void awaitNotEmpty() throws InterruptedException {
+ private void awaitNotEmptyOrStopped() throws InterruptedException {
lock.lockInterruptibly();
try {
try {
- while (count.get() == 0)
+ while (count.get() == 0) {
+ if (stopped.get()) {
+ notEmpty.signal();
+ return;
+ }
notEmpty.await();
+ }
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
@@ -307,7 +312,7 @@
private final Set<Object> lockedKeys = new HashSet<Object>();
public void run() {
- while (!Thread.interrupted()) {
+ while (!Thread.interrupted() && !stopped.get()) {
try {
run0();
}
@@ -356,7 +361,7 @@
try {
int size = swap.size();
if (swap.isEmpty()) {
- awaitNotEmpty();
+ awaitNotEmptyOrStopped();
} else {
decrementAndGet(size);
Added: branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java (rev 0)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java 2010-08-16 15:12:42 UTC (rev 2229)
@@ -0,0 +1,96 @@
+package org.infinispan.loaders;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.math.stat.inference.TestUtils;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.bucket.Bucket;
+import org.infinispan.loaders.decorators.AsyncStoreConfig;
+import org.infinispan.loaders.file.FileCacheStore;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+/**
+ * FlushingAsyncStoreTest.
+ *
+ * @author Sanne Grinovero
+ */
+@Test(groups = "functional", testName = "loaders.FlushingAsyncStoreTest", sequential = true)
+public class FlushingAsyncStoreTest extends SingleCacheManagerTest { // puts an entry through an async-enabled, deliberately slow file store, stops the cache, then reads the entry back
+
+ /** Set by writeOnStorage() once cache.stop() has returned; verifyStorageContent() asserts it so the test methods are known to have run in the proper order. **/
+ private boolean storeWasRun = false;
+
+ public FlushingAsyncStoreTest() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ Configuration config = getDefaultStandaloneConfig(false);
+ FileCacheStoreConfig fcsConfig = new FileCacheStoreConfig();
+ fcsConfig.setCacheLoaderClassName(SlowFileCacheStore.class.getName()); // use the delaying store declared at the bottom of this class
+ AsyncStoreConfig storeConfig = new AsyncStoreConfig();
+ storeConfig.setEnabled(true);
+ storeConfig.setThreadPoolSize(1);
+ fcsConfig.setAsyncStoreConfig(storeConfig);
+ CacheLoaderManagerConfig clmConfig = new CacheLoaderManagerConfig();
+ clmConfig.getCacheLoaderConfigs().add(fcsConfig);
+ config.setCacheLoaderManagerConfig(clmConfig);
+ return TestCacheManagerFactory.createCacheManager(config);
+ }
+
+ @Test (timeOut = 10000) // the put followed by stop() must finish within the timeout instead of hanging
+ public void writeOnStorage() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
+ cache = cacheManager.getCache("AsyncStoreInMemory");
+ cache.put("key1", "value");
+ cache.stop();
+ storeWasRun = true; // only reached when stop() returned
+ }
+
+ @Test(dependsOnMethods = "writeOnStorage")
+ public void verifyStorageContent() throws IOException {
+ assert storeWasRun;
+ cache = cacheManager.getCache("AsyncStoreInMemory");
+ assert "value".equals(cache.get("key1")); // the value put before stop() must still be readable
+ }
+
+ @AfterClass
+ public void removeStore(){
+ TestUtils a; // NOTE(review): unused local of the commons-math TestUtils type imported above; despite its name this method removes nothing
+ }
+
+ public static class SlowFileCacheStore extends FileCacheStore { // FileCacheStore whose insert/remove/store operations each sleep first
+ private void insertDelay() { // sleeps for one second; called at the start of every overridden operation below
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) { // ignored: an interrupt only cuts the delay short (the interrupt flag is not restored)
+ }
+ }
+
+ @Override
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ insertDelay();
+ super.insertBucket(bucket);
+ }
+
+ @Override
+ protected boolean removeLockSafe(Object key, String lockingKey) throws CacheLoaderException {
+ insertDelay();
+ return super.removeLockSafe(key, lockingKey);
+ }
+
+ @Override
+ protected void storeLockSafe(InternalCacheEntry entry, String lockingKey) throws CacheLoaderException {
+ insertDelay();
+ super.storeLockSafe(entry, lockingKey);
+ }
+ }
+}
More information about the infinispan-commits
mailing list