[infinispan-commits] Infinispan SVN: r2188 - in branches/4.1.x/lucene-directory/src: test/java/org/infinispan/lucene and 1 other directory.

infinispan-commits at lists.jboss.org
Wed Aug 11 05:21:27 EDT 2010


Author: sannegrinovero
Date: 2010-08-11 05:21:26 -0400 (Wed, 11 Aug 2010)
New Revision: 2188

Added:
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
Modified:
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
Log:
[ISPN-590] (Lucene's segment files are flushed twice using SKIP_LOCK, implement proper index commit()) - branch 4.1
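
Summary of the change: before this patch every InfinispanIndexOutput.flush() wrapped its writes in its own batch, so the segment data produced by Lucene's pre-commit flush could be pushed to the cache (and any configured CacheStore) again when the file was closed, which is the double flush the issue title refers to. With this patch flush() only starts a batch (Lucene invokes flush() in the before-commit phase) and the batch is committed once, in close(). A minimal sketch of the resulting pattern, not the committed class itself; see the InfinispanIndexOutput.java hunks below for the real code:

   import org.infinispan.AdvancedCache;

   // Sketch of the flush()/close() batching introduced by this commit;
   // batching is assumed to be enabled on the cache configuration.
   public class BatchedOutputSketch {

      private final AdvancedCache cache;
      private boolean transactionRunning = false;

      public BatchedOutputSketch(AdvancedCache cache) {
         this.cache = cache;
      }

      public void flush() {
         // Lucene calls flush() only in the before-commit phase,
         // so the batch is started here and kept open until close().
         if (!transactionRunning) {
            transactionRunning = cache.startBatch();
         }
         doFlush();
      }

      private void doFlush() {
         // write the current chunk and the updated FileMetadata to the cache
         // (elided; see InfinispanIndexOutput.doFlush() in the diff below)
      }

      public void close() {
         doFlush();
         if (transactionRunning) {
            // single commit: chunks and metadata reach the store once
            cache.endBatch(true);
            transactionRunning = false;
         }
      }
   }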

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -174,16 +174,15 @@
       } while (true);
       
       // rename metadata first
-      cache.startBatch();
+      boolean batching = cache.startBatch();
       FileCacheKey fromKey = new FileCacheKey(indexName, from);
       FileMetadata metadata = (FileMetadata) cache.remove(fromKey);
       cache.put(new FileCacheKey(indexName, to), metadata);
       Set<String> fileList = getFileList();
       fileList.remove(from);
       fileList.add(to);
-      createRefCountForNewFile(to);
       cache.put(fileListCacheKey, fileList);
-      cache.endBatch(true);
+      if (batching) cache.endBatch(true);
       
       // now trigger deletion of old file chunks:
       FileReadLockKey fileFromReadLockKey = new FileReadLockKey(indexName, from);
@@ -214,7 +213,6 @@
       FileMetadata previous = (FileMetadata) cache.putIfAbsent(key, newFileMetadata);
       if (previous == null) {
          // creating new file
-         createRefCountForNewFile(name);
          Set<String> fileList = getFileList();
          fileList.add(name);
          cache.put(fileListCacheKey, fileList);
@@ -224,11 +222,6 @@
       }
    }
 
-   private void createRefCountForNewFile(String fileName) {
-      FileReadLockKey readLockKey = new FileReadLockKey(indexName, fileName);
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_CACHE_STORE).put(readLockKey, Integer.valueOf(1));
-   }
-
    @SuppressWarnings("unchecked")
    private Set<String> getFileList() {
       Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
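
A note on the startBatch() guard introduced above: Cache.startBatch() returns true only when it actually starts a new batch; when the calling thread already has a batch (or transaction) running, the writes simply join it and the outer owner is the one that must end it, which is why endBatch(true) is now conditional. A hypothetical helper illustrating the idiom (the helper name and the try/finally are additions for illustration, not part of this commit; batching is assumed enabled in the cache configuration):

   import org.infinispan.Cache;

   // Only the caller that actually started the batch ends it.
   public final class GuardedBatch {

      private GuardedBatch() {
      }

      public static void runInBatch(Cache<Object, Object> cache, Runnable work) {
         boolean batching = cache.startBatch(); // false if a batch was already in progress
         boolean success = false;
         try {
            work.run();
            success = true;
         } finally {
            if (batching) {
               cache.endBatch(success); // end only the batch we own: commit, or roll back on failure
            }
         }
      }
   }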

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -48,6 +48,7 @@
    private final FileCacheKey fileKey;
    private final int chunkSize;
    private final FileReadLockKey readLockKey;
+   private final boolean trace;
 
    private int currentBufferSize;
    private byte[] buffer;
@@ -64,8 +65,9 @@
       final String filename = fileKey.getFileName();
       this.readLockKey = new FileReadLockKey(fileKey.getIndexName(), filename);
       aquireReadLock();
-      if (log.isDebugEnabled()) {
-         log.debug("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
+      trace = log.isTraceEnabled();
+      if (trace) {
+         log.trace("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
       }
    }
 
@@ -129,10 +131,10 @@
          chunkKey = new ChunkCacheKey(indexName, filename, ++i);
       } while (removed != null);
       FileCacheKey key = new FileCacheKey(indexName, filename);
-      cache.startBatch();
+//      boolean batch = cache.startBatch(); //FIXME when enabling batch, org.infinispan.lucene.profiling.CacheStoreStressTest fails to remove the readLockKey
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).remove(readLockKey);
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(key);
-      cache.endBatch(true);
+//      if (batch) cache.endBatch(true);
    }
 
    /**
@@ -203,8 +205,8 @@
       buffer = null;
       if (isClone) return;
       releaseReadLock();
-      if (log.isDebugEnabled()) {
-         log.debug("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      if (trace) {
+         log.trace("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
       }
    }
 

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -33,6 +33,7 @@
  * Responsible for writing to a <code>Directory</code>
  * 
  * @since 4.0
+ * @author Sanne Grinovero
  * @author Lukasz Moren
  * @author Davide Di Somma
  * @see org.apache.lucene.store.Directory
@@ -47,20 +48,24 @@
    private final AdvancedCache cache;
    private final FileMetadata file;
    private final FileCacheKey fileKey;
+   private final boolean trace;
 
    private byte[] buffer;
    private int positionInBuffer = 0;
    private long filePosition = 0;
    private int currentChunkNumber = 0;
 
+   private boolean transactionRunning = false;
+
    public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
       this.cache = cache;
       this.fileKey = fileKey;
       this.bufferSize = bufferSize;
       this.buffer = new byte[this.bufferSize];
       this.file = fileMetadata;
-      if (log.isDebugEnabled()) {
-         log.debug("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      trace = log.isTraceEnabled();
+      if (trace) {
+         log.trace("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
       }
    }
    
@@ -80,7 +85,6 @@
       }
    }
 
-   
    private static int getPositionInBuffer(long pos, int bufferSize) {
       return (int) (pos % bufferSize);
    }
@@ -90,7 +94,7 @@
    }
 
    private void newChunk() throws IOException {
-      flush();// save data first
+      doFlush();// save data first
       currentChunkNumber++;
       // check if we have to create new chunk, or get already existing in cache for modification
       buffer = getChunkById(cache, fileKey, currentChunkNumber, bufferSize);
@@ -127,6 +131,15 @@
 
    @Override
    public void flush() throws IOException {
+      //flush is invoked by Lucene directly only in the before-commit phase,
+      //so this is the point where we start the batch that close() will commit:
+      if ( ! transactionRunning) {
+         transactionRunning = cache.startBatch();
+      }
+      doFlush();
+   }
+
+   public void doFlush() throws IOException {
       // create key for the current chunk
       ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentChunkNumber);
       // size changed, apply change to file header
@@ -143,12 +156,15 @@
             System.arraycopy(buffer, 0, bufferToFlush, 0, newBufferSize);
          }
       }
-      cache.startBatch();
+      boolean microbatch = false;
+      if ( ! transactionRunning) {
+         microbatch = cache.startBatch();
+      }
       // add chunk to cache
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, bufferToFlush);
       // override existing file header with new size and last time access
       cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
-      cache.endBatch(true);
+      if (microbatch) cache.endBatch(true);
    }
 
    private void resizeFileIfNeeded() {
@@ -159,14 +175,18 @@
 
    @Override
    public void close() throws IOException {
-      flush();
+      doFlush();
+      if (transactionRunning) {
+         //commit
+         cache.endBatch(true);
+         transactionRunning = false;
+      }
       positionInBuffer = 0;
       filePosition = 0;
       buffer = null;
-      if (log.isDebugEnabled()) {
-         log.debug("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+      if (trace) {
+         log.trace("Closed IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
       }
-      // cache.compact(); //TODO investigate about this
    }
 
    @Override
@@ -183,7 +203,7 @@
             throw new IOException(fileKey.getFileName() + ": seeking past of the file");
       }
       if (requestedChunkNumber != currentChunkNumber) {
-         flush();
+         doFlush();
          buffer = getChunkById(cache, fileKey, requestedChunkNumber, bufferSize);
          currentChunkNumber = requestedChunkNumber;
       }
@@ -201,4 +221,5 @@
       int lastChunkNumber = (int) (file.getSize() / bufferSize);
       return currentChunkNumber == lastChunkNumber;
    }
+   
 }
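
To relate the new flush()/doFlush() split to the Lucene side: IndexWriter.commit() drives the before-commit flush mentioned in the comment above, so each index file written during a commit now reaches the cache (and any configured CacheStore) through a single batch that ends when the IndexOutput is closed. A minimal usage sketch; the Lucene constructor signatures are the 2.9/3.0-era ones these tests rely on and should be treated as assumptions (see CacheTestSupport for the real setup):

   import org.apache.lucene.analysis.SimpleAnalyzer;
   import org.apache.lucene.document.Document;
   import org.apache.lucene.document.Field;
   import org.apache.lucene.index.IndexWriter;
   import org.infinispan.Cache;
   import org.infinispan.lucene.InfinispanDirectory;

   public class CommitSketch {

      public static void index(Cache cache) throws Exception {
         InfinispanDirectory dir = new InfinispanDirectory(cache, "tempIndexName");
         IndexWriter writer = new IndexWriter(dir, new SimpleAnalyzer(), IndexWriter.MaxFieldLength.UNLIMITED);
         Document doc = new Document();
         doc.add(new Field("body", "some text", Field.Store.YES, Field.Index.ANALYZED));
         writer.addDocument(doc);
         // commit() flushes and closes the segment files: with this patch each
         // InfinispanIndexOutput groups its writes in one batch, ended on close(),
         // so the chunks are not pushed to the CacheStore twice.
         writer.commit();
         writer.close();
      }
   }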

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/CacheTestSupport.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -214,5 +214,23 @@
       iw.commit();
       iw.close();
    }
-
+   
+   /**
+    * Debugging aid to trace Lucene's invocations into the Directory:
+    * it prints to standard output a partial stack trace of the invoker,
+    * limited to seven frames.
+    * 
+    * @param initialLine The label to print as first line of the stack
+    */
+   public static void dumpMicroStack(String initialLine) {
+      StackTraceElement[] stackTraceElements = Thread.getAllStackTraces().get(Thread.currentThread());
+      StringBuilder sb = new StringBuilder(initialLine);
+      for (int i = 3; i < 10; i++) {
+         sb.append("\n\t");
+         sb.append(stackTraceElements[i]);
+      }
+      sb.append("\n");
+      System.out.println(sb.toString());
+   }
+   
 }
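
The dumpMicroStack() helper above is a debugging aid only and is not called anywhere in this commit. A hypothetical call site, temporarily dropped for example into InfinispanIndexOutput.doFlush() while investigating which Lucene code path triggers a write, would look like:

   // hypothetical, temporary debugging call; prints seven frames of the invoking call path
   CacheTestSupport.dumpMicroStack("doFlush invoked for " + fileKey.getFileName() + ":");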

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/InfinispanDirectoryIOTest.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -78,18 +78,18 @@
       IndexOutput io = dir.createOutput(fileName);
       RepeatableLongByteSequence bytesGenerator = new RepeatableLongByteSequence();
       //It writes repeatable text
-      int REPEATABLE_BUFFER_SIZE = 1501;
+      final int REPEATABLE_BUFFER_SIZE = 1501;
       for (int i = 0; i < REPEATABLE_BUFFER_SIZE; i++) {
          io.writeByte(bytesGenerator.nextByte());
       }
       io.flush();
       
       //Text to write on file with repeatable text
-      String someText = "This is some text";
-      byte[] someTextAsBytes = someText.getBytes();
+      final String someText = "This is some text";
+      final byte[] someTextAsBytes = someText.getBytes();
       //4 points in random order where writing someText: at begin of file, at end of file, within a single chunk,
       //between 2 chunks
-      int[] pointers = {0, 635, REPEATABLE_BUFFER_SIZE, 135};
+      final int[] pointers = {0, 635, REPEATABLE_BUFFER_SIZE, 135};
       
       for(int i=0; i < pointers.length; i++) {
          io.seek(pointers[i]);
@@ -104,23 +104,23 @@
       int indexPointer = 0;
       Arrays.sort(pointers);
       byte[] buffer = null;
-      int chunkIndex = 0;
+      int chunkIndex = -1;
       //now testing the stream is equal to the produced repeatable but including the edits at pointed positions
       for (int i = 0; i < REPEATABLE_BUFFER_SIZE + someTextAsBytes.length; i++) {
-         if(i % BUFFER_SIZE == 0) {
-            buffer = (byte[])cache.get(new ChunkCacheKey(INDEXNAME, fileName, chunkIndex++));
+         if (i % BUFFER_SIZE == 0) {
+            buffer = (byte[]) cache.get(new ChunkCacheKey(INDEXNAME, fileName, ++chunkIndex));
          }
          
          byte predictableByte = bytesGenerator.nextByte();
-         if(i < pointers[indexPointer]) {
+         if (i < pointers[indexPointer]) {
             //Assert predictable text
             Assert.assertEquals(predictableByte, buffer[i % BUFFER_SIZE]);
-         } else if(pointers[indexPointer] <= i && i < pointers[indexPointer] + someTextAsBytes.length){
+         } else if (pointers[indexPointer] <= i && i < pointers[indexPointer] + someTextAsBytes.length) {
             //Assert someText 
             Assert.assertEquals(someTextAsBytes[i - pointers[indexPointer]], buffer[i % BUFFER_SIZE]);
          }
          
-         if(i == pointers[indexPointer] + someTextAsBytes.length) {
+         if (i == pointers[indexPointer] + someTextAsBytes.length) {
             //Change pointer
             indexPointer++;
          }
@@ -473,6 +473,7 @@
       io.writeByte((byte) 69);
 
       io.flush();
+      io.close();
 
       assert dir.fileExists("MyNewFile.txt");
       assert null != cache.get(new ChunkCacheKey(INDEXNAME, "MyNewFile.txt", 0));
@@ -486,6 +487,7 @@
       assert new String(new byte[] { 66, 69 }).equals(new String(buf).trim());
 
       String testText = "This is some rubbish again that will span more than one chunk - one hopes.  Who knows, maybe even three or four chunks.";
+      io = dir.createOutput("MyNewFile.txt");
       io.seek(0);
       io.writeBytes(testText.getBytes(), 0, testText.length());
       io.close();

Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.profiling;
+
+import java.io.IOException;
+
+import org.infinispan.config.CacheLoaderManagerConfig;
+import org.infinispan.config.Configuration;
+import org.infinispan.loaders.jdbc.TableManipulation;
+import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStoreConfig;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.DirectoryIntegrityCheck;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.LuceneKey2StringMapper;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.test.SingleCacheManagerTest;
+import org.infinispan.test.fwk.TestCacheManagerFactory;
+import org.infinispan.test.fwk.UnitTestDatabaseManager;
+import org.testng.annotations.Test;
+
+/**
+ * Test case verifying that the index stays usable under stress even when a CacheStore is configured.
+ * See ISPN-575 (Corruption in data when using a permanent store)
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class CacheStoreStressTest extends SingleCacheManagerTest {
+   
+   private final ConnectionFactoryConfig connectionFactoryConfig = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
+
+   private static final String indexName = "tempIndexName";
+   
+   @Override
+   protected EmbeddedCacheManager createCacheManager() throws Exception {
+      Configuration configuration = CacheTestSupport.createTestConfiguration();
+      enableTestJdbcStorage(configuration);
+      return TestCacheManagerFactory.createClusteredCacheManager(configuration);
+   }
+   
+   private void enableTestJdbcStorage(Configuration configuration) {
+      TableManipulation tm = UnitTestDatabaseManager.buildDefaultTableManipulation();
+      JdbcStringBasedCacheStoreConfig jdbcStoreConfiguration = new JdbcStringBasedCacheStoreConfig(connectionFactoryConfig, tm);
+      jdbcStoreConfiguration.setKey2StringMapperClass(LuceneKey2StringMapper.class.getName());
+      CacheLoaderManagerConfig loaderManagerConfig = configuration.getCacheLoaderManagerConfig();
+      loaderManagerConfig.setPreload(false); // TODO change after ISPN-579
+      loaderManagerConfig.addCacheLoaderConfig(jdbcStoreConfiguration);
+   }
+
+   @Test
+   public void stressTestOnStore() throws InterruptedException, IOException {
+      cache = cacheManager.getCache();
+      assert cache!=null;
+      InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
+      PerformanceCompareStressTest.stressTestDirectory(dir, "InfinispanClusteredWith-Store");
+      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+   }
+   
+}

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-08-11 09:19:57 UTC (rev 2187)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-08-11 09:21:26 UTC (rev 2188)
@@ -129,6 +129,7 @@
       }
    }
 
+   @Test(enabled=false) //prevents some versions of TestNG from invoking this helper as a test
    public static void stressTestDirectory(Directory dir, String testLabel) throws InterruptedException, IOException {
       SharedState state = new SharedState(DICTIONARY_SIZE);
       CacheTestSupport.initializeDirectory(dir);


