[infinispan-commits] Infinispan SVN: r2229 - in branches/4.1.x/core/src: test/java/org/infinispan/loaders and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Aug 16 11:12:42 EDT 2010


Author: mircea.markus
Date: 2010-08-16 11:12:42 -0400 (Mon, 16 Aug 2010)
New Revision: 2229

Added:
   branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
Modified:
   branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
Log:
[ISPN-580] - neverending AsyncStore.awaitNotEmpty during stop() of AsyncStore
	- applying Sanne's fix

Modified: branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-08-16 13:07:14 UTC (rev 2228)
+++ branches/4.1.x/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-08-16 15:12:42 UTC (rev 2229)
@@ -276,12 +276,17 @@
       }
    }
 
-   private void awaitNotEmpty() throws InterruptedException {
+   private void awaitNotEmptyOrStopped() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          try {
-            while (count.get() == 0)
+            while (count.get() == 0) {
+               if (stopped.get()) {
+                  notEmpty.signal();
+                  return;
+               }
                notEmpty.await();
+            }
          } catch (InterruptedException ie) {
             notEmpty.signal(); // propagate to a non-interrupted thread
             throw ie;
@@ -307,7 +312,7 @@
       private final Set<Object> lockedKeys = new HashSet<Object>();
 
       public void run() {
-         while (!Thread.interrupted()) {
+         while (!Thread.interrupted() && !stopped.get()) {
             try {
                run0();
             }
@@ -356,7 +361,7 @@
          try {
             int size = swap.size();
             if (swap.isEmpty()) {
-               awaitNotEmpty();
+               awaitNotEmptyOrStopped();
             } else {
                decrementAndGet(size);
 

Added: branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java
===================================================================
--- branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java	                        (rev 0)
+++ branches/4.1.x/core/src/test/java/org/infinispan/loaders/FlushingAsyncStoreTest.java	2010-08-16 15:12:42 UTC (rev 2229)
@@ -0,0 +1,96 @@
+package org.infinispan.loaders;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+import org.apache.commons.math.stat.inference.TestUtils;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.bucket.Bucket;
+import org.infinispan.loaders.decorators.AsyncStoreConfig;
+import org.infinispan.loaders.file.FileCacheStore;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+/**
+ * FlushingAsyncStoreTest.
+ * 
+ * @author Sanne Grinovero
+ */
+@Test(groups = "functional", testName = "loaders.FlushingAsyncStoreTest", sequential = true)
+public class FlushingAsyncStoreTest extends SingleCacheManagerTest {
+
+   /** to assert the test methods are run in proper order **/
+   private boolean storeWasRun = false;
+
+   public FlushingAsyncStoreTest() {
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration config = getDefaultStandaloneConfig(false);
+      FileCacheStoreConfig fcsConfig = new FileCacheStoreConfig();
+      fcsConfig.setCacheLoaderClassName(SlowFileCacheStore.class.getName());
+      AsyncStoreConfig storeConfig = new AsyncStoreConfig();
+      storeConfig.setEnabled(true);
+      storeConfig.setThreadPoolSize(1);
+      fcsConfig.setAsyncStoreConfig(storeConfig);
+      CacheLoaderManagerConfig clmConfig = new CacheLoaderManagerConfig();
+      clmConfig.getCacheLoaderConfigs().add(fcsConfig);
+      config.setCacheLoaderManagerConfig(clmConfig);
+      return TestCacheManagerFactory.createCacheManager(config);
+   }
+
+   @Test (timeOut = 10000)
+   public void writeOnStorage() throws IOException, ClassNotFoundException, SQLException, InterruptedException {
+      cache = cacheManager.getCache("AsyncStoreInMemory");
+      cache.put("key1", "value");
+      cache.stop();
+      storeWasRun = true;
+   }
+
+   @Test(dependsOnMethods = "writeOnStorage")
+   public void verifyStorageContent() throws IOException {
+      assert storeWasRun;
+      cache = cacheManager.getCache("AsyncStoreInMemory");
+      assert "value".equals(cache.get("key1"));
+   }
+   
+   @AfterClass
+   public void removeStore(){
+      TestUtils a; 
+   }
+
+   public static class SlowFileCacheStore extends FileCacheStore {
+      private void insertDelay() {
+         try {
+            Thread.sleep(1000);
+         } catch (InterruptedException e) {
+         }
+      }
+
+      @Override
+      protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+         insertDelay();
+         super.insertBucket(bucket);
+      }
+
+      @Override
+      protected boolean removeLockSafe(Object key, String lockingKey) throws CacheLoaderException {
+         insertDelay();
+         return super.removeLockSafe(key, lockingKey);
+      }
+
+      @Override
+      protected void storeLockSafe(InternalCacheEntry entry, String lockingKey) throws CacheLoaderException {
+         insertDelay();
+         super.storeLockSafe(entry, lockingKey);
+      }
+   }
+}



More information about the infinispan-commits mailing list