[infinispan-commits] Infinispan SVN: r2187 - in trunk/lucene-directory/src: test/java/org/infinispan/lucene and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Aug 11 05:19:57 EDT 2010
Author: sannegrinovero
Date: 2010-08-11 05:19:57 -0400 (Wed, 11 Aug 2010)
New Revision: 2187
Added:
trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
Modified:
trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
Log:
[ISPN-590] (Lucene's segment files are flushed twice using SKIP_LOCK, implement proper index commit()) - trunk
Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -174,16 +174,15 @@
} while (true);
// rename metadata first
- cache.startBatch();
+ boolean batching = cache.startBatch();
FileCacheKey fromKey = new FileCacheKey(indexName, from);
FileMetadata metadata = (FileMetadata) cache.remove(fromKey);
cache.put(new FileCacheKey(indexName, to), metadata);
Set<String> fileList = getFileList();
fileList.remove(from);
fileList.add(to);
- createRefCountForNewFile(to);
cache.put(fileListCacheKey, fileList);
- cache.endBatch(true);
+ if (batching) cache.endBatch(true);
// now trigger deletion of old file chunks:
FileReadLockKey fileFromReadLockKey = new FileReadLockKey(indexName, from);
@@ -214,7 +213,6 @@
FileMetadata previous = (FileMetadata) cache.putIfAbsent(key, newFileMetadata);
if (previous == null) {
// creating new file
- createRefCountForNewFile(name);
Set<String> fileList = getFileList();
fileList.add(name);
cache.put(fileListCacheKey, fileList);
@@ -224,11 +222,6 @@
}
}
- private void createRefCountForNewFile(String fileName) {
- FileReadLockKey readLockKey = new FileReadLockKey(indexName, fileName);
- cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_CACHE_STORE).put(readLockKey, Integer.valueOf(1));
- }
-
@SuppressWarnings("unchecked")
private Set<String> getFileList() {
Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -48,6 +48,7 @@
private final FileCacheKey fileKey;
private final int chunkSize;
private final FileReadLockKey readLockKey;
+ private final boolean trace;
private int currentBufferSize;
private byte[] buffer;
@@ -64,8 +65,9 @@
final String filename = fileKey.getFileName();
this.readLockKey = new FileReadLockKey(fileKey.getIndexName(), filename);
aquireReadLock();
- if (log.isDebugEnabled()) {
- log.debug("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
+ trace = log.isTraceEnabled();
+ if (trace) {
+ log.trace("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
}
}
@@ -129,10 +131,10 @@
chunkKey = new ChunkCacheKey(indexName, filename, ++i);
} while (removed != null);
FileCacheKey key = new FileCacheKey(indexName, filename);
- cache.startBatch();
+// boolean batch = cache.startBatch(); //FIXME when enabling batch, org.infinispan.lucene.profiling.CacheStoreStressTest fails to remove the readLockKey
cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).remove(readLockKey);
cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(key);
- cache.endBatch(true);
+// if (batch) cache.endBatch(true);
}
/**
@@ -203,8 +205,8 @@
buffer = null;
if (isClone) return;
releaseReadLock();
- if (log.isDebugEnabled()) {
- log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+ if (trace) {
+ log.trace("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
}
}
Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -33,6 +33,7 @@
* Responsible for writing to a <code>Directory</code>
*
* @since 4.0
+ * @author Sanne Grinovero
* @author Lukasz Moren
* @author Davide Di Somma
* @see org.apache.lucene.store.Directory
@@ -47,20 +48,24 @@
private final AdvancedCache cache;
private final FileMetadata file;
private final FileCacheKey fileKey;
+ private final boolean trace;
private byte[] buffer;
private int positionInBuffer = 0;
private long filePosition = 0;
private int currentChunkNumber = 0;
+ private boolean transactionRunning = false;
+
public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
this.cache = cache;
this.fileKey = fileKey;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
this.file = fileMetadata;
- if (log.isDebugEnabled()) {
- log.debug("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+ trace = log.isTraceEnabled();
+ if (trace) {
+ log.trace("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
}
}
@@ -80,7 +85,6 @@
}
}
-
private static int getPositionInBuffer(long pos, int bufferSize) {
return (int) (pos % bufferSize);
}
@@ -90,7 +94,7 @@
}
private void newChunk() throws IOException {
- flush();// save data first
+ doFlush();// save data first
currentChunkNumber++;
// check if we have to create new chunk, or get already existing in cache for modification
buffer = getChunkById(cache, fileKey, currentChunkNumber, bufferSize);
@@ -127,6 +131,15 @@
@Override
public void flush() throws IOException {
+ //flush is invoked by Lucene directly only in the before-commit phase,
+ //so we begin the batch here if none is running; close() ends (commits) it:
+ if ( ! transactionRunning) {
+ transactionRunning = cache.startBatch();
+ }
+ doFlush();
+ }
+
+ public void doFlush() throws IOException {
// create key for the current chunk
ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentChunkNumber);
// size changed, apply change to file header
@@ -143,12 +156,15 @@
System.arraycopy(buffer, 0, bufferToFlush, 0, newBufferSize);
}
}
- cache.startBatch();
+ boolean microbatch = false;
+ if ( ! transactionRunning) {
+ microbatch = cache.startBatch();
+ }
// add chunk to cache
cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, bufferToFlush);
// override existing file header with new size and last time access
cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
- cache.endBatch(true);
+ if (microbatch) cache.endBatch(true);
}
private void resizeFileIfNeeded() {
@@ -159,14 +175,18 @@
@Override
public void close() throws IOException {
- flush();
+ doFlush();
+ if (transactionRunning) {
+ //commit
+ cache.endBatch(true);
+ transactionRunning = false;
+ }
positionInBuffer = 0;
filePosition = 0;
buffer = null;
- if (log.isDebugEnabled()) {
- log.debug("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+ if (trace) {
+ log.trace("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
}
- // cache.compact(); //TODO investigate about this
}
@Override
@@ -183,7 +203,7 @@
throw new IOException(fileKey.getFileName() + ": seeking past of the file");
}
if (requestedChunkNumber != currentChunkNumber) {
- flush();
+ doFlush();
buffer = getChunkById(cache, fileKey, requestedChunkNumber, bufferSize);
currentChunkNumber = requestedChunkNumber;
}
@@ -201,4 +221,5 @@
int lastChunkNumber = (int) (file.getSize() / bufferSize);
return currentChunkNumber == lastChunkNumber;
}
+
}
Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -214,5 +214,23 @@
iw.commit();
iw.close();
}
-
+
+ /**
+ * Useful tool to debug the Lucene invocations into the directory;
+ * it prints to standard output a short excerpt of the current thread's
+ * stack trace: only seven frames, starting from the invoker.
+ *
+ * @param initialLine The label to print as first line of the stack
+ */
+ public static void dumpMicroStack(String initialLine) {
+ StackTraceElement[] stackTraceElements = Thread.getAllStackTraces().get(Thread.currentThread());
+ StringBuilder sb = new StringBuilder(initialLine);
+ for (int i = 3; i < 10; i++) {
+ sb.append("\n\t");
+ sb.append(stackTraceElements[i]);
+ }
+ sb.append("\n");
+ System.out.println(sb.toString());
+ }
+
}
Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -78,18 +78,18 @@
IndexOutput io = dir.createOutput(fileName);
RepeatableLongByteSequence bytesGenerator = new RepeatableLongByteSequence();
//It writes repeatable text
- int REPEATABLE_BUFFER_SIZE = 1501;
+ final int REPEATABLE_BUFFER_SIZE = 1501;
for (int i = 0; i < REPEATABLE_BUFFER_SIZE; i++) {
io.writeByte(bytesGenerator.nextByte());
}
io.flush();
//Text to write on file with repeatable text
- String someText = "This is some text";
- byte[] someTextAsBytes = someText.getBytes();
+ final String someText = "This is some text";
+ final byte[] someTextAsBytes = someText.getBytes();
//4 points in random order where writing someText: at begin of file, at end of file, within a single chunk,
//between 2 chunks
- int[] pointers = {0, 635, REPEATABLE_BUFFER_SIZE, 135};
+ final int[] pointers = {0, 635, REPEATABLE_BUFFER_SIZE, 135};
for(int i=0; i < pointers.length; i++) {
io.seek(pointers[i]);
@@ -104,23 +104,23 @@
int indexPointer = 0;
Arrays.sort(pointers);
byte[] buffer = null;
- int chunkIndex = 0;
+ int chunkIndex = -1;
//now testing the stream is equal to the produced repeatable but including the edits at pointed positions
for (int i = 0; i < REPEATABLE_BUFFER_SIZE + someTextAsBytes.length; i++) {
- if(i % BUFFER_SIZE == 0) {
- buffer = (byte[])cache.get(new ChunkCacheKey(INDEXNAME, fileName, chunkIndex++));
+ if (i % BUFFER_SIZE == 0) {
+ buffer = (byte[]) cache.get(new ChunkCacheKey(INDEXNAME, fileName, ++chunkIndex));
}
byte predictableByte = bytesGenerator.nextByte();
- if(i < pointers[indexPointer]) {
+ if (i < pointers[indexPointer]) {
//Assert predictable text
Assert.assertEquals(predictableByte, buffer[i % BUFFER_SIZE]);
- } else if(pointers[indexPointer] <= i && i < pointers[indexPointer] + someTextAsBytes.length){
+ } else if (pointers[indexPointer] <= i && i < pointers[indexPointer] + someTextAsBytes.length) {
//Assert someText
Assert.assertEquals(someTextAsBytes[i - pointers[indexPointer]], buffer[i % BUFFER_SIZE]);
}
- if(i == pointers[indexPointer] + someTextAsBytes.length) {
+ if (i == pointers[indexPointer] + someTextAsBytes.length) {
//Change pointer
indexPointer++;
}
@@ -473,6 +473,7 @@
io.writeByte((byte) 69);
io.flush();
+ io.close();
assert dir.fileExists("MyNewFile.txt");
assert null != cache.get(new ChunkCacheKey(INDEXNAME, "MyNewFile.txt", 0));
@@ -486,6 +487,7 @@
assert new String(new byte[] { 66, 69 }).equals(new String(buf).trim());
String testText = "This is some rubbish again that will span more than one chunk - one hopes. Who knows, maybe even three or four chunks.";
+ io = dir.createOutput("MyNewFile.txt");
io.seek(0);
io.writeBytes(testText.getBytes(), 0, testText.length());
io.close();
Added: trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java (rev 0)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.profiling;
+
+import java.io.IOException;
+
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.jdbc.TableManipulation;
+import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStoreConfig;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.DirectoryIntegrityCheck;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.LuceneKey2StringMapper;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.test.fwk.UnitTestDatabaseManager;
+import org.testng.annotations.Test;
+
+/**
+ * Testcase verifying that the index is usable under stress even when a cachestore is configured.
+ * See ISPN-575 (Corruption in data when using a permanent store)
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class CacheStoreStressTest extends SingleCacheManagerTest {
+
+ private final ConnectionFactoryConfig connectionFactoryConfig = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
+
+ private static final String indexName = "tempIndexName";
+
+ @Override
+ protected EmbeddedCacheManager createCacheManager() throws Exception {
+ Configuration configuration = CacheTestSupport.createTestConfiguration();
+ enableTestJdbcStorage(configuration);
+ return TestCacheManagerFactory.createClusteredCacheManager(configuration);
+ }
+
+ private void enableTestJdbcStorage(Configuration configuration) {
+ TableManipulation tm = UnitTestDatabaseManager.buildDefaultTableManipulation();
+ JdbcStringBasedCacheStoreConfig jdbcStoreConfiguration = new JdbcStringBasedCacheStoreConfig(connectionFactoryConfig, tm);
+ jdbcStoreConfiguration.setKey2StringMapperClass(LuceneKey2StringMapper.class.getName());
+ CacheLoaderManagerConfig loaderManagerConfig = configuration.getCacheLoaderManagerConfig();
+ loaderManagerConfig.setPreload(false); // TODO change after ISPN-579
+ loaderManagerConfig.addCacheLoaderConfig(jdbcStoreConfiguration);
+ }
+
+ @Test
+ public void stressTestOnStore() throws InterruptedException, IOException {
+ cache = cacheManager.getCache();
+ assert cache!=null;
+ InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+ PerformanceCompareStressTest.stressTestDirectory(dir, "InfinispanClusteredWith-Store");
+ DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ }
+
+}
Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java 2010-08-10 16:19:29 UTC (rev 2186)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java 2010-08-11 09:19:57 UTC (rev 2187)
@@ -129,6 +129,7 @@
}
}
+ @Test(enabled=false)//to prevent invocations from some versions of TestNG
public static void stressTestDirectory(Directory dir, String testLabel) throws InterruptedException, IOException {
SharedState state = new SharedState(DICTIONARY_SIZE);
CacheTestSupport.initializeDirectory(dir);
More information about the infinispan-commits mailing list