[infinispan-commits] Infinispan SVN: r1396 - in trunk/core/src: test/java/org/infinispan/loaders/decorators and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Tue Jan 19 10:49:32 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-01-19 10:49:32 -0500 (Tue, 19 Jan 2010)
New Revision: 1396

Modified:
   trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
   trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
Log:
Updated and fixed faulty test

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-01-19 14:59:37 UTC (rev 1395)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-01-19 15:49:32 UTC (rev 1396)
@@ -1,19 +1,18 @@
 package org.infinispan.loaders.decorators;
 
 import net.jcip.annotations.GuardedBy;
-
+import org.infinispan.Cache;
 import org.infinispan.CacheException;
-import org.infinispan.Cache;
-import org.infinispan.marshall.Marshaller;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
-import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.modifications.Clear;
 import org.infinispan.loaders.modifications.Modification;
 import org.infinispan.loaders.modifications.PurgeExpired;
 import org.infinispan.loaders.modifications.Remove;
 import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.Marshaller;
 import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -38,6 +37,8 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.infinispan.loaders.modifications.Modification.Type.*;
+
 /**
  * The AsyncStore is a delegating CacheStore that extends AbstractDelegatingStore, overriding methods to that should not
  * just delegate the operation to the underlying store.
@@ -57,7 +58,7 @@
  * <p/>
  *
  * @author Manik Surtani
- * @author Galder Zamarreño 
+ * @author Galder Zamarreño
  * @since 4.0
  */
 public class AsyncStore extends AbstractDelegatingStore {
@@ -66,21 +67,24 @@
    private static final AtomicInteger threadId = new AtomicInteger(0);
    private final AtomicBoolean stopped = new AtomicBoolean(true);
    private final AsyncStoreConfig asyncStoreConfig;
-   
-   /** Approximate count of number of modified keys. At points, it could contain negative values. */
+
+   /**
+    * Approximate count of number of modified keys. At points, it could contain negative values.
+    */
    private final AtomicInteger count = new AtomicInteger(0);
    private final ReentrantLock lock = new ReentrantLock();
-   private final Condition notEmpty  = lock.newCondition();
+   private final Condition notEmpty = lock.newCondition();
 
-   private ExecutorService executor;
+   ExecutorService executor;
    private List<Future> processorFutures;
    private final ReadWriteLock mapLock = new ReentrantReadWriteLock();
    private final Lock read = mapLock.readLock();
    private final Lock write = mapLock.writeLock();
    private int concurrencyLevel;
-   @GuardedBy("mapLock") protected ConcurrentMap<Object, Modification> state;
+   @GuardedBy("mapLock")
+   protected ConcurrentMap<Object, Modification> state;
    private ReleaseAllLockContainer lockContainer;
-   
+
    public AsyncStore(CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
       super(delegate);
       this.asyncStoreConfig = asyncStoreConfig;
@@ -115,7 +119,7 @@
       PurgeExpired purge = new PurgeExpired();
       enqueue(purge, purge);
    }
-   
+
    @Override
    public void start() throws CacheLoaderException {
       state = newStateMap();
@@ -152,7 +156,7 @@
       executor = null;
       super.stop();
    }
-   
+
    protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
       Set<Map.Entry<Object, Modification>> entries = mods.entrySet();
       for (Map.Entry<Object, Modification> entry : entries) {
@@ -186,7 +190,7 @@
          if (trace) log.trace("Enqueuing modification {0}", mod);
          Modification prev = null;
          int c = -1;
-         boolean unlock = false;      
+         boolean unlock = false;
          try {
             acquireLock(read);
             unlock = true;
@@ -197,7 +201,7 @@
          /* Increment can happen outside the lock cos worst case scenario a false not empty would 
           * be sent if the swap and decrement happened between the put and the increment. In this 
           * case, the corresponding processor would see the map empty and would wait again. This 
-          * means that we're allowing count to potentially go negative but that's not a problem. */  
+          * means that we're allowing count to potentially go negative but that's not a problem. */
          if (prev == null) c = count.getAndIncrement();
          if (c == 0) signalNotEmpty();
       } catch (Exception e) {
@@ -214,39 +218,39 @@
          Thread.currentThread().interrupt();
       }
    }
-   
+
    private void signalNotEmpty() {
       lock.lock();
       try {
-          notEmpty.signal();
+         notEmpty.signal();
       } finally {
-          lock.unlock();
+         lock.unlock();
       }
    }
-   
+
    private void awaitNotEmpty() throws InterruptedException {
       lock.lockInterruptibly();
       try {
          try {
             while (count.get() == 0)
-                notEmpty.await();
+               notEmpty.await();
          } catch (InterruptedException ie) {
             notEmpty.signal(); // propagate to a non-interrupted thread
             throw ie;
-         }         
+         }
       } finally {
          lock.unlock();
       }
    }
-   
+
    private int decrementAndGet(int delta) {
-      for (;;) {
+      for (; ;) {
          int current = count.get();
          int next = current - delta;
          if (count.compareAndSet(current, next)) return next;
-     }
+      }
    }
-   
+
    /**
     * Processes modifications taking the latest updates from a state map.
     */
@@ -273,11 +277,11 @@
             if (trace) log.trace("Remaining interrupted");
          }
       }
-      
+
       void run0() throws InterruptedException {
          if (trace) log.trace("Checking for modifications");
          boolean unlock = false;
-         
+
          try {
             acquireLock(write);
             unlock = true;
@@ -303,29 +307,32 @@
 
          try {
             int size = swap.size();
-            if (size == 0) 
+            if (swap.isEmpty()) {
                awaitNotEmpty();
-            else 
+            } else {
                decrementAndGet(size);
 
-            if (trace) log.trace("Apply {0} modifications", size);
-            int maxRetries = 3;
-            int attemptNumber = 0;
-            boolean successful;
-            do {
-               if (attemptNumber > 0 && log.isDebugEnabled()) log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
-               successful = put(swap);
-               attemptNumber++;
-            } while (!successful && attemptNumber <= maxRetries);
+               if (trace) log.trace("Apply {0} modifications", size);
+               int maxRetries = 3;
+               int attemptNumber = 0;
+               boolean successful;
+               do {
+                  if (attemptNumber > 0 && log.isDebugEnabled())
+                     log.debug("Retrying due to previous failure. {0} attempts left.", maxRetries - attemptNumber);
+                  successful = put(swap);
+                  attemptNumber++;
+               } while (!successful && attemptNumber <= maxRetries);
 
-            if (!successful) log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
+               if (!successful)
+                  log.warn("Unable to process some async modifications after " + maxRetries + " retries!");
 
+            }
          } finally {
             lockContainer.releaseLocks(lockedKeys);
             lockedKeys.clear();
          }
       }
-      
+
       boolean put(ConcurrentMap<Object, Modification> mods) {
          try {
             AsyncStore.this.applyModificationsSync(mods);

Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-01-19 14:59:37 UTC (rev 1395)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-01-19 15:49:32 UTC (rev 1396)
@@ -11,7 +11,9 @@
 import org.infinispan.test.TestingUtil;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
@@ -21,7 +23,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
- at Test(groups = "unit", testName = "loaders.decorators.AsyncTest", enabled = false)
+ at Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
 public class AsyncTest extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(AsyncTest.class);
    AsyncStore store;
@@ -30,7 +32,7 @@
    AsyncStoreConfig asyncConfig;
    DummyInMemoryCacheStore.Cfg dummyCfg;
 
-   @BeforeTest
+   @BeforeMethod
    public void setUp() throws CacheLoaderException {
       underlying = new DummyInMemoryCacheStore();
       asyncConfig = new AsyncStoreConfig();
@@ -43,7 +45,7 @@
       asyncExecutor = (ExecutorService) TestingUtil.extractField(store, "executor");
    }
 
-   @AfterTest
+   @AfterMethod
    public void tearDown() throws CacheLoaderException {
       if (store != null) store.stop();
    }
@@ -55,7 +57,7 @@
       doTestPut(number, key, value);
       doTestRemove(number, key);
    }
-   
+
    public void testPutClearPut() throws Exception {
       final int number = 1000;
       String key = "testPutClearPut-k-";
@@ -64,7 +66,7 @@
       doTestClear(number, key);
       value = "testPutClearPut-v[2]-";
       doTestPut(number, key, value);
-      
+
       doTestRemove(number, key);
    }
 
@@ -83,7 +85,7 @@
       String key = "testRestrictionOnAddingToAsyncQueue-k";
       String value = "testRestrictionOnAddingToAsyncQueue-v-";
       doTestPut(number, key, value);
-      
+
       // stop the cache store
       store.stop();
       try {
@@ -99,35 +101,41 @@
    }
 
    public void testThreadSafetyWritingDiffValuesForKey(Method m) throws Exception {
-      final String key = "k1";
-      final CountDownLatch v1Latch = new CountDownLatch(1);
-      final CountDownLatch v2Latch = new CountDownLatch(1);
-      final CountDownLatch endLatch = new CountDownLatch(1);
-      DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
-      store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig);
-      dummyCfg = new DummyInMemoryCacheStore.Cfg();
-      dummyCfg.setStore(m.getName());
-      store.init(dummyCfg, null, null);
-      store.start();
-      
-      store.store(InternalEntryFactory.create(key, "v1"));
-      v2Latch.await();
-      store.store(InternalEntryFactory.create(key, "v2"));
-      endLatch.await();
+      try {
+         final String key = "k1";
+         final CountDownLatch v1Latch = new CountDownLatch(1);
+         final CountDownLatch v2Latch = new CountDownLatch(1);
+         final CountDownLatch endLatch = new CountDownLatch(1);
+         DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+         store = new MockAsyncStore(key, v1Latch, v2Latch, endLatch, underlying, asyncConfig);
+         dummyCfg = new DummyInMemoryCacheStore.Cfg();
+         dummyCfg.setStore(m.getName());
+         store.init(dummyCfg, null, null);
+         store.start();
 
-      assert store.load(key).getValue().equals("v2");
+         store.store(InternalEntryFactory.create(key, "v1"));
+         v2Latch.await();
+         store.store(InternalEntryFactory.create(key, "v2"));
+         endLatch.await();
+
+         assert store.load(key).getValue().equals("v2");
+      } finally {
+         store.delegate.clear();
+         store.stop();
+         store = null;
+      }
    }
 
    private void doTestPut(int number, String key, String value) throws Exception {
       for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
-      
+
       TestingUtil.sleepRandom(1000);
 
       InternalCacheEntry[] entries = new InternalCacheEntry[number];
       for (int i = 0; i < number; i++) {
          entries[i] = store.load(key + i);
       }
-      
+
       for (int i = 0; i < number; i++) {
          InternalCacheEntry entry = entries[i];
          if (entry != null) {
@@ -144,17 +152,18 @@
          }
       }
    }
-   
+
    private void doTestSameKeyPut(int number, String key, String value) throws Exception {
       for (int i = 0; i < number; i++)
          store.store(InternalEntryFactory.create(key, value + i));
 
+      TestingUtil.sleepThread(5000);
       InternalCacheEntry entry;
       boolean success = false;
       for (int i = 0; i < 120; i++) {
          TestingUtil.sleepRandom(1000);
          entry = store.load(key);
-         success = entry.getValue().equals(value + (number-1));
+         success = entry.getValue().equals(value + (number - 1));
          if (success) break;
       }
       assert success;
@@ -162,14 +171,14 @@
 
    private void doTestRemove(int number, String key) throws Exception {
       for (int i = 0; i < number; i++) store.remove(key + i);
-      
+
       TestingUtil.sleepRandom(1000);
 
       InternalCacheEntry[] entries = new InternalCacheEntry[number];
       for (int i = 0; i < number; i++) {
          entries[i] = store.load(key + i);
       }
-      
+
       for (int i = 0; i < number; i++) {
          InternalCacheEntry entry = entries[i];
          while (entry != null) {
@@ -179,7 +188,7 @@
          }
       }
    }
-   
+
    private void doTestSameKeyRemove(String key) throws Exception {
       store.remove(key);
       InternalCacheEntry entry;
@@ -197,7 +206,7 @@
       for (int i = 0; i < number; i++) {
          entries[i] = store.load(key + i);
       }
-      
+
       for (int i = 0; i < number; i++) {
          InternalCacheEntry entry = entries[i];
          while (entry != null) {
@@ -215,8 +224,8 @@
       final CountDownLatch endLatch;
       final Object key;
 
-      MockAsyncStore(Object key, CountDownLatch v1Latch, CountDownLatch v2Latch, CountDownLatch endLatch, 
-               CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
+      MockAsyncStore(Object key, CountDownLatch v1Latch, CountDownLatch v2Latch, CountDownLatch endLatch,
+                     CacheStore delegate, AsyncStoreConfig asyncStoreConfig) {
          super(delegate, asyncStoreConfig);
          this.v1Latch = v1Latch;
          this.v2Latch = v2Latch;
@@ -242,6 +251,8 @@
             endLatch.countDown();
          }
       }
-      
-   };
+
+   }
+
+   ;
 }



More information about the infinispan-commits mailing list