[infinispan-commits] Infinispan SVN: r1422 - in trunk/core/src: main/java/org/infinispan/loaders/modifications and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jan 28 12:02:34 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-01-28 12:02:33 -0500 (Thu, 28 Jan 2010)
New Revision: 1422
Added:
trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
Modified:
trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
trunk/core/src/test/java/org/infinispan/test/TestingUtil.java
Log:
[ISPN-340] (Using Transactions Cache Store doesn't work Async Anymore) Fixed.
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -8,11 +8,14 @@
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.modifications.Clear;
+import org.infinispan.loaders.modifications.Commit;
import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Prepare;
import org.infinispan.loaders.modifications.PurgeExpired;
import org.infinispan.loaders.modifications.Remove;
import org.infinispan.loaders.modifications.Store;
import org.infinispan.marshall.Marshaller;
+import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -37,8 +40,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.infinispan.loaders.modifications.Modification.Type.*;
-
/**
* The AsyncStore is a delegating CacheStore that extends AbstractDelegatingStore, overriding methods to that should not
* just delegate the operation to the underlying store.
@@ -121,6 +122,18 @@
}
@Override
+ public void prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) { // does not call the delegate directly; only queues the request
+ Prepare prepare = new Prepare(list, tx, isOnePhase); // capture the three arguments in a Modification so they can be replayed by the PREPARE case
+ enqueue(prepare, prepare); // same object passed twice; presumably key and modification of the pending map - confirm against enqueue()
+ }
+
+ @Override
+ public void commit(GlobalTransaction tx) throws CacheLoaderException { // does not call the delegate directly; only queues the request
+ Commit commit = new Commit(tx); // capture the transaction in a Modification so it can be replayed by the COMMIT case
+ enqueue(commit, commit); // same object passed twice; presumably key and modification of the pending map - confirm against enqueue()
+ }
+
+ @Override
public void start() throws CacheLoaderException {
state = newStateMap();
log.info("Async cache loader starting {0}", this);
@@ -163,7 +176,7 @@
Modification mod = entry.getValue();
switch (mod.getType()) {
case STORE:
- super.store(((Store)mod).getStoredEntry());
+ super.store(((Store) mod).getStoredEntry());
break;
case REMOVE:
super.remove(entry.getKey());
@@ -174,6 +187,12 @@
case PURGE_EXPIRED:
super.purgeExpired();
break;
+ case PREPARE:
+ super.prepare(((Prepare) mod).getList(), ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
+ break;
+ case COMMIT:
+ super.commit(((Commit) mod).getTx());
+ break;
}
}
}
Added: trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+/**
+ * A {@link Modification} that carries the {@link GlobalTransaction} of a commit request so the commit can be queued and applied later.
+ * Equality and hash code are derived solely from the wrapped transaction.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Commit implements Modification {
+ final GlobalTransaction tx; // package-private, assigned once in the constructor
+
+ public Commit(GlobalTransaction tx) {
+ this.tx = tx; // stored as-is, no null check
+ }
+
+ public GlobalTransaction getTx() {
+ return tx;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.COMMIT;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + tx.hashCode(); // dereferences tx: a null tx would throw NullPointerException here
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (!(obj instanceof Commit)) // also rejects null
+ return false;
+ Commit other = (Commit) obj;
+ return tx.equals(other.tx); // equal exactly when the wrapped transactions are equal
+ }
+
+}
Modified: trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java 2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -8,7 +8,7 @@
*/
public interface Modification {
public static enum Type {
- STORE, REMOVE, CLEAR, PURGE_EXPIRED;
+ STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT;
}
Type getType();
Added: trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import java.util.List;
+
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+/**
+ * A {@link Modification} that captures the arguments of a prepare call (modification list, transaction, one-phase flag) so the call can be queued and applied later.
+ * Equality and hash code combine all three captured values.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Prepare implements Modification {
+ final List<? extends Modification> list; // the caller's list instance; no defensive copy is made
+ final GlobalTransaction tx;
+ final boolean isOnePhase;
+
+ public Prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
+ this.list = list; // stored as-is, no null check
+ this.tx = tx; // stored as-is, no null check
+ this.isOnePhase = isOnePhase;
+ }
+
+ @Override
+ public Type getType() {
+ return Type.PREPARE;
+ }
+
+ public List<? extends Modification> getList() {
+ return list;
+ }
+
+ public GlobalTransaction getTx() {
+ return tx;
+ }
+
+ public boolean isOnePhase() {
+ return isOnePhase;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+ if (!(obj instanceof Prepare)) // also rejects null
+ return false;
+ Prepare other = (Prepare) obj;
+ return list.equals(other.list) // list and tx are dereferenced without null checks
+ && tx.equals(other.tx)
+ && isOnePhase == other.isOnePhase;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 17;
+ result = 31 * result + list.hashCode(); // uses the same three fields as equals()
+ result = 31 * result + tx.hashCode();
+ result = 31 * result + (isOnePhase ? 1 : 0);
+ return result;
+ }
+
+}
Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -7,22 +7,33 @@
import org.infinispan.loaders.CacheStore;
import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Prepare;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.transaction.xa.GlobalTransactionFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import static org.infinispan.test.TestingUtil.*;
+
@Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
public class AsyncTest extends AbstractInfinispanTest {
private static final Log log = LogFactory.getLog(AsyncTest.class);
@@ -126,6 +137,67 @@
}
}
+ public void testTransactionalModifications(Method m) throws Exception { // checks that a two-phase prepare leaves the store empty and the following commit applies the listed modifications
+ try {
+ final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
+ final String k1 = k(m, "1"), k2 = k(m, "2"), v1 = v(m, "1"), v2 = v(m, "2");
+ final ConcurrentMap<Object, Modification> localMods = new ConcurrentHashMap<Object, Modification>();
+ final CyclicBarrier barrier = new CyclicBarrier(2); // two parties: this test method and whoever invokes applyModificationsSync
+ DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+ store = new AsyncStore(underlying, asyncConfig) {
+ @Override
+ protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
+ for (Map.Entry<Object, Modification> entry : mods.entrySet()) {
+ localMods.put(entry.getKey(), entry.getValue()); // record what the store hands over so the test can inspect it
+ }
+// try {
+// barrier.await(5, TimeUnit.SECONDS);
+// } catch (TimeoutException e) {
+// assert false : "Timed out waiting for modifications";
+// } catch (Exception e) {
+// throw new CacheLoaderException("Barried failed", e);
+// }
+ super.applyModificationsSync(mods);
+ try {
+ barrier.await(5, TimeUnit.SECONDS); // rendezvous with the test only after super has processed the batch
+ } catch (TimeoutException e) {
+ assert false : "Timed out applying for modifications";
+ } catch (Exception e) {
+ throw new CacheLoaderException("Barried failed", e);
+ }
+ }
+ };
+ dummyCfg = new DummyInMemoryCacheStore.Cfg();
+ dummyCfg.setStore(m.getName());
+ store.init(dummyCfg, null, null);
+ store.start();
+
+ List<Modification> mods = new ArrayList<Modification>();
+ mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+ mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+ mods.add(new Remove(k1)); // k1 is stored and then removed within the same list
+ GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
+ store.prepare(mods, tx, false); // isOnePhase == false: a separate commit follows below
+ barrier.await(5, TimeUnit.SECONDS); // blocks (up to 5s) until the overridden applyModificationsSync reaches its await
+// barrier.await(5, TimeUnit.SECONDS);
+
+ assert 1 == localMods.size(); // only the prepare has been handed over so far
+ assert localMods.entrySet().iterator().next().getKey() instanceof Prepare; // the Prepare object itself is the map key
+ assert !store.containsKey(k1); // nothing is visible in the store before commit
+ assert !store.containsKey(k2);
+
+ store.commit(tx);
+ barrier.await(5, TimeUnit.SECONDS); // wait for the commit batch in the same way
+// barrier.await(5, TimeUnit.SECONDS);
+ assert store.load(k2).getValue().equals(v2); // k2 is present after commit
+ assert !store.containsKey(k1); // k1 stays absent because the list removed it again
+ } finally {
+ store.delegate.clear(); // clean up even when an assertion above failed
+ store.stop();
+ store = null;
+ }
+ }
+
private void doTestPut(int number, String key, String value) throws Exception {
for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
Modified: trunk/core/src/test/java/org/infinispan/test/TestingUtil.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/TestingUtil.java 2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/test/java/org/infinispan/test/TestingUtil.java 2010-01-28 17:02:33 UTC (rev 1422)
@@ -23,7 +23,6 @@
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.loaders.CacheLoader;
import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.loaders.file.FileCacheStoreTest;
import org.infinispan.manager.CacheManager;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.remoting.ReplicationQueue;
@@ -35,6 +34,7 @@
import javax.transaction.TransactionManager;
import java.io.File;
import java.lang.reflect.Field;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -770,4 +770,22 @@
}
return prefix + TEST_PATH + separator + test.getClass().getSimpleName();
}
+
+ public static String k(Method method, String index) { // builds a per-test key: "k" + index + "-" + method name
+ return new StringBuilder().append("k").append(index).append('-')
+ .append(method.getName()).toString();
+ }
+
+ public static String v(Method method, String index) { // builds a per-test value: "v" + index + "-" + method name
+ return new StringBuilder().append("v").append(index).append('-')
+ .append(method.getName()).toString();
+ }
+
+ public static String k(Method method) { // key without an index: "k-" + method name
+ return k(method, "");
+ }
+
+ public static Object v(Method method) { // value without an index: "v-" + method name. NOTE(review): declared as Object although a String is returned and k(Method) returns String - confirm this is intended
+ return v(method, "");
+ }
}
More information about the infinispan-commits
mailing list