[infinispan-commits] Infinispan SVN: r1931 - in branches/4.1.x/lucene-directory/src: test/java/org/infinispan/lucene and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Thu Jun 24 13:12:17 EDT 2010


Author: sannegrinovero
Date: 2010-06-24 13:12:16 -0400 (Thu, 24 Jun 2010)
New Revision: 1931

Added:
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java
Modified:
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
Log:
[ISPN-507] (Search performance: Improve InfinispanIndexIO.InfinispanIndexInput by tuning readBytes()) - branch 4.1.x

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java	2010-06-24 17:05:18 UTC (rev 1930)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexIO.java	2010-06-24 17:12:16 UTC (rev 1931)
@@ -74,115 +74,154 @@
 
       private static final Log log = LogFactory.getLog(InfinispanIndexInput.class);
 
-      private final int bufferSize;
-
       private final AdvancedCache<CacheKey, Object> cache;
       private final FileMetadata file;
       private final FileCacheKey fileKey;
-      private ConcurrentHashMap<CacheKey, Object> localCache = new ConcurrentHashMap<CacheKey, Object>();
+      /** Chunk length used to map file positions to chunk numbers (see seek and getFilePointer). */
+      private final int chunkSize;
+
+      /** Number of valid bytes in buffer: the length of the currently loaded chunk. */
+      private int currentBufferSize;
+      /** Contents of the currently loaded chunk; null before the first load and after close(). */
       private byte[] buffer;
-      private int bufferPosition = 0;
-      private int filePosition = 0;
-      private int lastChunkNumberLoaded = -1;
+      /** Index in buffer of the next byte to be read. */
+      private int bufferPosition;
+      /** Number of the chunk held in buffer; -1 until a chunk has been loaded, and again after close(). */
+      private int currentLoadedChunk = -1;
 
       public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey) throws IOException {
          this(cache, fileKey, InfinispanIndexIO.DEFAULT_BUFFER_SIZE);
       }
 
-      public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int bufferSize) throws IOException {
+      /**
+       * Opens the file identified by fileKey. Chunks are not preloaded: each one is fetched from the
+       * cache on demand, when reading or seeking reaches it.
+       *
+       * @param chunkSize chunk length the file was stored with, used to map positions to chunks
+       * @throws IOException if the file metadata cannot be found in the cache
+       */
+      public InfinispanIndexInput(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int chunkSize) throws IOException {
          this.cache = cache;
          this.fileKey = fileKey;
-         this.bufferSize = bufferSize;
-         buffer = new byte[this.bufferSize];
+         this.chunkSize = chunkSize;
 
          // get file header from file
          this.file = (FileMetadata) cache.get(fileKey);
 
          if (file == null) {
-            throw new IOException("File [ " + fileKey.getFileName() + " ] for index [ " + fileKey.getIndexName()
-                     + " ] was not found");
+            throw new IOException("Error loading metadata for index file: " + fileKey);
          }
 
-         // get records to local cache
-         int i = 0;
-         Object fileChunk;
-         ChunkCacheKey chunkKey = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), i);
-         while ((fileChunk = cache.get(chunkKey)) != null) {
-            localCache.put(chunkKey, fileChunk);
-            chunkKey = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), ++i);
-         }
-
          if (log.isDebugEnabled()) {
             log.debug("Opened new IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
          }
       }
 
-      private byte[] getChunkFromPosition(AdvancedCache<CacheKey, Object> cache, FileCacheKey fileKey, int pos, int bufferSize) {
-         if (lastChunkNumberLoaded != getChunkNumberFromPosition(filePosition, bufferSize)) {
-            Object object = InfinispanIndexIO.getChunkFromPosition(cache, fileKey, pos, bufferSize);
-            if (object == null) {
-               object = InfinispanIndexIO.getChunkFromPosition(localCache, fileKey, pos, bufferSize);
-            }
-            lastChunkNumberLoaded = getChunkNumberFromPosition(filePosition, bufferSize);
-            return (byte[]) object;
-         } else {
-            return buffer;
-         }
-      }
-
+      /**
+       * Reads the next byte, first loading the following chunk when the current one is exhausted.
+       */
+      @Override
       public byte readByte() throws IOException {
-         buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize);
-         if (buffer == null) {
-            throw new IOException("Chunk id = [ " + getChunkNumberFromPosition(filePosition, bufferSize)
-                     + " ] does not exist for file [ " + fileKey.getFileName() + " ] for index [ "
-                     + fileKey.getIndexName() + " ]");
+         if (bufferPosition >= currentBufferSize) {
+            nextChunk();
+            bufferPosition = 0;
          }
-
-         bufferPosition = getPositionInBuffer(filePosition++, bufferSize);
-         return buffer[bufferPosition];
-      }
-
+         return buffer[bufferPosition++];
+      }
+
+      /**
+       * Copies len bytes into b starting at offset, loading following chunks as needed.
+       */
+      @Override
       public void readBytes(byte[] b, int offset, int len) throws IOException {
          int bytesToRead = len;
+         if (buffer == null) {
+            // no chunk loaded yet: start from the first one
+            nextChunk();
+         }
          while (bytesToRead > 0) {
-            buffer = getChunkFromPosition(cache, fileKey, filePosition, bufferSize);
-            if (buffer == null) {
-               throw new IOException("Chunk id = [ " + getChunkNumberFromPosition(filePosition, bufferSize)
-                        + " ] does not exist for file [ " + fileKey.getFileName() + " ] for index [ "
-                        + fileKey.getIndexName() + " ], file position: [ " + filePosition + " ], file size: [ "
-                        + file.getSize() + " ]");
-            }
-            bufferPosition = getPositionInBuffer(filePosition, bufferSize);
-            int bytesToCopy = Math.min(buffer.length - bufferPosition, bytesToRead);
+            int bytesToCopy = Math.min(currentBufferSize - bufferPosition, bytesToRead);
             System.arraycopy(buffer, bufferPosition, b, offset, bytesToCopy);
             offset += bytesToCopy;
             bytesToRead -= bytesToCopy;
-            filePosition += bytesToCopy;
+            bufferPosition += bytesToCopy;
+            if (bufferPosition >= currentBufferSize && bytesToRead > 0) {
+               nextChunk();
+               bufferPosition = 0;
+            }
          }
       }
 
+      @Override
       public void close() throws IOException {
-         filePosition = 0;
+         currentBufferSize = 0;
          bufferPosition = 0;
-         lastChunkNumberLoaded = -1;
+         currentLoadedChunk = -1;
          buffer = null;
-         localCache = null;
          if (log.isDebugEnabled()) {
             log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
          }
       }
 
+      /**
+       * Returns the current read position in the file; 0 before any chunk has been loaded or after close().
+       */
+      @Override
       public long getFilePointer() {
-         return filePosition;
+         if (currentLoadedChunk < 0) {
+            // nothing loaded yet (or closed): the formula below would yield -chunkSize
+            return 0;
+         }
+         return ((long) currentLoadedChunk) * chunkSize + bufferPosition;
       }
 
+      /**
+       * Moves the read position to pos, loading the chunk containing it unless already loaded.
+       * A position at (or past) the file length that falls exactly on a chunk boundary is mapped
+       * to the end of the previous chunk instead of to the start of a chunk beyond the file end.
+       */
+      @Override
       public void seek(long pos) throws IOException {
-         filePosition = (int) pos;
+         int targetChunk = (int) (pos / chunkSize);
+         int targetPosition = (int) (pos % chunkSize);
+         if (targetPosition == 0 && targetChunk > 0 && pos >= file.getSize()) {
+            // presumably no chunk is stored at or beyond the file length, so loading
+            // targetChunk would throw: stay at the end of the previous chunk instead
+            targetChunk--;
+            targetPosition = chunkSize;
+         }
+         if (targetChunk != currentLoadedChunk || buffer == null) {
+            currentLoadedChunk = targetChunk;
+            setBufferToCurrentChunk();
+         }
+         bufferPosition = targetPosition;
       }
+
+      /** Advances to the chunk following the current one and loads it. */
+      private void nextChunk() throws IOException {
+         currentLoadedChunk++;
+         setBufferToCurrentChunk();
+      }
 
+      /**
+       * Loads chunk number currentLoadedChunk from the cache into buffer.
+       *
+       * @throws IOException if the cache holds no value for that chunk
+       */
+      private void setBufferToCurrentChunk() throws IOException {
+         CacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+         buffer = (byte[]) cache.get(key);
+         if (buffer == null) {
+            throw new IOException("Chunk value could not be found for key " + key);
+         }
+         currentBufferSize = buffer.length;
+      }
+
+      @Override
       public long length() {
          return file.getSize();
       }
+
    }
 
    /**

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-06-24 17:05:18 UTC (rev 1930)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-06-24 17:12:16 UTC (rev 1931)
@@ -113,7 +113,9 @@
          //Now it reads some random byte and it compares to the expected byte 
          for (int i = 0; i < FILE_SIZE; i++) {
             if(seekPoint == i) {
-               assert bytesGenerator.nextByte() == indexInput.readByte();
+               byte expectedByte = bytesGenerator.nextByte();
+               byte actualByte = indexInput.readByte();
+               assert expectedByte == actualByte;
                seekPoint = indexInput.getFilePointer() + r.nextInt(10);
                indexInput.seek(seekPoint);
             } else {

Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/IndexReadingStressTest.java	2010-06-24 17:12:16 UTC (rev 1931)
@@ -0,0 +1,163 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.profiling;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Field.Index;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.LockObtainFailedException;
+import org.apache.lucene.store.RAMDirectory;
+import org.infinispan.Cache;
+import org.infinispan.lucene.CacheKey;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.testutils.ClusteredCacheFactory;
+import org.infinispan.lucene.testutils.LuceneSettings;
+import org.infinispan.manager.CacheContainer;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * This is a stress test meant to compare the relative performance of RAMDirectory, FSDirectory,
+ * and local and clustered InfinispanDirectory instances.
+ * Focuses on Search performance; an index is built before the performance measurement is started and is not
+ * changed during the searches.
+ * To use it, set DURATION_MS as long as you can afford; choose the number of threads and of terms
+ * according to your use case, as they will affect the results.
+ *
+ * @author Sanne Grinovero
+ * @since 4.0
+ */
+@Test(groups = "profiling", testName = "lucene.profiling.IndexReadingStressTest")
+public class IndexReadingStressTest {
+
+   /** Concurrent IndexSearchers used during tests */
+   private static final int THREADS = 20;
+
+   /** Test duration **/
+   private static final long DURATION_MS = 350000;
+
+   /** Number of Terms written in the index **/
+   private static final int TERMS_NUMBER = 200000;
+
+   private static final ClusteredCacheFactory cacheFactory = new ClusteredCacheFactory(CacheTestSupport.createTestConfiguration());
+
+   @Test
+   public void profileTestRAMDirectory() throws InterruptedException, IOException {
+      RAMDirectory dir = new RAMDirectory();
+      testDirectory(dir, "RAMDirectory");
+   }
+
+   @Test
+   public void profileTestFSDirectory() throws InterruptedException, IOException {
+      File indexDir = new File(new File("."), "tempindex");
+      indexDir.mkdirs();
+      FSDirectory dir = FSDirectory.open(indexDir);
+      testDirectory(dir, "FSDirectory");
+   }
+
+   @Test
+   public void profileTestInfinispanDirectory() throws InterruptedException, IOException {
+      //these defaults are not performance settings: they are meant for problem detection
+      Cache<CacheKey,Object> cache = cacheFactory.createClusteredCache();
+      InfinispanDirectory dir = new InfinispanDirectory(cache, "iname");
+      testDirectory(dir, "InfinispanClustered");
+   }
+
+   @Test
+   public void profileInfinispanLocalDirectory() throws InterruptedException, IOException {
+      CacheContainer cacheManager = CacheTestSupport.createLocalCacheManager();
+      try {
+         Cache<CacheKey, Object> cache = cacheManager.getCache();
+         InfinispanDirectory dir = new InfinispanDirectory(cache, "iname");
+         testDirectory(dir, "InfinispanLocal");
+      } finally {
+         cacheManager.stop();
+      }
+   }
+
+   /**
+    * Fills dir with the test index, runs THREADS LuceneReaderThread tasks against it for DURATION_MS,
+    * then prints the searches and writes counted in the shared state.
+    */
+   private void testDirectory(Directory dir, String testLabel) throws InterruptedException, IOException {
+      SharedState state = fillDirectory(dir);
+      ExecutorService e = Executors.newFixedThreadPool(THREADS);
+      for (int i=0; i<THREADS; i++) {
+         e.execute(new LuceneReaderThread(dir, state));
+      }
+      e.shutdown();
+      state.startWaitingThreads();
+      Thread.sleep(DURATION_MS);
+      long searchesCount = state.incrementIndexSearchesCount(0);
+      long writerTaskCount = state.incrementIndexWriterTaskCount(0);
+      state.quit();
+      if (!e.awaitTermination(10, TimeUnit.SECONDS)) {
+         System.out.println("Warning: reader threads for " + testLabel + " did not terminate within 10 seconds");
+      }
+      System.out.println(
+               "Test " + testLabel +" run in " + DURATION_MS + "ms:\n\tSearches: " + searchesCount + "\n\t" + "Writes: " + writerTaskCount);
+   }
+
+   /**
+    * Initializes directory and writes TERMS_NUMBER single-term documents to it, recording every
+    * term in the returned SharedState.
+    */
+   private SharedState fillDirectory(Directory directory) throws CorruptIndexException, LockObtainFailedException, IOException {
+      CacheTestSupport.initializeDirectory(directory);
+      SharedState state = new SharedState(0);
+      IndexWriter iwriter = LuceneSettings.openWriter(directory, 100000);
+      for (int i = 0; i < TERMS_NUMBER; i++) {
+         Document doc = new Document();
+         String term = String.valueOf(i);
+         state.addStringWrittenToIndex(term);
+         doc.add(new Field("main", term, Store.NO, Index.NOT_ANALYZED));
+         iwriter.addDocument(doc);
+      }
+      iwriter.commit();
+      iwriter.close();
+      return state;
+   }
+
+   @BeforeClass
+   public static void beforeTest() {
+      cacheFactory.start();
+   }
+
+   @AfterClass
+   public static void afterTest() {
+      cacheFactory.stop();
+   }
+
+}



More information about the infinispan-commits mailing list