[infinispan-commits] Infinispan SVN: r1931 - in branches/4.1.x/lucene-directory/src: test/java/org/infinispan/lucene and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Thu Jun 24 13:12:17 EDT 2010
Author: sannegrinovero
Date: 2010-06-24 13:12:16 -0400 (Thu, 24 Jun 2010)
New Revision: 1931
Added:
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java
Modified:
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
Log:
[ISPN-507] (Search performance: Improve InfinispanIndexIO.InfinispanIndexInput by tuning readBytes()) - branch 4.1.x
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java 2010-06-24 17:05:18 UTC (rev 1930)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java 2010-06-24 17:12:16 UTC (rev 1931)
@@ -74,115 +74,109 @@
private static final Log log = LogFactory.getLog(InfinispanIndexInput.class);
- private final int bufferSize;
-
private final AdvancedCache<CacheKey, Object> cache;
private final FileMetadata file;
private final FileCacheKey fileKey;
- private ConcurrentHashMap<CacheKey, Object> localCache = new ConcurrentHashMap<CacheKey, Object>();
+ private final int chunkSize;
+
+ private int currentBufferSize;
private byte[] buffer;
- private int bufferPosition = 0;
- private int filePosition = 0;
- private int lastChunkNumberLoaded = -1;
+ private int bufferPosition;
+ private int currentLoadedChunk = -1;
public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey) throws IOException {
this(cache, fileKey, InfinispanIndexIO.DEFAULT_BUFFER_SIZE);
}
- public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int bufferSize) throws IOException {
+ public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int chunkSize) throws IOException {
this.cache = cache;
this.fileKey = fileKey;
- this.bufferSize = bufferSize;
- buffer = new byte[this.bufferSize];
+ this.chunkSize = chunkSize;
// get file header from file
this.file = (FileMetadata) cache.get(fileKey);
if (file == null) {
- throw new IOException("File [ " + fileKey.getFileName() + " ] for index [ " + fileKey.getIndexName()
- + " ] was not found");
+ throw new IOException("Error loading medatada for index file: " + fileKey);
}
- // get records to local cache
- int i = 0;
- Object fileChunk;
- ChunkCacheKey chunkKey = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), i);
- while ((fileChunk = cache.get(chunkKey)) != null) {
- localCache.put(chunkKey, fileChunk);
- chunkKey = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), ++i);
- }
-
if (log.isDebugEnabled()) {
log.debug("Opened new IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
}
}
- private byte[] getChunkFromPosition(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int pos, int bufferSize) {
- if (lastChunkNumberLoaded != getChunkNumberFromPosition(filePosition, bufferSize)) {
- Object object = InfinispanIndexIO.getChunkFromPosition(cache, fileKey, pos, bufferSize);
- if (object == null) {
- object = InfinispanIndexIO.getChunkFromPosition(localCache, fileKey, pos, bufferSize);
- }
- lastChunkNumberLoaded = getChunkNumberFromPosition(filePosition, bufferSize);
- return (byte[]) object;
- } else {
- return buffer;
- }
- }
-
+ @Override
public byte readByte() throws IOException {
- buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize);
- if (buffer == null) {
- throw new IOException("Chunk id = [ " + getChunkNumberFromPosition(filePosition, bufferSize)
- + " ] does not exist for file [ " + fileKey.getFileName() + " ] for index [ "
- + fileKey.getIndexName() + " ]");
+ if (bufferPosition >= currentBufferSize) {
+ nextChunk();
+ bufferPosition = 0;
}
-
- bufferPosition = getPositionInBuffer(filePosition++, bufferSize);
- return buffer[bufferPosition];
- }
-
+ return buffer[bufferPosition++];
+ }
+
+ @Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int bytesToRead = len;
+ if (buffer == null) {
+ nextChunk();
+ }
while (bytesToRead > 0) {
- buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize);
- if (buffer == null) {
- throw new IOException("Chunk id = [ " + getChunkNumberFromPosition(filePosition, bufferSize)
- + " ] does not exist for file [ " + fileKey.getFileName() + " ] for index [ "
- + fileKey.getIndexName() + " ], file position: [ " + filePosition + " ], file size: [ "
- + file.getSize() + " ]");
- }
- bufferPosition = getPositionInBuffer(filePosition, bufferSize);
- int bytesToCopy = Math.min(buffer.length - bufferPosition, bytesToRead);
+ int bytesToCopy = Math.min(currentBufferSize - bufferPosition, bytesToRead);
System.arraycopy(buffer, bufferPosition, b, offset, bytesToCopy);
offset += bytesToCopy;
bytesToRead -= bytesToCopy;
- filePosition += bytesToCopy;
+ bufferPosition += bytesToCopy;
+ if (bufferPosition >= currentBufferSize && bytesToRead > 0) {
+ nextChunk();
+ bufferPosition = 0;
+ }
}
}
+ @Override
public void close() throws IOException {
- filePosition = 0;
+ currentBufferSize = 0;
bufferPosition = 0;
- lastChunkNumberLoaded = -1;
+ currentLoadedChunk = -1;
buffer = null;
- localCache = null;
if (log.isDebugEnabled()) {
log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
}
}
public long getFilePointer() {
- return filePosition;
+ return ((long)currentLoadedChunk) * chunkSize + bufferPosition;
}
+ @Override
public void seek(long pos) throws IOException {
- filePosition = (int) pos;
+ bufferPosition = (int)( pos % chunkSize );
+ int targetChunk = (int) (pos / chunkSize);
+ if (targetChunk != currentLoadedChunk) {
+ currentLoadedChunk = targetChunk;
+ setBufferToCurrentChunk();
+ }
}
+
+ private void nextChunk() throws IOException {
+ currentLoadedChunk++;
+ setBufferToCurrentChunk();
+ }
+ private void setBufferToCurrentChunk() throws IOException {
+ CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+ buffer = (byte[]) cache.get(key);
+ if (buffer == null) {
+ throw new IOException("Chunk value could not be found for key " + key);
+ }
+ currentBufferSize = buffer.length;
+ }
+
+ @Override
public long length() {
return file.getSize();
}
+
}
/**
Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java 2010-06-24 17:05:18 UTC (rev 1930)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java 2010-06-24 17:12:16 UTC (rev 1931)
@@ -113,7 +113,9 @@
//Now it reads some random byte and it compares to the expected byte
for (int i = 0; i < FILE_SIZE; i++) {
if(seekPoint == i) {
- assert bytesGenerator.nextByte() == indexInput.readByte();
+ byte expectedByte = bytesGenerator.nextByte();
+ byte actualByte = indexInput.readByte();
+ assert expectedByte == actualByte;
seekPoint = indexInput.getFilePointer() + r.nextInt(10);
indexInput.seek(seekPoint);
} else {
Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java 2010-06-24 17:12:16 UTC (rev 1931)
@@ -0,0 +1,153 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.profiling;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+import org.infinispan.Cache;
+import org.infinispan.lucene.CacheKey;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.testutils.ClusteredCacheFactory;
+import org.infinispan.lucene.testutils.LuceneSettings;
+import org.infinispan.manager.CacheContainer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This is a stress test meant to compare relative performance of RAMDirectory, FSDirectory,
+ * Infinispan local Directory, clustered.
+ * Focuses on Search performance; an index is built before the performance measurement is started and is not
+ * changed during the searches.
+ * To use it set a DURATION_MS as long as you can afford; choose thread number and terms number
+ * according to your use case as they will affect the results.
+ *
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+@Test(groups = "profiling", testName = "lucene.profiling.IndexReadingStressTest")
+public class IndexReadingStressTest {
+
+ /** Concurrent IndexSearchers used during tests */
+ private static final int THREADS = 20;
+
+ /** Test duration **/
+ private static final long DURATION_MS = 350000;
+
+ /** Number of Terms written in the index **/
+ private static final int TERMS_NUMBER = 200000;
+
+ private static final ClusteredCacheFactory cacheFactory = new ClusteredCacheFactory(CacheTestSupport.createTestConfiguration());
+
+ @Test
+ public void profileTestRAMDirectory() throws InterruptedException, IOException {
+ RAMDirectory dir = new RAMDirectory();
+ testDirectory(dir, "RAMDirectory");
+ }
+
+ @Test
+ public void profileTestFSDirectory() throws InterruptedException, IOException {
+ File indexDir = new File(new File("."), "tempindex");
+ indexDir.mkdirs();
+ FSDirectory dir = FSDirectory.open(indexDir);
+ testDirectory(dir, "FSDirectory");
+ }
+
+ @Test
+ public void profileTestInfinispanDirectory() throws InterruptedException, IOException {
+ //theses default are not for performance settings but meant for problem detection:
+ Cache<CacheKey,Object> cache = cacheFactory.createClusteredCache();
+ InfinispanDirectory dir = new InfinispanDirectory(cache, "iname");
+ testDirectory(dir, "InfinispanClustered");
+ }
+
+ @Test
+ public void profileInfinispanLocalDirectory() throws InterruptedException, IOException {
+ CacheContainer cacheManager = CacheTestSupport.createLocalCacheManager();
+ try {
+ Cache<CacheKey, Object> cache = cacheManager.getCache();
+ InfinispanDirectory dir = new InfinispanDirectory(cache, "iname");
+ testDirectory(dir, "InfinispanLocal");
+ } finally {
+ cacheManager.stop();
+ }
+ }
+
+ private void testDirectory(Directory dir, String testLabel) throws InterruptedException, IOException {
+ SharedState state = fillDirectory(dir);
+ ExecutorService e = Executors.newFixedThreadPool(THREADS);
+ for (int i=0; i<THREADS; i++) {
+ e.execute(new LuceneReaderThread(dir, state));
+ }
+ e.shutdown();
+ state.startWaitingThreads();
+ Thread.sleep(DURATION_MS);
+ long searchesCount = state.incrementIndexSearchesCount(0);
+ long writerTaskCount = state.incrementIndexWriterTaskCount(0);
+ state.quit();
+ e.awaitTermination(10, TimeUnit.SECONDS);
+ System.out.println(
+ "Test " + testLabel +" run in " + DURATION_MS + "ms:\n\tSearches: " + searchesCount + "\n\t" + "Writes: " + writerTaskCount);
+ }
+
+ private SharedState fillDirectory(Directory directory) throws CorruptIndexException, LockObtainFailedException, IOException {
+ CacheTestSupport.initializeDirectory(directory);
+ SharedState state = new SharedState(0);
+ IndexWriter iwriter = LuceneSettings.openWriter(directory, 100000);
+ for (int i = 0; i <= TERMS_NUMBER; i++) {
+ Document doc = new Document();
+ String term = String.valueOf(i);
+ state.addStringWrittenToIndex(term);
+ doc.add(new Field("main", term, Store.NO, Index.NOT_ANALYZED));
+ iwriter.addDocument(doc);
+ }
+ iwriter.commit();
+ iwriter.close();
+ return state;
+ }
+
+ @BeforeClass
+ public static void beforeTest() {
+ cacheFactory.start();
+ }
+
+ @AfterClass
+ public static void afterTest() {
+ cacheFactory.stop();
+ }
+
+}
More information about the infinispan-commits
mailing list