[infinispan-commits] Infinispan SVN: r1471 - in trunk/core/src: test/java/org/infinispan/loaders/decorators and 1 other directory.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Feb 8 12:43:40 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-02-08 12:43:39 -0500 (Mon, 08 Feb 2010)
New Revision: 1471

Modified:
   trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
   trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
Log:
[ISPN-343] (Async store should coalesce the prepare list on commit) Fixed.

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-02-08 17:42:51 UTC (rev 1470)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-02-08 17:43:39 UTC (rev 1471)
@@ -21,6 +21,7 @@
 import org.infinispan.util.logging.LogFactory;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -188,7 +189,8 @@
                super.purgeExpired();
                break;
             case PREPARE:
-               super.prepare(((Prepare) mod).getList(), ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
+               List<? extends Modification> coalesced = coalesceModificationList(((Prepare) mod).getList());
+               super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
                break;
             case COMMIT:
                super.commit(((Commit) mod).getTx());
@@ -201,6 +203,33 @@
       return new AsyncProcessor();
    }
 
+   /**
+    * Coalesces the modification list of a prepare so that fewer operations are handed on to
+    * <tt>super.prepare()</tt>.
+    * <p/>
+    * The returned list holds at most one Clear (always first), followed by at most one Store or Remove per key:
+    * <ul>
+    * <li>a later Store or Remove on a key replaces any earlier modification recorded for that key;</li>
+    * <li>a Clear discards every per-key modification seen before it;</li>
+    * <li>once a Clear has been seen, a Remove only cancels a pending Store for the same key, since the key
+    * will not exist after the clear anyway.</li>
+    * </ul>
+    * Per-key modifications are returned in no particular order (they come out of a HashMap); each key appears at
+    * most once.
+    *
+    * @param mods modifications in the order they were made
+    * @return the coalesced list of modifications
+    * @throws IllegalArgumentException if a modification is not a STORE, CLEAR or REMOVE
+    */
+   private List<? extends Modification> coalesceModificationList(List<? extends Modification> mods) {
+      Map<Object, Modification> keyMods = new HashMap<Object, Modification>();
+      // Invariant: this list is non-empty if and only if a Clear has been seen so far.
+      List<Modification> coalesced = new ArrayList<Modification>();
+      for (Modification mod : mods) {
+         switch (mod.getType()) {
+            case STORE:
+               keyMods.put(((Store) mod).getStoredEntry().getKey(), mod);
+               break;
+            case CLEAR:
+               keyMods.clear(); // remove all pending key modifications
+               if (coalesced.isEmpty()) {
+                  // One leading clear is enough; repeated clears within the same list are redundant.
+                  coalesced.add(mod);
+               }
+               break;
+            case REMOVE:
+               if (coalesced.isEmpty()) {
+                  // No clear so far: the remove must be applied, superseding any earlier store of the key.
+                  keyMods.put(((Remove) mod).getKey(), mod);
+               } else {
+                  // clear, p(k), r(k) sequence should result in no-op for k (also a no-op if k is not pending)
+                  keyMods.remove(((Remove) mod).getKey());
+               }
+               break;
+            default:
+               throw new IllegalArgumentException("Unknown modification type " + mod.getType());
+         }
+      }
+      coalesced.addAll(keyMods.values());
+      return coalesced;
+   }
+
    private void enqueue(Object key, Modification mod) {
       try {
          if (stopped.get()) {

Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-02-08 17:42:51 UTC (rev 1470)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-02-08 17:43:39 UTC (rev 1471)
@@ -6,6 +6,7 @@
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
+import org.infinispan.loaders.modifications.Clear;
 import org.infinispan.loaders.modifications.Modification;
 import org.infinispan.loaders.modifications.Prepare;
 import org.infinispan.loaders.modifications.Remove;
@@ -31,6 +32,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.infinispan.test.TestingUtil.k;
 import static org.infinispan.test.TestingUtil.v;
@@ -138,7 +140,7 @@
       }
    }
 
-   public void testTransactionalModifications(Method m) throws Exception {
+   public void testTransactionalModificationsHappenInDiffThread(Method m) throws Exception {
       try {
          final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
          final String k1 = k(m, "1"), k2 = k(m, "2"), v1 = v(m, "1"), v2 = v(m, "2");
@@ -151,13 +153,6 @@
                for (Map.Entry<Object, Modification> entry : mods.entrySet()) {
                   localMods.put(entry.getKey(), entry.getValue());
                }
-//               try {
-//                  barrier.await(5, TimeUnit.SECONDS);
-//               } catch (TimeoutException e) {
-//                  assert false : "Timed out waiting for modifications";
-//               } catch (Exception e) {
-//                  throw new CacheLoaderException("Barried failed", e);
-//               }
                super.applyModificationsSync(mods);
                try {
                   barrier.await(5, TimeUnit.SECONDS);
@@ -180,7 +175,6 @@
          GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
          store.prepare(mods, tx, false);
          barrier.await(5, TimeUnit.SECONDS);
-//         barrier.await(5, TimeUnit.SECONDS);
 
          assert 1 == localMods.size();
          assert localMods.entrySet().iterator().next().getKey() instanceof Prepare;
@@ -189,7 +183,6 @@
 
          store.commit(tx);
          barrier.await(5, TimeUnit.SECONDS);
-//         barrier.await(5, TimeUnit.SECONDS);
          assert store.load(k2).getValue().equals(v2);
          assert !store.containsKey(k1);
       } finally {
@@ -199,6 +192,154 @@
       }
    }
 
+   public void testTransactionalModificationsAreCoalesced(Method m) throws Exception { // counts the store/remove/clear calls that reach the underlying store for five prepare+commit scenarios
+      try {
+         final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
+         final String k1 = k(m, "1"), k2 = k(m, "2"), k3 = k(m, "3"), v1 = v(m, "1"), v2 = v(m, "2"), v3 = v(m, "3");
+         final AtomicInteger storeCount = new AtomicInteger(); // incremented by the store() override below
+         final AtomicInteger removeCount = new AtomicInteger(); // incremented by the remove() override below
+         final AtomicInteger clearCount = new AtomicInteger(); // incremented by the clear() override below
+         final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: this test thread and the thread running applyModificationsSync
+         DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore() { // delegates to the real dummy store and counts each call
+            @Override
+            public void store(InternalCacheEntry ed) {
+               super.store(ed);
+               storeCount.getAndIncrement();
+            }
+
+            @Override
+            public boolean remove(Object key) {
+               boolean ret = super.remove(key);
+               removeCount.getAndIncrement(); // counted whether or not the key was present
+               return ret;
+            }
+
+            @Override
+            public void clear() {
+               super.clear();
+               clearCount.getAndIncrement();
+            }
+         };
+         store = new AsyncStore(underlying, asyncConfig) {
+            @Override
+            protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
+               super.applyModificationsSync(mods); // apply first, then rendezvous so the test only asserts afterwards
+               try {
+                  barrier.await(5, TimeUnit.SECONDS);
+               } catch (TimeoutException e) {
+                  assert false : "Timed out applying for modifications";
+               } catch (Exception e) {
+                  throw new CacheLoaderException("Barried failed", e);
+               }
+            }
+         };
+         dummyCfg = new DummyInMemoryCacheStore.Cfg();
+         dummyCfg.setStore(m.getName());
+         store.init(dummyCfg, null, null);
+         store.start();
+
+         List<Modification> mods = new ArrayList<Modification>(); // scenario 1: two stores each to k1 and k2, then a remove of k1
+         mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+         mods.add(new Store(InternalEntryFactory.create(k1, v2)));
+         mods.add(new Store(InternalEntryFactory.create(k2, v1)));
+         mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+         mods.add(new Remove(k1));
+         GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS); // wait until the prepare has gone through applyModificationsSync
+         assert 0 == storeCount.get(); // the test expects no calls on the underlying store after prepare alone
+         assert 0 == removeCount.get();
+         assert 0 == clearCount.get();
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS); // wait until the commit has gone through applyModificationsSync
+         assert 1 == storeCount.get() : "Store count was " + storeCount.get(); // expected: a single store (k2)
+         assert 1 == removeCount.get(); // expected: a single remove (k1)
+         assert 0 == clearCount.get();
+
+         storeCount.set(0);
+         removeCount.set(0);
+         clearCount.set(0);
+         mods = new ArrayList<Modification>(); // scenario 2: store+remove of k1, a clear, then store+remove of k2
+         mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+         mods.add(new Remove(k1));
+         mods.add(new Clear());
+         mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+         mods.add(new Remove(k2));
+         tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get();
+         assert 0 == removeCount.get();
+         assert 0 == clearCount.get();
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get() : "Store count was " + storeCount.get(); // expected: only the clear reaches the store
+         assert 0 == removeCount.get();
+         assert 1 == clearCount.get();
+
+         storeCount.set(0);
+         removeCount.set(0);
+         clearCount.set(0);
+         mods = new ArrayList<Modification>(); // scenario 3: no clear; k1 and k2 each end with a remove, k3 with a store
+         mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+         mods.add(new Remove(k1));
+         mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+         mods.add(new Remove(k2));
+         mods.add(new Store(InternalEntryFactory.create(k3, v3)));         
+         tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get();
+         assert 0 == removeCount.get();
+         assert 0 == clearCount.get();
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 1 == storeCount.get() : "Store count was " + storeCount.get(); // expected: the store of k3
+         assert 2 == removeCount.get(); // expected: the removes of k1 and k2
+         assert 0 == clearCount.get();
+
+         storeCount.set(0);
+         removeCount.set(0);
+         clearCount.set(0);
+         mods = new ArrayList<Modification>(); // scenario 4: a clear followed by a remove
+         mods.add(new Clear());
+         mods.add(new Remove(k1));
+         tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get();
+         assert 0 == removeCount.get();
+         assert 0 == clearCount.get();
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get() : "Store count was " + storeCount.get();
+         assert 0 == removeCount.get(); // expected: the remove after the clear is dropped
+         assert 1 == clearCount.get();
+
+         storeCount.set(0);
+         removeCount.set(0);
+         clearCount.set(0);
+         mods = new ArrayList<Modification>(); // scenario 5: a clear followed by a store
+         mods.add(new Clear());
+         mods.add(new Store(InternalEntryFactory.create(k1, v1)));         
+         tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 0 == storeCount.get();
+         assert 0 == removeCount.get();
+         assert 0 == clearCount.get();
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS);
+         assert 1 == storeCount.get() : "Store count was " + storeCount.get(); // expected: both the clear and the store are applied
+         assert 0 == removeCount.get();
+         assert 1 == clearCount.get();
+      } finally {
+         store.delegate.clear(); // empty the delegate store before stopping
+         store.stop();
+         store = null;
+      }
+   }
+
    private void doTestPut(int number, String key, String value) throws Exception {
       for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
 



More information about the infinispan-commits mailing list