[infinispan-commits] Infinispan SVN: r1422 - in trunk/core/src: main/java/org/infinispan/loaders/modifications and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jan 28 12:02:34 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-28 12:02:33 -0500 (Thu, 28 Jan 2010)
New Revision: 1422

Added:
   trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
   trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
Modified:
   trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
   trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
   trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
   trunk/core/src/test/java/org/infinispan/test/TestingUtil.java
Log:
[ISPN-340] (Using Transactions Cache Store doesn't work Async Anymore) Fixed.

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AsyncStore.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -8,11 +8,14 @@
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.loaders.modifications.Clear;
+import org.infinispan.loaders.modifications.Commit;
 import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Prepare;
 import org.infinispan.loaders.modifications.PurgeExpired;
 import org.infinispan.loaders.modifications.Remove;
 import org.infinispan.loaders.modifications.Store;
 import org.infinispan.marshall.Marshaller;
+import org.infinispan.transaction.xa.GlobalTransaction;
 import org.infinispan.util.concurrent.locks.containers.ReentrantPerEntryLockContainer;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
@@ -37,8 +40,6 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.infinispan.loaders.modifications.Modification.Type.*;
-
 /**
  * The AsyncStore is a delegating CacheStore that extends AbstractDelegatingStore, overriding methods to that should not
  * just delegate the operation to the underlying store.
@@ -121,6 +122,18 @@
    }
 
    @Override
+   public void prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
+      Prepare prepare = new Prepare(list, tx, isOnePhase);
+      enqueue(prepare, prepare);
+   }
+
+   @Override
+   public void commit(GlobalTransaction tx) throws CacheLoaderException {
+      Commit commit = new Commit(tx);
+      enqueue(commit, commit);
+   }
+
+   @Override
    public void start() throws CacheLoaderException {
       state = newStateMap();
       log.info("Async cache loader starting {0}", this);
@@ -163,7 +176,7 @@
          Modification mod = entry.getValue();
          switch (mod.getType()) {
             case STORE:
-               super.store(((Store)mod).getStoredEntry());
+               super.store(((Store) mod).getStoredEntry());
                break;
             case REMOVE:
                super.remove(entry.getKey());
@@ -174,6 +187,12 @@
             case PURGE_EXPIRED:
                super.purgeExpired();
                break;
+            case PREPARE:
+               super.prepare(((Prepare) mod).getList(), ((Prepare) mod).getTx(), ((Prepare) mod).isOnePhase());
+               break;
+            case COMMIT:
+               super.commit(((Commit) mod).getTx());
+               break;
          }
       }
    }

Added: trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Commit.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+/**
+ * A {@link Modification} of type {@code Type.COMMIT} that carries the {@link GlobalTransaction} to commit.
+ * Equality and hash code are derived from the transaction alone; a null transaction is not guarded against.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Commit implements Modification {
+   final GlobalTransaction tx;
+
+   public Commit(GlobalTransaction tx) {
+      this.tx = tx;
+   }
+
+   public GlobalTransaction getTx() {
+      return tx;
+   }
+
+   @Override
+   public Type getType() {
+      return Type.COMMIT;
+   }
+
+   @Override
+   public int hashCode() {
+      int result = 17;
+      result = 31 * result + tx.hashCode();
+      return result;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (obj == this)
+         return true;
+      if (!(obj instanceof Commit))
+         return false;
+      Commit other = (Commit) obj;
+      return tx.equals(other.tx);
+   }
+
+}

Modified: trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java	2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Modification.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -8,7 +8,7 @@
  */
 public interface Modification {
    public static enum Type {
-      STORE, REMOVE, CLEAR, PURGE_EXPIRED;
+      STORE, REMOVE, CLEAR, PURGE_EXPIRED, PREPARE, COMMIT;
    }
 
    Type getType();

Added: trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/loaders/modifications/Prepare.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.loaders.modifications;
+
+import java.util.List;
+
+import org.infinispan.transaction.xa.GlobalTransaction;
+
+/**
+ * A {@link Modification} of type {@code Type.PREPARE} bundling a modification list, its {@link GlobalTransaction}
+ * and the one-phase flag. The list is kept by reference (not copied) and contributes to equals/hashCode.
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class Prepare implements Modification {
+   final List<? extends Modification> list;
+   final GlobalTransaction tx;
+   final boolean isOnePhase;
+
+   public Prepare(List<? extends Modification> list, GlobalTransaction tx, boolean isOnePhase) {
+      this.list = list;
+      this.tx = tx;
+      this.isOnePhase = isOnePhase;
+   }
+
+   @Override
+   public Type getType() {
+      return Type.PREPARE;
+   }
+
+   public List<? extends Modification> getList() {
+      return list;
+   }
+
+   public GlobalTransaction getTx() {
+      return tx;
+   }
+
+   public boolean isOnePhase() {
+      return isOnePhase;
+   }
+
+   @Override
+   public boolean equals(Object obj) {
+      if (obj == this)
+         return true;
+      if (!(obj instanceof Prepare))
+         return false;
+      Prepare other = (Prepare) obj;
+      return list.equals(other.list) 
+         && tx.equals(other.tx) 
+         && isOnePhase == other.isOnePhase;
+   }
+
+   @Override
+   public int hashCode() {
+      int result = 17;
+      result = 31 * result + list.hashCode();
+      result = 31 * result + tx.hashCode();
+      result = 31 * result + (isOnePhase ? 1 : 0);
+      return result;
+   }
+
+}

Modified: trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/test/java/org/infinispan/loaders/decorators/AsyncTest.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -7,22 +7,33 @@
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.loaders.dummy.DummyInMemoryCacheStore;
 import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Prepare;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
 import org.infinispan.test.AbstractInfinispanTest;
 import org.infinispan.test.TestingUtil;
+import org.infinispan.transaction.xa.GlobalTransaction;
+import org.infinispan.transaction.xa.GlobalTransactionFactory;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterTest;
 import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.infinispan.test.TestingUtil.*;
+
 @Test(groups = "unit", testName = "loaders.decorators.AsyncTest")
 public class AsyncTest extends AbstractInfinispanTest {
    private static final Log log = LogFactory.getLog(AsyncTest.class);
@@ -126,6 +137,67 @@
       }
    }
 
+   public void testTransactionalModifications(Method m) throws Exception {
+      try {
+         final GlobalTransactionFactory gtf = new GlobalTransactionFactory();
+         final String k1 = k(m, "1"), k2 = k(m, "2"), v1 = v(m, "1"), v2 = v(m, "2");
+         final ConcurrentMap<Object, Modification> localMods = new ConcurrentHashMap<Object, Modification>();
+         final CyclicBarrier barrier = new CyclicBarrier(2);
+         DummyInMemoryCacheStore underlying = new DummyInMemoryCacheStore();
+         store = new AsyncStore(underlying, asyncConfig) {
+            @Override
+            protected void applyModificationsSync(ConcurrentMap<Object, Modification> mods) throws CacheLoaderException {
+               for (Map.Entry<Object, Modification> entry : mods.entrySet()) {
+                  localMods.put(entry.getKey(), entry.getValue());
+               }
+               // The batch is copied into localMods above, before delegating, so the test
+               // thread can later inspect exactly which modifications reached this method.
+               //
+               // Only one rendezvous is performed per batch: the barrier below is awaited
+               // after super.applyModificationsSync(mods) returns, so by the time the barrier
+               // trips the delegate call for this batch has completed. An earlier variant
+               // that also awaited the barrier before delegating is disabled.
+               super.applyModificationsSync(mods);
+               try {
+                  barrier.await(5, TimeUnit.SECONDS);
+               } catch (TimeoutException e) {
+                  assert false : "Timed out applying for modifications";
+               } catch (Exception e) {
+                  throw new CacheLoaderException("Barried failed", e);
+               }
+            }
+         };
+         dummyCfg = new DummyInMemoryCacheStore.Cfg();
+         dummyCfg.setStore(m.getName());
+         store.init(dummyCfg, null, null);
+         store.start();
+
+         List<Modification> mods = new ArrayList<Modification>();
+         mods.add(new Store(InternalEntryFactory.create(k1, v1)));
+         mods.add(new Store(InternalEntryFactory.create(k2, v2)));
+         mods.add(new Remove(k1));
+         GlobalTransaction tx = gtf.newGlobalTransaction(null, false);
+         store.prepare(mods, tx, false);
+         barrier.await(5, TimeUnit.SECONDS);
+         // Single rendezvous per batch; pairs with the await in the applyModificationsSync override.
+
+         assert 1 == localMods.size();
+         assert localMods.entrySet().iterator().next().getKey() instanceof Prepare;
+         assert !store.containsKey(k1);
+         assert !store.containsKey(k2);
+
+         store.commit(tx);
+         barrier.await(5, TimeUnit.SECONDS);
+         // Single rendezvous per batch; pairs with the await in the applyModificationsSync override.
+         assert store.load(k2).getValue().equals(v2);
+         assert !store.containsKey(k1);
+      } finally {
+         store.delegate.clear();
+         store.stop();
+         store = null;
+      }
+   }
+
    private void doTestPut(int number, String key, String value) throws Exception {
       for (int i = 0; i < number; i++) store.store(InternalEntryFactory.create(key + i, value + i));
 

Modified: trunk/core/src/test/java/org/infinispan/test/TestingUtil.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-01-28 16:50:25 UTC (rev 1421)
+++ trunk/core/src/test/java/org/infinispan/test/TestingUtil.java	2010-01-28 17:02:33 UTC (rev 1422)
@@ -23,7 +23,6 @@
 import org.infinispan.lifecycle.ComponentStatus;
 import org.infinispan.loaders.CacheLoader;
 import org.infinispan.loaders.CacheLoaderManager;
-import org.infinispan.loaders.file.FileCacheStoreTest;
 import org.infinispan.manager.CacheManager;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.remoting.ReplicationQueue;
@@ -35,6 +34,7 @@
 import javax.transaction.TransactionManager;
 import java.io.File;
 import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -770,4 +770,22 @@
       }
       return prefix + TEST_PATH + separator + test.getClass().getSimpleName();
    }
+
+   public static String k(Method method, String index) {
+      return new StringBuilder().append("k").append(index).append('-')
+         .append(method.getName()).toString();
+   }
+
+   public static String v(Method method, String index) {
+      return new StringBuilder().append("v").append(index).append('-')
+         .append(method.getName()).toString();
+   }
+
+   public static String k(Method method) {
+      return k(method, "");
+   }
+
+   public static Object v(Method method) {
+      return v(method, "");
+   }
 }



More information about the infinispan-commits mailing list