[infinispan-commits] Infinispan SVN: r1405 - in trunk/core/src: main/java/org/infinispan/statetransfer and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jan 21 09:37:48 EST 2010


Author: galder.zamarreno at jboss.com
Date: 2010-01-21 09:37:48 -0500 (Thu, 21 Jan 2010)
New Revision: 1405

Added:
   trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
   trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java
Modified:
   trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
   trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
Log:
[ISPN-335] (EOFException when fetchPersistentState) Fixed by making sure that when buckets are loaded, we signal the underlying marshaller that this call could be reentrant.

Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2010-01-21 10:50:02 UTC (rev 1404)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2010-01-21 14:37:48 UTC (rev 1405)
@@ -3,6 +3,7 @@
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
 import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.io.ExposedByteArrayOutputStream;
 import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.bucket.Bucket;
@@ -14,10 +15,12 @@
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.HashSet;
@@ -29,6 +32,7 @@
  *
  * @author Manik Surtani
  * @author Mircea.Markus at jboss.com
+ * 
  * @since 4.0
  */
 public class FileCacheStore extends BucketBasedCacheStore {
@@ -112,6 +116,7 @@
             BufferedInputStream bis = null;
             FileInputStream fileInStream = null;
             try {
+               if (trace) log.trace("Opening file in {0}", file);
                fileInStream = new FileInputStream(file);
                int sz = fileInStream.available();
                bis = new BufferedInputStream(fileInStream);
@@ -140,7 +145,7 @@
          return;
       }
       for (File f : toDelete) {
-         if (!f.delete()) {
+         if (!deleteFile(f)) {
             log.warn("Had problems removing file {0}", f);
          }
       }
@@ -191,7 +196,7 @@
          FileInputStream is = null;
          try {
             is = new FileInputStream(bucketFile);
-            bucket = (Bucket) marshaller.objectFromInputStream(is);
+            bucket = (Bucket) objectFromInputStreamInReentrantMode(is);
          } catch (Exception e) {
             String message = "Error while reading from file: " + bucketFile.getAbsoluteFile();
             log.error(message, e);
@@ -213,7 +218,7 @@
    public void updateBucket(Bucket b) throws CacheLoaderException {
       File f = new File(root, b.getBucketName());
       if (f.exists()) {
-         if (!f.delete()) log.warn("Had problems removing file {0}", f);
+         if (!deleteFile(f)) log.warn("Had problems removing file {0}", f);
       } else if (log.isTraceEnabled()) {
          log.trace("Successfully deleted file: '" + f.getName() + "'");
       }
@@ -260,4 +265,26 @@
    public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
       return loadBucket(key.hashCode() + "");
    }
+
+   private boolean deleteFile(File f) {
+      if (trace) log.trace("Really delete file {0}", f);
+      return f.delete();
+   }
+
+   /**
+    * Unmarshalls a single object from the given stream through the marshaller's
+    * {@code startObjectInput} / {@code objectFromObjectStream} / {@code finishObjectInput} sequence.
+    * <p/>
+    * The stream is first drained completely into memory and the object is then read from that in-memory
+    * copy. Per ISPN-335, buckets may be loaded while the marshaller is already in use, so the marshaller
+    * has to be told that this call could be reentrant.
+    *
+    * @param is stream to read from; it is consumed until end of stream but not closed by this method
+    * @return the unmarshalled object
+    * @throws IOException if reading the stream or unmarshalling fails
+    * @throws ClassNotFoundException if a class needed to rebuild the object cannot be found
+    */
+   private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException {
+      // available() is only an estimate and may legitimately be 0 (e.g. an empty file); use it purely as a sizing hint.
+      int len = is.available();
+      ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
+      // The copy buffer must never be zero-length: read(buf, 0, 0) returns 0 instead of -1,
+      // which would make the drain loop below spin forever.
+      byte[] buf = new byte[1024];
+      int bytesRead;
+      while ((bytesRead = is.read(buf, 0, buf.length)) != -1) bytes.write(buf, 0, bytesRead);
+      InputStream in = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
+      // NOTE(review): 'true' is presumably the "reentrant" flag mentioned in ISPN-335 -- confirm against the marshaller API.
+      ObjectInput unmarshaller = marshaller.startObjectInput(in, true);
+      try {
+         return marshaller.objectFromObjectStream(unmarshaller);
+      } finally {
+         // always release the unmarshaller, even when unmarshalling fails
+         marshaller.finishObjectInput(unmarshaller);
+      }
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java	2010-01-21 10:50:02 UTC (rev 1404)
+++ trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java	2010-01-21 14:37:48 UTC (rev 1405)
@@ -363,6 +363,7 @@
    private void generatePersistentState(ObjectOutput oo) throws StateTransferException {
       try {
          // always use the unclosable stream delegate to ensure the impl doesn't close the stream
+         if (trace) log.trace("Generate persistent state");
          cs.toStream(new UnclosableObjectOutputStream(oo));
       } catch (CacheLoaderException cle) {
          throw new StateTransferException(cle);

Added: trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java	2010-01-21 14:37:48 UTC (rev 1405)
@@ -0,0 +1,285 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.statetransfer;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.infinispan.Cache;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Optional;
+import org.testng.annotations.Parameters;
+import org.testng.annotations.Test;
+
+/**
+ * Functional tests for state transfer between clustered caches that are each configured with a file
+ * cache store ({@link FileCacheStoreConfig}) with persistent state fetching enabled.
+ * <p/>
+ * {@link #createCacheManagers()} only prepares a shared configuration; every test creates (and stops)
+ * its own cache managers through {@link #createCacheManager(String)}.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+@Test(groups = "functional", testName = "statetransfer.StateTransferFileCacheLoaderFunctionalTest")
+public class StateTransferFileCacheLoaderFunctionalTest extends MultipleCacheManagersTest {
+   static final Log log = LogFactory.getLog(StateTransferFileCacheLoaderFunctionalTest.class);
+   static String cacheName = "nbst-with-file-loader";
+   // only used to number the start/end log lines of each test
+   volatile int testCount = 0;
+   // per-thread switch read by createCacheManager(String) to mark the cache loader as shared
+   ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
+      @Override
+      protected Boolean initialValue() {
+         return false;
+      }
+   };
+   String tmpDirectory1;
+   String tmpDirectory2;
+   String tmpDirectory3;
+   String tmpDirectory4;
+
+   Configuration config;
+
+   /**
+    * Computes the four store locations used by the tests, one per potential cache manager.
+    *
+    * @param basedir base directory, injected from the "basedir" TestNG parameter; defaults to "/tmp"
+    */
+   @BeforeTest
+   @Parameters({"basedir"})
+   protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
+      tmpDirectory1 = tmpDirectoryFor(basedir, 1);
+      tmpDirectory2 = tmpDirectoryFor(basedir, 2);
+      tmpDirectory3 = tmpDirectoryFor(basedir, 3);
+      tmpDirectory4 = tmpDirectoryFor(basedir, 4);
+   }
+
+   /** Builds {@code <basedir><TEST_PATH>/<index>/<simple class name>}. */
+   private String tmpDirectoryFor(String basedir, int index) {
+      return basedir + TestingUtil.TEST_PATH + File.separator + index + File.separator + getClass().getSimpleName();
+   }
+
+   /** Wipes and re-creates every store directory after each test method. */
+   @AfterMethod(alwaysRun = true)
+   protected void clearTempDir() {
+      for (String dir : new String[]{tmpDirectory1, tmpDirectory2, tmpDirectory3, tmpDirectory4}) {
+         TestingUtil.recursiveFileRemove(dir);
+         new File(dir).mkdirs();
+      }
+   }
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      // This impl only really sets up a configuration for use later.
+      config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+      config.setSyncReplTimeout(30000);
+      config.setFetchInMemoryState(true);
+      config.setUseLockStriping(false); // reduces the odd chance of a key collision and deadlock
+   }
+
+   /**
+    * Creates a cluster-enabled cache manager and defines the test cache on it, backed by a file cache
+    * store located in the given directory.
+    *
+    * @param tmpDirectory location handed to the file cache store
+    * @return the new cache manager, with {@link #cacheName} defined on it
+    */
+   protected CacheManager createCacheManager(String tmpDirectory) {
+      // file cache store rooted at the given directory, with persistent state transfer switched on
+      FileCacheStoreConfig cfg = new FileCacheStoreConfig();
+      cfg.setLocation(tmpDirectory);
+      cfg.setPurgeSynchronously(true); // for more accurate unit testing
+      cfg.setFetchPersistentState(true);
+
+      CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+      clmc.addCacheLoaderConfig(cfg);
+      clmc.setShared(sharedCacheLoader.get());
+      config.setCacheLoaderManagerConfig(clmc);
+
+      CacheManager cm = addClusterEnabledCacheManager();
+      // define a clone, since the shared 'config' instance is modified again on the next call
+      cm.defineConfiguration(cacheName, config.clone());
+      return cm;
+   }
+
+   /**
+    * With the loader flagged as shared, the joining cache is expected to end up with no data, neither in
+    * memory nor in its own store, while the first cache keeps the initial data.
+    */
+   public void testSharedLoader() throws Exception {
+      CacheManager cm1 = null, cm2 = null;
+      try {
+         sharedCacheLoader.set(true);
+         cm1 = createCacheManager(tmpDirectory1);
+         Cache<Object, Object> c1 = cm1.getCache(cacheName);
+         StateTransferTestingUtil.verifyNoDataOnLoader(c1);
+         StateTransferTestingUtil.verifyNoData(c1);
+         StateTransferTestingUtil.writeInitialData(c1);
+
+         // starting the second cache would initialize an in-memory state transfer but not a persistent one since the loader is shared
+         cm2 = createCacheManager(tmpDirectory2);
+         Cache<Object, Object> c2 = cm2.getCache(cacheName);
+
+         TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+         StateTransferTestingUtil.verifyInitialDataOnLoader(c1);
+         StateTransferTestingUtil.verifyInitialData(c1);
+
+         StateTransferTestingUtil.verifyNoDataOnLoader(c2);
+         StateTransferTestingUtil.verifyNoData(c2);
+      } finally {
+         if (cm1 != null) cm1.stop();
+         if (cm2 != null) cm2.stop();
+         // reset the flag so later tests running on this thread get a non-shared loader again
+         sharedCacheLoader.set(false);
+      }
+   }
+
+   /** A second cache joining the cluster must be able to read the data written through the first one. */
+   public void testInitialStateTransfer() throws Exception {
+      testCount++;
+      log.info("testInitialStateTransfer start - " + testCount);
+      CacheManager cm1 = null, cm2 = null;
+      try {
+         Cache<Object, Object> cache1, cache2;
+         cm1 = createCacheManager(tmpDirectory1);
+         cache1 = cm1.getCache(cacheName);
+         StateTransferTestingUtil.writeInitialData(cache1);
+
+         cm2 = createCacheManager(tmpDirectory2);
+         cache2 = cm2.getCache(cacheName);
+
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+         StateTransferTestingUtil.verifyInitialData(cache2);
+         log.info("testInitialStateTransfer end - " + testCount);
+      } finally {
+         if (cm1 != null) cm1.stop();
+         if (cm2 != null) cm2.stop();
+      }
+   }
+
+   /**
+    * Same as {@link #testInitialStateTransfer()}, but a third cache is started from a separate thread
+    * once two caches are already up and a {@code DelayTransfer} entry has been stored.
+    *
+    * @param m the running test method (injected by TestNG), used for log messages only
+    */
+   public void testInitialStateTransferInDifferentThread(Method m) throws Exception {
+      testCount++;
+      log.info(m.getName() + " start - " + testCount);
+      CacheManager cm1 = null, cm2 = null, cm30 = null;
+      // fully qualified on purpose: ExecutorService is not among this file's imports
+      java.util.concurrent.ExecutorService starter = null;
+      try {
+         Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null;
+         cm1 = createCacheManager(tmpDirectory1);
+         cache1 = cm1.getCache(cacheName);
+         StateTransferTestingUtil.writeInitialData(cache1);
+
+         cm2 = createCacheManager(tmpDirectory2);
+         cache2 = cm2.getCache(cacheName);
+
+         cache1.put("delay", new StateTransferFunctionalTest.DelayTransfer());
+
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+         StateTransferTestingUtil.verifyInitialData(cache2);
+
+         // 'final' so the anonymous Callable can capture it; cm30 lets the finally block stop it
+         final CacheManager cm3 = createCacheManager(tmpDirectory3);
+
+         cm30 = cm3;
+
+         starter = Executors.newSingleThreadExecutor(new ThreadFactory() {
+            public Thread newThread(Runnable r) {
+               return new Thread(r, "CacheStarter-Cache3");
+            }
+         });
+         Future<Void> f1 = starter.submit(new Callable<Void>() {
+            public Void call() throws Exception {
+               cm3.getCache(cacheName);
+               return null;
+            }
+         });
+
+         // wait for the cache to be started on the other thread; rethrows any start-up failure
+         f1.get();
+
+         cache3 = cm3.getCache(cacheName);
+
+         TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3);
+         StateTransferTestingUtil.verifyInitialData(cache3);
+         log.info(m.getName() + " end - " + testCount);
+      } finally {
+         // do not leave the starter thread behind once the test is over
+         if (starter != null) starter.shutdownNow();
+         if (cm1 != null) cm1.stop();
+         if (cm2 != null) cm2.stop();
+         if (cm30 != null) cm30.stop();
+      }
+   }
+
+   // NOTE: disabled concurrent variant, kept for reference only (not compiled).
+//   @Override
+//   public void testConcurrentStateTransfer() throws Exception {
+//      testCount++;
+//      log.info("testConcurrentStateTransfer start - " + testCount);
+//      CacheManager cm1 = null, cm2 = null, cm30 = null, cm40 = null;
+//      try {
+//         Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+//         cm1 = createCacheManager(tmpDirectory1);
+//         cache1 = cm1.getCache(cacheName);
+//         writeInitialData(cache1);
+//
+//         cm2 = createCacheManager(tmpDirectory2);
+//         cache2 = cm2.getCache(cacheName);
+//
+//         cache1.put("delay", new StateTransferFunctionalTest.DelayTransfer());
+//
+//         // Pause to give caches time to see each other
+//         TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+//         verifyInitialData(cache2);
+//
+//         final CacheManager cm3 = createCacheManager(tmpDirectory3);
+////         final CacheManager cm4 = createCacheManager(tmpDirectory4);
+//
+//         cm30 = cm3;
+////         cm40 = cm4;
+//
+//         Future<Void> f1 = Executors.newSingleThreadExecutor(new ThreadFactory() {
+//            public Thread newThread(Runnable r) {
+//               return new Thread(r, "CacheStarter-Cache3");
+//            }
+//         }).submit(new Callable<Void>() {
+//            public Void call() throws Exception {
+//               cm3.getCache(cacheName);
+//               return null;
+//            }
+//         });
+//
+////         Future<Void> f2 = Executors.newSingleThreadExecutor(new ThreadFactory() {
+////            public Thread newThread(Runnable r) {
+////               return new Thread(r, "CacheStarter-Cache4");
+////            }
+////         }).submit(new Callable<Void>() {
+////            public Void call() throws Exception {
+////               cm4.getCache(cacheName);
+////               return null;
+////            }
+////         });
+//
+//         f1.get();
+////         f2.get();
+//
+//         cache3 = cm3.getCache(cacheName);
+////         cache4 = cm4.getCache(cacheName);
+//
+//         TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3);
+////         TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3, cache4);
+//         verifyInitialData(cache3);
+////         verifyInitialData(cache4);
+//         log.info("testConcurrentStateTransfer end - " + testCount);
+//      } finally {
+//         if (cm1 != null) cm1.stop();
+//         if (cm2 != null) cm2.stop();
+//         if (cm30 != null) cm30.stop();
+////         if (cm40 != null) cm40.stop();
+//      }
+//   }
+
+
+}

Added: trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java	                        (rev 0)
+++ trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java	2010-01-21 14:37:48 UTC (rev 1405)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.statetransfer;
+
+import org.infinispan.Cache;
+import org.infinispan.loaders.CacheLoader;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.test.TestingUtil;
+
+/**
+ * Shared fixture data and assertions for the state transfer functional tests: a small set of
+ * name/age entries plus helpers to write them and to verify their presence or absence, either through
+ * the cache itself or directly on its cache loader.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class StateTransferTestingUtil {
+   public static final String A_B_NAME = "a_b_name";
+   public static final String A_C_NAME = "a_c_name";
+   // was "a_d_age", a copy-paste duplicate of A_D_AGE that left the D "name" key untested
+   public static final String A_D_NAME = "a_d_name";
+   public static final String A_B_AGE = "a_b_age";
+   public static final String A_C_AGE = "a_c_age";
+   public static final String A_D_AGE = "a_d_age";
+   public static final String JOE = "JOE";
+   public static final String BOB = "BOB";
+   public static final String JANE = "JANE";
+   public static final Integer TWENTY = 20;
+   public static final Integer FORTY = 40;
+
+   /**
+    * Asserts that none of the fixture keys is present in the cache loader of the given cache.
+    *
+    * @param c cache whose loader is inspected
+    * @throws Exception if the loader fails while checking a key
+    */
+   public static void verifyNoDataOnLoader(Cache<Object, Object> c) throws Exception {
+      CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+      assert !l.containsKey(A_B_AGE) : "Loader should not contain key " + A_B_AGE;
+      assert !l.containsKey(A_B_NAME) : "Loader should not contain key " + A_B_NAME;
+      assert !l.containsKey(A_C_AGE) : "Loader should not contain key " + A_C_AGE;
+      assert !l.containsKey(A_C_NAME) : "Loader should not contain key " + A_C_NAME;
+      assert !l.containsKey(A_D_AGE) : "Loader should not contain key " + A_D_AGE;
+      assert !l.containsKey(A_D_NAME) : "Loader should not contain key " + A_D_NAME;
+   }
+
+   /**
+    * Asserts that the given cache reports itself as empty.
+    *
+    * @param c cache to check
+    */
+   public static void verifyNoData(Cache<Object, Object> c) {
+      assert c.isEmpty() : "Cache should be empty!";
+   }
+
+   /**
+    * Stores the B and C name/age entries in the given cache and then evicts every fixture key from it.
+    * The D keys are evicted as well although nothing is ever stored under them here.
+    *
+    * @param c cache to populate
+    */
+   public static void writeInitialData(final Cache<Object, Object> c) {
+      c.put(A_B_NAME, JOE);
+      c.put(A_B_AGE, TWENTY);
+      c.put(A_C_NAME, BOB);
+      c.put(A_C_AGE, FORTY);
+      c.evict(A_B_NAME);
+      c.evict(A_B_AGE);
+      c.evict(A_C_NAME);
+      c.evict(A_C_AGE);
+      c.evict(A_D_NAME);
+      c.evict(A_D_AGE);
+   }
+
+   /**
+    * Asserts that the B and C entries written by {@link #writeInitialData(Cache)} are present in the
+    * cache loader of the given cache, with the expected values.
+    *
+    * @param c cache whose loader is inspected
+    * @throws Exception if the loader fails while checking or loading a key
+    */
+   public static void verifyInitialDataOnLoader(Cache<Object, Object> c) throws Exception {
+      CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+      // presence is asserted first so that the load(...) calls below do not fail on a missing entry
+      assert l.containsKey(A_B_AGE) : "Loader should contain key " + A_B_AGE;
+      assert l.containsKey(A_B_NAME) : "Loader should contain key " + A_B_NAME;
+      assert l.containsKey(A_C_AGE) : "Loader should contain key " + A_C_AGE;
+      assert l.containsKey(A_C_NAME) : "Loader should contain key " + A_C_NAME;
+      assert l.load(A_B_AGE).getValue().equals(TWENTY) : "Incorrect loader value for key " + A_B_AGE;
+      assert l.load(A_B_NAME).getValue().equals(JOE) : "Incorrect loader value for key " + A_B_NAME;
+      assert l.load(A_C_AGE).getValue().equals(FORTY) : "Incorrect loader value for key " + A_C_AGE;
+      assert l.load(A_C_NAME).getValue().equals(BOB) : "Incorrect loader value for key " + A_C_NAME;
+   }
+
+   /**
+    * Asserts that reading the B and C keys through the given cache yields the values written by
+    * {@link #writeInitialData(Cache)}.
+    *
+    * @param c cache to read from
+    */
+   public static void verifyInitialData(Cache<Object, Object> c) {
+      assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+      assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+      assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+      assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+   }
+}



More information about the infinispan-commits mailing list