[infinispan-commits] Infinispan SVN: r1471 - in trunk/core/src: test/java/org/infinispan/loaders/decorators and 1 other directory.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Feb 8 12:43:40 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-02-08 12:43:39 -0500 (Mon, 08 Feb 2010)
New Revision: 1471
Modified:
trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
Log:
[ISPN-343] (Async store should coalesce the prepare list on commit) Fixed.
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-02-08 17:42:51 UTC (rev 1470)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-02-08 17:43:39 UTC (rev 1471)
@@ -21,6 +21,7 @@
import org.infinispan.util.logging.LogFactory;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -188,7 +189,8 @@
super.purgeExpired();
break;
case PREPARE:
- super.prepare(((Prepare) mod).getList(), ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
+ List<? extends Modification> coalesced = coalesceModificationList(((Prepare) mod).getList());
+ super.prepare(coalesced, ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
break;
case COMMIT:
super.commit(((Commit) mod).getTx());
@@ -201,6 +203,33 @@
return new AsyncProcessor();
}
+ private List<? extends Modification> coalesceModificationList(List<? extends Modification> mods) {
+ Map<Object, Modification> keyMods = new HashMap<Object, Modification>();
+ List<Modification> coalesced = new ArrayList<Modification>();
+ for (Modification mod : mods) {
+ switch (mod.getType()) {
+ case STORE:
+ keyMods.put(((Store) mod).getStoredEntry().getKey(), mod);
+ break;
+ case CLEAR:
+ keyMods.clear(); // remove all pending key modifications
+ coalesced.add(mod); // add a clear so that future put/removes do not need to do anything
+ break;
+ case REMOVE:
+ if (!coalesced.isEmpty() && keyMods.containsKey(((Remove) mod).getKey())) {
+ keyMods.remove(((Remove) mod).getKey()); // clear, p(k), r(k) sequence should result in no-op for k
+ } else if (coalesced.isEmpty()) {
+ keyMods.put(((Remove) mod).getKey(), mod);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown modification type " + mod.getType());
+ }
+ }
+ coalesced.addAll(keyMods.values());
+ return coalesced;
+ }
+
private void enqueue(Object key, Modification mod) {
try {
if (stopped.get()) {
Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-02-08 17:42:51 UTC (rev 1470)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-02-08 17:43:39 UTC (rev 1471)
@@ -6,6 +6,7 @@
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
+import org.infinispan.loaders.modifications.Clear;
import org.infinispan.loaders.modifications.Modification;
import org.infinispan.loaders.modifications.Prepare;
import org.infinispan.loaders.modifications.Remove;
@@ -31,6 +32,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.infinispan.test.TestingUtil.k;
import static org.infinispan.test.TestingUtil.v;
@@ -138,7 +140,7 @@
}
}
- public void testTransactionalModifications(Method m) throws Exception {
+ public void testTransactionalModificationsHappenInDiffThread(Method m) throws Exception {
try {
final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
final String k1 = k(m, "1"), k2 = k(m, "2"), v1 = v(m, "1"), v2 = v(m, "2");
@@ -151,13 +153,6 @@
for (Map.Entry<Object, Modification> entry : mods.entrySet()) {
localMods.put(entry.getKey(), entry.getValue());
}
-// try {
-// barrier.await(5, TimeUnit.SECONDS);
-// } catch (TimeoutException e) {
-// assert false : "Timed out waiting for modifications";
-// } catch (Exception e) {
-// throw new CacheLoaderException("Barried failed", e);
-// }
super.applyModificationsSync(mods);
try {
barrier.await(5, TimeUnit.SECONDS);
@@ -180,7 +175,6 @@
GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
store.prepare(mods, tx, false);
barrier.await(5, TimeUnit.SECONDS);
-// barrier.await(5, TimeUnit.SECONDS);
assert 1 == localMods.size();
assert localMods.entrySet().iterator().next().getKey() instanceof Prepare;
@@ -189,7 +183,6 @@
store.commit(tx);
barrier.await(5, TimeUnit.SECONDS);
-// barrier.await(5, TimeUnit.SECONDS);
assert store.load(k2).getValue().equals(v2);
assert !store.containsKey(k1);
} finally {
@@ -199,6 +192,154 @@
}
}
+ public void testTransactionalModificationsAreCoalesced(Method m) throws Exception {
+ try {
+ final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
+ final String k1 = k(m, "1"), k2 = k(m, "2"), k3 = k(m, "3"), v1 = v(m, "1"), v2 = v(m, "2"), v3 = v(m, "3");
+ final AtomicInteger storeCount = new AtomicInteger();
+ final AtomicInteger removeCount = new AtomicInteger();
+ final AtomicInteger clearCount = new AtomicInteger();
+ final CyclicBarrier barrier = new CyclicBarrier(2);
+ DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore() {
+ @Override
+ public void store(InternalCacheEntry ed) {
+ super.store(ed);
+ storeCount.getAndIncrement();
+ }
+
+ @Override
+ public boolean remove(Object key) {
+ boolean ret = super.remove(key);
+ removeCount.getAndIncrement();
+ return ret;
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ clearCount.getAndIncrement();
+ }
+ };
+ store = new AsyncStore(underlying, asyncConfig) {
+ @Override
+ protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
+ super.applyModificationsSync(mods);
+ try {
+ barrier.await(5, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ assert false : "Timed out applying for modifications";
+ } catch (Exception e) {
+ throw new CacheLoaderException("Barried failed", e);
+ }
+ }
+ };
+ dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.setStore(m.getName());
+ store.init(dummyCfg, null, null);
+ store.start();
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+ mods.add(new Store(InternalEntryFactory.create(k1, v2)));
+ mods.add(new Store(InternalEntryFactory.create(k2, v1)));
+ mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+ mods.add(new Remove(k1));
+ GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get();
+ assert 0 == removeCount.get();
+ assert 0 == clearCount.get();
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 1 == storeCount.get() : "Store count was " + storeCount.get();
+ assert 1 == removeCount.get();
+ assert 0 == clearCount.get();
+
+ storeCount.set(0);
+ removeCount.set(0);
+ clearCount.set(0);
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+ mods.add(new Remove(k1));
+ mods.add(new Clear());
+ mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+ mods.add(new Remove(k2));
+ tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get();
+ assert 0 == removeCount.get();
+ assert 0 == clearCount.get();
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get() : "Store count was " + storeCount.get();
+ assert 0 == removeCount.get();
+ assert 1 == clearCount.get();
+
+ storeCount.set(0);
+ removeCount.set(0);
+ clearCount.set(0);
+ mods = new ArrayList<Modification>();
+ mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+ mods.add(new Remove(k1));
+ mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+ mods.add(new Remove(k2));
+ mods.add(new Store(InternalEntryFactory.create(k3, v3)));
+ tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get();
+ assert 0 == removeCount.get();
+ assert 0 == clearCount.get();
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 1 == storeCount.get() : "Store count was " + storeCount.get();
+ assert 2 == removeCount.get();
+ assert 0 == clearCount.get();
+
+ storeCount.set(0);
+ removeCount.set(0);
+ clearCount.set(0);
+ mods = new ArrayList<Modification>();
+ mods.add(new Clear());
+ mods.add(new Remove(k1));
+ tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get();
+ assert 0 == removeCount.get();
+ assert 0 == clearCount.get();
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get() : "Store count was " + storeCount.get();
+ assert 0 == removeCount.get();
+ assert 1 == clearCount.get();
+
+ storeCount.set(0);
+ removeCount.set(0);
+ clearCount.set(0);
+ mods = new ArrayList<Modification>();
+ mods.add(new Clear());
+ mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+ tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 0 == storeCount.get();
+ assert 0 == removeCount.get();
+ assert 0 == clearCount.get();
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS);
+ assert 1 == storeCount.get() : "Store count was " + storeCount.get();
+ assert 0 == removeCount.get();
+ assert 1 == clearCount.get();
+ } finally {
+ store.delegate.clear();
+ store.stop();
+ store = null;
+ }
+ }
+
private void doTestPut(int number, String key, String value) throws Exception {
for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
More information about the infinispan-commits
mailing list