[infinispan-commits] Infinispan SVN: r1405 - in trunk/core/src: main/java/org/infinispan/statetransfer and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jan 21 09:37:48 EST 2010
Author: galder.zamarreno at jboss.com
Date: 2010-01-21 09:37:48 -0500 (Thu, 21 Jan 2010)
New Revision: 1405
Added:
trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java
Modified:
trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
Log:
[ISPN-335] (EOFException when fetchPersistentState) Fixed by making sure that when buckets are loaded, we signal the underlying marshaller that this call could be reentrant.
Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2010-01-21 10:50:02 UTC (rev 1404)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2010-01-21 14:37:48 UTC (rev 1405)
@@ -3,6 +3,7 @@
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.io.ExposedByteArrayOutputStream;
import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.bucket.Bucket;
@@ -14,10 +15,12 @@
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashSet;
@@ -29,6 +32,7 @@
*
* @author Manik Surtani
* @author Mircea.Markus at jboss.com
+ *
* @since 4.0
*/
public class FileCacheStore extends BucketBasedCacheStore {
@@ -112,6 +116,7 @@
BufferedInputStream bis = null;
FileInputStream fileInStream = null;
try {
+ if (trace) log.trace("Opening file in {0}", file);
fileInStream = new FileInputStream(file);
int sz = fileInStream.available();
bis = new BufferedInputStream(fileInStream);
@@ -140,7 +145,7 @@
return;
}
for (File f : toDelete) {
- if (!f.delete()) {
+ if (!deleteFile(f)) {
log.warn("Had problems removing file {0}", f);
}
}
@@ -191,7 +196,7 @@
FileInputStream is = null;
try {
is = new FileInputStream(bucketFile);
- bucket = (Bucket) marshaller.objectFromInputStream(is);
+ bucket = (Bucket) objectFromInputStreamInReentrantMode(is);
} catch (Exception e) {
String message = "Error while reading from file: " + bucketFile.getAbsoluteFile();
log.error(message, e);
@@ -213,7 +218,7 @@
public void updateBucket(Bucket b) throws CacheLoaderException {
File f = new File(root, b.getBucketName());
if (f.exists()) {
- if (!f.delete()) log.warn("Had problems removing file {0}", f);
+ if (!deleteFile(f)) log.warn("Had problems removing file {0}", f);
} else if (log.isTraceEnabled()) {
log.trace("Successfully deleted file: '" + f.getName() + "'");
}
@@ -260,4 +265,26 @@
public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
return loadBucket(key.hashCode() + "");
}
+
+ private boolean deleteFile(File f) {
+ if (trace) log.trace("Really delete file {0}", f);
+ return f.delete();
+ }
+
+ private Object objectFromInputStreamInReentrantMode(InputStream is) throws IOException, ClassNotFoundException {
+ int len = is.available();
+ ExposedByteArrayOutputStream bytes = new ExposedByteArrayOutputStream(len);
+ byte[] buf = new byte[Math.min(len, 1024)];
+ int bytesRead;
+ while ((bytesRead = is.read(buf, 0, buf.length)) != -1) bytes.write(buf, 0, bytesRead);
+ is = new ByteArrayInputStream(bytes.getRawBuffer(), 0, bytes.size());
+ ObjectInput unmarshaller = marshaller.startObjectInput(is, true);
+ Object o = null;
+ try {
+ o = marshaller.objectFromObjectStream(unmarshaller);
+ } finally {
+ marshaller.finishObjectInput(unmarshaller);
+ }
+ return o;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java 2010-01-21 10:50:02 UTC (rev 1404)
+++ trunk/core/src/main/java/org/infinispan/statetransfer/StateTransferManagerImpl.java 2010-01-21 14:37:48 UTC (rev 1405)
@@ -363,6 +363,7 @@
private void generatePersistentState(ObjectOutput oo) throws StateTransferException {
try {
// always use the unclosable stream delegate to ensure the impl doesn't close the stream
+ if (trace) log.trace("Generate persistent state");
cs.toStream(new UnclosableObjectOutputStream(oo));
} catch (CacheLoaderException cle) {
throw new StateTransferException(cle);
Added: trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferFileCacheLoaderFunctionalTest.java 2010-01-21 14:37:48 UTC (rev 1405)
@@ -0,0 +1,285 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.statetransfer;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import org.infinispan.Cache;
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.file.FileCacheStoreConfig;
+import org.infinispan.manager.CacheManager;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.infinispan.test.TestingUtil;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Optional;
+import org.testng.annotations.Parameters;
+import org.testng.annotations.Test;
+
+/**
+ * StateTransferFileCacheStoreFunctionalTest.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+ at Test(groups = "functional", testName = "statetransfer.StateTransferFileCacheLoaderFunctionalTest")
+public class StateTransferFileCacheLoaderFunctionalTest extends MultipleCacheManagersTest {
+ static final Log log = LogFactory.getLog(StateTransferFileCacheLoaderFunctionalTest.class);
+ static String cacheName = "nbst-with-file-loader";
+ volatile int testCount = 0;
+ ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+ String tmpDirectory1;
+ String tmpDirectory2;
+ String tmpDirectory3;
+ String tmpDirectory4;
+
+ Configuration config;
+
+ @BeforeTest
+ @Parameters({"basedir"})
+ protected void setUpTempDir(@Optional(value = "/tmp") String basedir) {
+ tmpDirectory1 = basedir + TestingUtil.TEST_PATH + File.separator + "1" + File.separator + getClass().getSimpleName();
+ tmpDirectory2 = basedir + TestingUtil.TEST_PATH + File.separator + "2" + File.separator + getClass().getSimpleName();
+ tmpDirectory3 = basedir + TestingUtil.TEST_PATH + File.separator + "3" + File.separator + getClass().getSimpleName();
+ tmpDirectory4 = basedir + TestingUtil.TEST_PATH + File.separator + "4" + File.separator + getClass().getSimpleName();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ protected void clearTempDir() {
+ TestingUtil.recursiveFileRemove(tmpDirectory1);
+ new File(tmpDirectory1).mkdirs();
+ TestingUtil.recursiveFileRemove(tmpDirectory2);
+ new File(tmpDirectory2).mkdirs();
+ TestingUtil.recursiveFileRemove(tmpDirectory3);
+ new File(tmpDirectory3).mkdirs();
+ TestingUtil.recursiveFileRemove(tmpDirectory4);
+ new File(tmpDirectory4).mkdirs();
+ }
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ // This impl only really sets up a configuration for use later.
+ config = getDefaultClusteredConfig(Configuration.CacheMode.REPL_SYNC, true);
+ config.setSyncReplTimeout(30000);
+ config.setFetchInMemoryState(true);
+ config.setUseLockStriping(false); // reduces the odd chance of a key collision and deadlock
+ }
+
+ protected CacheManager createCacheManager(String tmpDirectory) {
+ // increment the DIMCS store id
+ FileCacheStoreConfig cfg = new FileCacheStoreConfig();
+ cfg.setLocation(tmpDirectory);
+ cfg.setPurgeSynchronously(true); // for more accurate unit testing
+ cfg.setFetchPersistentState(true);
+
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ clmc.addCacheLoaderConfig(cfg);
+ clmc.setShared(sharedCacheLoader.get());
+ config.setCacheLoaderManagerConfig(clmc);
+
+ CacheManager cm = addClusterEnabledCacheManager();
+ cm.defineConfiguration(cacheName, config.clone());
+ return cm;
+ }
+
+ public void testSharedLoader() throws Exception {
+ CacheManager cm1 = null, cm2 = null;
+ try {
+ sharedCacheLoader.set(true);
+ cm1 = createCacheManager(tmpDirectory1);
+ Cache c1 = cm1.getCache(cacheName);
+ StateTransferTestingUtil.verifyNoDataOnLoader(c1);
+ StateTransferTestingUtil.verifyNoData(c1);
+ StateTransferTestingUtil.writeInitialData(c1);
+
+ // starting the second cache would initialize an in-memory state transfer but not a persistent one since the loader is shared
+ cm2 = createCacheManager(tmpDirectory2);
+ Cache c2 = cm2.getCache(cacheName);
+
+ TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+ StateTransferTestingUtil.verifyInitialDataOnLoader(c1);
+ StateTransferTestingUtil.verifyInitialData(c1);
+
+ StateTransferTestingUtil.verifyNoDataOnLoader(c2);
+ StateTransferTestingUtil.verifyNoData(c2);
+ } finally {
+ if (cm1 != null) cm1.stop();
+ if (cm2 != null) cm2.stop();
+ sharedCacheLoader.set(false);
+ }
+ }
+
+ public void testInitialStateTransfer() throws Exception {
+ testCount++;
+ log.info("testInitialStateTransfer start - " + testCount);
+ CacheManager cm1 = null, cm2 = null;
+ try {
+ Cache<Object, Object> cache1, cache2;
+ cm1 = createCacheManager(tmpDirectory1);
+ cache1 = cm1.getCache(cacheName);
+ StateTransferTestingUtil.writeInitialData(cache1);
+
+ cm2 = createCacheManager(tmpDirectory2);
+ cache2 = cm2.getCache(cacheName);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+ StateTransferTestingUtil.verifyInitialData(cache2);
+ log.info("testInitialStateTransfer end - " + testCount);
+ } finally {
+ if (cm1 != null) cm1.stop();
+ if (cm2 != null) cm2.stop();
+ }
+ }
+
+ public void testInitialStateTransferInDifferentThread(Method m) throws Exception {
+ testCount++;
+ log.info(m.getName() + " start - " + testCount);
+ CacheManager cm1 = null, cm2 = null, cm30 = null;
+ try {
+ Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null;
+ cm1 = createCacheManager(tmpDirectory1);
+ cache1 = cm1.getCache(cacheName);
+ StateTransferTestingUtil.writeInitialData(cache1);
+
+ cm2 = createCacheManager(tmpDirectory2);
+ cache2 = cm2.getCache(cacheName);
+
+ cache1.put("delay", new StateTransferFunctionalTest.DelayTransfer());
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+ StateTransferTestingUtil.verifyInitialData(cache2);
+
+ final CacheManager cm3 = createCacheManager(tmpDirectory3);
+
+ cm30 = cm3;
+
+ Future<Void> f1 = Executors.newSingleThreadExecutor(new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "CacheStarter-Cache3");
+ }
+ }).submit(new Callable<Void>() {
+ public Void call() throws Exception {
+ cm3.getCache(cacheName);
+ return null;
+ }
+ });
+
+ f1.get();
+
+ cache3 = cm3.getCache(cacheName);
+
+ TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3);
+ StateTransferTestingUtil.verifyInitialData(cache3);
+ log.info("testConcurrentStateTransfer end - " + testCount);
+ } finally {
+ if (cm1 != null) cm1.stop();
+ if (cm2 != null) cm2.stop();
+ if (cm30 != null) cm30.stop();
+ }
+ }
+
+// @Override
+// public void testConcurrentStateTransfer() throws Exception {
+// testCount++;
+// log.info("testConcurrentStateTransfer start - " + testCount);
+// CacheManager cm1 = null, cm2 = null, cm30 = null, cm40 = null;
+// try {
+// Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+// cm1 = createCacheManager(tmpDirectory1);
+// cache1 = cm1.getCache(cacheName);
+// writeInitialData(cache1);
+//
+// cm2 = createCacheManager(tmpDirectory2);
+// cache2 = cm2.getCache(cacheName);
+//
+// cache1.put("delay", new StateTransferFunctionalTest.DelayTransfer());
+//
+// // Pause to give caches time to see each other
+// TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+// verifyInitialData(cache2);
+//
+// final CacheManager cm3 = createCacheManager(tmpDirectory3);
+//// final CacheManager cm4 = createCacheManager(tmpDirectory4);
+//
+// cm30 = cm3;
+//// cm40 = cm4;
+//
+// Future<Void> f1 = Executors.newSingleThreadExecutor(new ThreadFactory() {
+// public Thread newThread(Runnable r) {
+// return new Thread(r, "CacheStarter-Cache3");
+// }
+// }).submit(new Callable<Void>() {
+// public Void call() throws Exception {
+// cm3.getCache(cacheName);
+// return null;
+// }
+// });
+//
+//// Future<Void> f2 = Executors.newSingleThreadExecutor(new ThreadFactory() {
+//// public Thread newThread(Runnable r) {
+//// return new Thread(r, "CacheStarter-Cache4");
+//// }
+//// }).submit(new Callable<Void>() {
+//// public Void call() throws Exception {
+//// cm4.getCache(cacheName);
+//// return null;
+//// }
+//// });
+//
+// f1.get();
+//// f2.get();
+//
+// cache3 = cm3.getCache(cacheName);
+//// cache4 = cm4.getCache(cacheName);
+//
+// TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3);
+//// TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3, cache4);
+// verifyInitialData(cache3);
+//// verifyInitialData(cache4);
+// log.info("testConcurrentStateTransfer end - " + testCount);
+// } finally {
+// if (cm1 != null) cm1.stop();
+// if (cm2 != null) cm2.stop();
+// if (cm30 != null) cm30.stop();
+//// if (cm40 != null) cm40.stop();
+// }
+// }
+
+
+}
Added: trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java (rev 0)
+++ trunk/core/src/test/java/org/infinispan/statetransfer/StateTransferTestingUtil.java 2010-01-21 14:37:48 UTC (rev 1405)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat, Inc. and/or its affiliates, and
+ * individual contributors as indicated by the @author tags. See the
+ * copyright.txt file in the distribution for a full listing of
+ * individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.statetransfer;
+
+import org.infinispan.Cache;
+import org.infinispan.loaders.CacheLoader;
+import org.infinispan.loaders.CacheLoaderManager;
+import org.infinispan.test.TestingUtil;
+
+/**
+ * StateTransferTestingUtil.
+ *
+ * @author Galder Zamarreño
+ * @since 4.0
+ */
+public class StateTransferTestingUtil {
+ public static final String A_B_NAME = "a_b_name";
+ public static final String A_C_NAME = "a_c_name";
+ public static final String A_D_NAME = "a_d_age";
+ public static final String A_B_AGE = "a_b_age";
+ public static final String A_C_AGE = "a_c_age";
+ public static final String A_D_AGE = "a_d_age";
+ public static final String JOE = "JOE";
+ public static final String BOB = "BOB";
+ public static final String JANE = "JANE";
+ public static final Integer TWENTY = 20;
+ public static final Integer FORTY = 40;
+
+ public static void verifyNoDataOnLoader(Cache<Object, Object> c) throws Exception {
+ CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+ assert !l.containsKey(A_B_AGE);
+ assert !l.containsKey(A_B_NAME);
+ assert !l.containsKey(A_C_AGE);
+ assert !l.containsKey(A_C_NAME);
+ assert !l.containsKey(A_D_AGE);
+ assert !l.containsKey(A_D_NAME);
+ }
+
+ public static void verifyNoData(Cache<Object, Object> c) {
+ assert c.isEmpty() : "Cache should be empty!";
+ }
+
+ public static void writeInitialData(final Cache<Object, Object> c) {
+ c.put(A_B_NAME, JOE);
+ c.put(A_B_AGE, TWENTY);
+ c.put(A_C_NAME, BOB);
+ c.put(A_C_AGE, FORTY);
+ c.evict(A_B_NAME);
+ c.evict(A_B_AGE);
+ c.evict(A_C_NAME);
+ c.evict(A_C_AGE);
+ c.evict(A_D_NAME);
+ c.evict(A_D_AGE);
+ }
+
+ public static void verifyInitialDataOnLoader(Cache<Object, Object> c) throws Exception {
+ CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+ assert l.containsKey(A_B_AGE);
+ assert l.containsKey(A_B_NAME);
+ assert l.containsKey(A_C_AGE);
+ assert l.containsKey(A_C_NAME);
+ assert l.load(A_B_AGE).getValue().equals(TWENTY);
+ assert l.load(A_B_NAME).getValue().equals(JOE);
+ assert l.load(A_C_AGE).getValue().equals(FORTY);
+ assert l.load(A_C_NAME).getValue().equals(BOB);
+ }
+
+ public static void verifyInitialData(Cache<Object, Object> c) {
+ assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+ assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+ assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+ assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+ }
+}
More information about the infinispan-commits
mailing list