[infinispan-commits] Infinispan SVN: r2481 - in trunk/lucene-directory/src: main/java/org/infinispan/lucene/readlocks and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sun Oct 3 16:11:37 EDT 2010


Author: sannegrinovero
Date: 2010-10-03 16:11:37 -0400 (Sun, 03 Oct 2010)
New Revision: 2481

Modified:
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
   trunk/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
   trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
Log:
[ISPN-689] (Read past EOF caused in Lucene Directory when Lucene flushes but doesn't close the segment) merged from 4.2

Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-10-03 20:02:27 UTC (rev 2480)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-10-03 20:11:37 UTC (rev 2481)
@@ -56,8 +56,6 @@
    private int currentChunkNumber = 0;
    private boolean needsAddingToFileList = true;
 
-   private boolean transactionRunning = false;
-
    public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileListOperations fileList) throws IOException {
       this.cache = cache;
       this.fileKey = fileKey;
@@ -134,11 +132,6 @@
 
    @Override
    public void flush() throws IOException {
-      //flush is invoked by Lucene directly only in the before-commit phase,
-      //so that's what we begin here:
-      if ( ! transactionRunning) {
-         transactionRunning = cache.startBatch();
-      }
       storeCurrentBuffer();
    }
 
@@ -154,18 +147,16 @@
             System.arraycopy(buffer, 0, bufferToFlush, 0, newBufferSize);
          }
       }
-      boolean microbatch = false;
-      // we don't want to spawn a nested transaction, just to write in batch:
-      if ( ! transactionRunning) {
-         microbatch = cache.startBatch();
-      }
+      boolean microbatch = cache.startBatch();
       // add chunk to cache
       if ( ! writingOnLastChunk || this.positionInBuffer != 0) {
          // create key for the current chunk
          ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentChunkNumber);
+         if (trace) log.trace("Storing segment chunk: " + key);
          cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(key, bufferToFlush);
       }
       // override existing file header with new size and updated accesstime
+      file.touch();
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileKey, file);
       registerToFileListIfNeeded();
       if (microbatch) cache.endBatch(true);
@@ -187,11 +178,6 @@
    @Override
    public void close() throws IOException {
       storeCurrentBuffer();
-      if (transactionRunning) {
-         //commit
-         cache.endBatch(true);
-         transactionRunning = false;
-      }
       positionInBuffer = 0;
       filePosition = 0;
       buffer = null;

Modified: trunk/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
===================================================================
--- trunk/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java	2010-10-03 20:02:27 UTC (rev 2480)
+++ trunk/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java	2010-10-03 20:11:37 UTC (rev 2481)
@@ -30,6 +30,8 @@
 import org.infinispan.lucene.FileReadLockKey;
 import org.infinispan.lucene.InfinispanDirectory;
 import org.infinispan.lucene.InfinispanIndexInput;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
 /**
  * <p>DistributedSegmentReadLocker stores reference counters in the cache
@@ -45,6 +47,8 @@
  */
 public class DistributedSegmentReadLocker implements SegmentReadLocker {
    
+   private static final Log log = LogFactory.getLog(DistributedSegmentReadLocker.class);
+   
    private final AdvancedCache cache;
    private final String indexName;
    
@@ -154,18 +158,22 @@
     * @param cache the cache containing the elements to be deleted
     */
    static void realFileDelete(FileReadLockKey readLockKey, AdvancedCache cache) {
+      final boolean trace = log.isTraceEnabled();
       final String indexName = readLockKey.getIndexName();
       final String filename = readLockKey.getFileName();
       FileCacheKey key = new FileCacheKey(indexName, filename);
+      if (trace) log.trace("deleting metadata: " + key);
       FileMetadata file = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).remove(key);
       if (file != null) { //during optimization of index a same file could be deleted twice, so you could see a null here
          for (int i = 0; i < file.getNumberOfChunks(); i++) {
             ChunkCacheKey chunkKey = new ChunkCacheKey(indexName, filename, i);
+            if (trace) log.trace("deleting chunk: " + chunkKey);
             cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).removeAsync(chunkKey);
          }
       }
       // last operation, as being set as value==0 it prevents others from using it during the
       // deletion process:
+      if (trace) log.trace("deleting readlock: " + readLockKey);
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(readLockKey);
    }
 

Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-10-03 20:02:27 UTC (rev 2480)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-10-03 20:11:37 UTC (rev 2481)
@@ -613,6 +613,9 @@
       testOn(dir, 40 ,40, cache);
       testOn(dir, 40 ,39, cache);
       testOn(dir, 39 ,40, cache);
+      
+      // quite bigger
+      testOn(dir, 600, 600, cache);
    }
 
    private void testOn(Directory dir, int writeSize, int readSize, Cache cache) throws IOException {
@@ -640,6 +643,31 @@
             assert false :"should not have thrown an IOException" + ioe.getMessage();
       }
    }
+   
+   public void multipleFlushTest() throws IOException {
+      final String filename = "longFile.writtenInMultipleFlushes";
+      final int bufferSize = 300;
+      Cache cache = cacheManager.getCache();
+      cache.clear();
+      InfinispanDirectory dir = new InfinispanDirectory(cache, INDEXNAME, 13);
+      byte[] manyBytes = fillBytes(bufferSize);
+      IndexOutput indexOutput = dir.createOutput(filename);
+      for (int i = 0; i < 10; i++) {
+         indexOutput.writeBytes(manyBytes, bufferSize);
+         indexOutput.flush();
+      }
+      IndexInput input = dir.openInput(filename);
+      final int finalSize = (10 * bufferSize);
+      assert input.length() == finalSize;
+      final byte[] resultingBuffer = new byte[finalSize];
+      input.readBytes(resultingBuffer, 0, finalSize);
+      int index = 0;
+      for (int i = 0; i < 10; i++) {
+         for (int j = 0; j < bufferSize; j++)
+            assert resultingBuffer[index++] == manyBytes[j];
+      }
+      indexOutput.close();
+   }
 
    private byte[] fillBytes(int size) {
       byte[] b = new byte[size];

Modified: trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
===================================================================
--- trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-10-03 20:02:27 UTC (rev 2480)
+++ trunk/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-10-03 20:11:37 UTC (rev 2481)
@@ -68,6 +68,8 @@
    /** Concurrent Threads in tests */
    private static final int READER_THREADS = 5;
    private static final int WRITER_THREADS = 1;
+   
+   private static final int CHUNK_SIZE = 512 * 1024;
 
    private static final String indexName = "tempIndexName";
 
@@ -95,7 +97,7 @@
    @Test
    public void profileTestInfinispanDirectoryWithNetworkDelayZero() throws Exception {
       // TestingUtil.setDelayForCache(cache, 0, 0);
-      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName, CHUNK_SIZE);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:0");
       verifyDirectoryState();
    }
@@ -103,7 +105,7 @@
    @Test
    public void profileTestInfinispanDirectoryWithNetworkDelay4() throws Exception {
       TestingUtil.setDelayForCache(cache, 4, 4);
-      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName, CHUNK_SIZE);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:4");
       verifyDirectoryState();
    }
@@ -111,7 +113,7 @@
    @Test
    public void profileTestInfinispanDirectoryWithHighNetworkDelay40() throws Exception {
       TestingUtil.setDelayForCache(cache, 40, 40);
-      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName, CHUNK_SIZE);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:40");
       verifyDirectoryState();
    }
@@ -121,7 +123,7 @@
       CacheContainer cacheContainer = CacheTestSupport.createLocalCacheManager();
       try {
          cache = cacheContainer.getCache();
-         InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+         InfinispanDirectory dir = new InfinispanDirectory(cache, indexName, CHUNK_SIZE);
          stressTestDirectory(dir, "InfinispanLocal");
          verifyDirectoryState();
       } finally {



More information about the infinispan-commits mailing list