[infinispan-commits] Infinispan SVN: r2220 - in branches/4.1.x/lucene-directory/src: main/java/org/infinispan/lucene/readlocks and 3 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Sat Aug 14 17:43:49 EDT 2010


Author: sannegrinovero
Date: 2010-08-14 17:43:48 -0400 (Sat, 14 Aug 2010)
New Revision: 2220

Added:
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java
Modified:
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
   branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
   branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
Log:
[ISPN-603] (Extract Lucene readlock logic to an overridable component, provide alternative implementations) - branch 4.1

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -30,7 +30,7 @@
  * @author Lukasz Moren
  * @author Sanne Grinovero
  */
-final class ChunkCacheKey implements Serializable {
+public final class ChunkCacheKey implements Serializable {
 
    /** The serialVersionUID */
    private static final long serialVersionUID = 4429712073623290126L;

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -30,7 +30,7 @@
  * @author Lukasz Moren
  * @author Sanne Grinovero
  */
-final class FileListCacheKey implements Serializable {
+public final class FileListCacheKey implements Serializable {
 
    /** The serialVersionUID */
    private static final long serialVersionUID = 8965108175527988255L;

Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.FileNotFoundException;
+import java.util.Set;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.context.Flag;
+import org.infinispan.util.concurrent.ConcurrentHashSet;
+
+/**
+ * Collects operations on the existing fileList, stored as a {@code Set<String>} under a key
+ * of type FileListCacheKey(indexName).
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+class FileListOperations {
+   
+   private final FileListCacheKey fileListCacheKey;
+   private final AdvancedCache cache;
+   private final String indexName;
+
+   FileListOperations(AdvancedCache cache, String indexName){
+      this.cache = cache;
+      this.indexName = indexName;
+      this.fileListCacheKey = new FileListCacheKey(indexName);
+   }
+   
+   /**
+    * @return the current list of files being part of the index 
+    */
+   Set<String> getFileList() {
+      Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
+      if (fileList == null)
+         fileList = new ConcurrentHashSet<String>();
+      return fileList;
+   }
+
+   /**
+    * Deletes a file from the list of files actively part of the index
+    * @param fileName
+    */
+   void deleteFileName(String fileName) {
+      Set<String> fileList = getFileList();
+      boolean done = fileList.remove(fileName);
+      if (done) {
+         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+      }
+   }
+   
+   /**
+    * Adds a new fileName in the list of files making up this index
+    * @param fileName
+    */
+   void addFileName(String fileName) {
+      Set<String> fileList = getFileList();
+      boolean done = fileList.add(fileName);
+      if (done) {
+         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+      }
+   }
+   
+   /**
+    * @param fileName
+    * @return the FileMetadata associated with the fileName
+    * @throws FileNotFoundException if the metadata was not found
+    */
+   FileMetadata getFileMetadata(String fileName) throws FileNotFoundException {
+      FileCacheKey key = new FileCacheKey(indexName, fileName);
+      FileMetadata metadata = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(key);
+      if (metadata == null) {
+         throw new FileNotFoundException(fileName);
+      }
+      return metadata;
+   }
+
+   /**
+    * Optimized implementation to perform both a remove and an add 
+    * @param toRemove
+    * @param toAdd
+    */
+   void removeAndAdd(String toRemove, String toAdd) {
+      Set<String> fileList = getFileList();
+      boolean doneAdd = fileList.add(toAdd);
+      boolean doneRemove = fileList.remove(toRemove);
+      if (doneAdd || doneRemove) {
+         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+      }
+   }
+
+}

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -31,7 +31,7 @@
  * @author Lukasz Moren
  * @see org.infinispan.lucene.FileCacheKey
  */
-final class FileMetadata implements Serializable {
+public final class FileMetadata implements Serializable {
 
    /** The serialVersionUID */
    private static final long serialVersionUID = -2605615719808221213L;

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -32,7 +32,7 @@
  * @author Sanne Grinovero
  * @since 4.0
  */
-final class FileReadLockKey implements Serializable {
+public final class FileReadLockKey implements Serializable {
 
    /** The serialVersionUID */
    private static final long serialVersionUID = 7789410500198851940L;
@@ -41,7 +41,7 @@
    private final String fileName;
    private final int hashCode;
 
-   FileReadLockKey(String indexName, String fileName) {
+   public FileReadLockKey(String indexName, String fileName) {
       if (indexName == null)
          throw new IllegalArgumentException("indexName shall not be null");
       if (fileName == null)

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -34,24 +34,49 @@
 import org.infinispan.Cache;
 import org.infinispan.context.Flag;
 import org.infinispan.lucene.locking.BaseLockFactory;
-import org.infinispan.util.concurrent.ConcurrentHashSet;
+import org.infinispan.lucene.readlocks.DistributedSegmentReadLocker;
+import org.infinispan.lucene.readlocks.SegmentReadLocker;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
 /**
- * Implementation that uses Infinispan to store Lucene indices.
+ * An implementation of Lucene's {@link org.apache.lucene.store.Directory} which uses Infinispan to store Lucene indices.
+ * As with the RAMDirectory the data is stored in memory, but this implementation provides some additional flexibility:
+ * <p><b>Passivation, LRU or LIRS</b> Bigger indexes can be configured to passivate cleverly selected chunks of data to a cache store.
+ * This can be a local filesystem, a network filesystem, a database or custom cloud stores like S3. See Infinispan's core documentation for a full list of available implementations, or {@link org.infinispan.loaders.CacheStore} to implement more.</p>
+ * <p><b>Non-volatile memory</b> The contents of the index can be stored in their entirety in such a store, so that on shutdown or crash of the system data is not lost.
+ * A copy of the index will be written to the store synchronously or asynchronously depending on configuration; in case you enable
+ * Infinispan's clustering, even in asynchronous mode the segments are always duplicated synchronously to other nodes, so you can
+ * benefit from good reliability even while choosing the asynchronous mode to write the index to the slowest store implementations.</p>
+ * <p><b>Real-time change propagation</b> All changes done on a node are propagated at low latency to other nodes of the cluster; this was designed especially for
+ * interactive usage of Lucene, so that after an IndexWriter commits on one node new IndexReaders opened on any node of the cluster
+ * will be able to deliver updated search results.</p>
+ * <p><b>Distributed heap</b> Infinispan acts as a shared heap for the purpose of total memory consumption, so you can avoid hitting the slower disks even
+ * if the total size of the index can't fit in the memory of a single node: network is faster than disks, especially if the index
+ * is bigger than the memory available to cache it.</p>
+ * <p><b>Distributed locking</b>
+ * As with the default Lucene Directory implementations, a global lock needs to protect the index from having more than one IndexWriter open; in case of a
+ * replicated or distributed index you need to enable a cluster-wide {@link org.apache.lucene.store.LockFactory}.
+ * This implementation uses by default {@link org.infinispan.lucene.locking.BaseLockFactory}; in case you want to apply changes during a JTA transaction
+ * see also {@link org.infinispan.lucene.locking.TransactionalLockFactory}.
+ * </p>
+ * <p><b>Combined store patterns</b> It's possible to combine different stores and passivation policies, so that each node shares the index changes
+ * quickly with other nodes, offloads less frequently used data to a per-node local filesystem, and the cluster also coordinates to keep a safe copy on a shared store.</p>
  * 
- * Directory locking is assured with {@link org.infinispan.lucene.locking.TransactionalSharedLuceneLock}
- * 
  * @since 4.0
+ * @author Sanne Grinovero
  * @author Lukasz Moren
- * @author Sanne Grinovero
+ * @see org.apache.lucene.store.Directory
+ * @see org.apache.lucene.store.LockFactory
+ * @see org.infinispan.lucene.locking.BaseLockFactory
  * @see org.infinispan.lucene.locking.TransactionalLockFactory
  */
 public class InfinispanDirectory extends Directory {
    
-   // used as default chunk size if not provided in conf
-   // each Lucene index segment is splitted into parts with default size defined here
+   /**
+    * Used as the default chunk size; can be overridden at construction time.
+    * Each Lucene index segment is split into parts with the default size defined here.
+    */
    public final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
 
    private static final Log log = LogFactory.getLog(InfinispanDirectory.class);
@@ -67,34 +92,47 @@
    // size per dir
    private final int chunkSize;
 
-   private final FileListCacheKey fileListCacheKey;
+   private final FileListOperations fileOps;
+   private final SegmentReadLocker readLocks;
 
-   public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize) {
+   public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize, SegmentReadLocker readLocker) {
       if (cache == null)
          throw new IllegalArgumentException("Cache must not be null");
       if (indexName == null)
          throw new IllegalArgumentException("index name must not be null");
       if (lf == null)
          throw new IllegalArgumentException("LockFactory must not be null");
-      if (chunkSize<=0)
-         throw new IllegalArgumentException("chunkSize must be a non-null positive integer");
+      if (readLocker == null)
+         throw new IllegalArgumentException("SegmentReadLocker must not be null");
+      if (chunkSize <= 0)
+         throw new IllegalArgumentException("chunkSize must be a positive integer");
       this.cache = cache.getAdvancedCache();
       this.indexName = indexName;
       this.setLockFactory(lf);
       this.chunkSize = chunkSize;
-      this.fileListCacheKey = new FileListCacheKey(indexName);
+      this.fileOps = new FileListOperations(this.cache, indexName);
+      this.readLocks = readLocker;
    }
+   
+   public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize) {
+      this(cache, indexName, lf, chunkSize,
+               new DistributedSegmentReadLocker(cache, indexName, chunkSize));
+   }
 
+   public InfinispanDirectory(Cache cache, String indexName, int chunkSize, SegmentReadLocker readLocker) {
+      this(cache, indexName, makeDefaultLockFactory(cache, indexName), chunkSize, readLocker);
+   }
+   
    public InfinispanDirectory(Cache cache, String indexName, LockFactory lf) {
       this(cache, indexName, lf, DEFAULT_BUFFER_SIZE);
    }
 
    public InfinispanDirectory(Cache cache, String indexName, int chunkSize) {
-      this(cache, indexName, new BaseLockFactory(cache, indexName), chunkSize);
+      this(cache, indexName, makeDefaultLockFactory(cache, indexName), chunkSize);
    }
 
    public InfinispanDirectory(Cache cache, String indexName) {
-      this(cache, indexName, new BaseLockFactory(cache, indexName), DEFAULT_BUFFER_SIZE);
+      this(cache, indexName, makeDefaultLockFactory(cache, indexName), DEFAULT_BUFFER_SIZE);
    }
 
    public InfinispanDirectory(Cache cache) {
@@ -106,7 +144,7 @@
     */
    public String[] list() throws IOException {
       checkIsOpen();
-      Set<String> filesList = getFileList();
+      Set<String> filesList = fileOps.getFileList();
       String[] array = filesList.toArray(new String[0]);
       return array;
    }
@@ -116,7 +154,7 @@
     */
    public boolean fileExists(String name) throws IOException {
       checkIsOpen();
-      return cache.withFlags(Flag.SKIP_LOCKING).containsKey(new FileCacheKey(indexName, name));
+      return fileOps.getFileList().contains(name);
    }
 
    /**
@@ -124,11 +162,7 @@
     */
    public long fileModified(String name) throws IOException {
       checkIsOpen();
-      FileMetadata file = getFileMetadata(name);
-      if (file == null) {
-         throw new FileNotFoundException(name);
-      }
-      return file.getLastModified();
+      return fileOps.getFileMetadata(name).getLastModified();
    }
 
    /**
@@ -150,13 +184,8 @@
     */
    public void deleteFile(String name) throws IOException {
       checkIsOpen();
-      Set<String> fileList = getFileList();
-      boolean deleted = fileList.remove(name);
-      if (deleted) {
-         cache.put(fileListCacheKey, fileList);
-      }
-      FileReadLockKey fileReadLockKey = new FileReadLockKey(indexName, name);
-      InfinispanIndexInput.releaseReadLock(fileReadLockKey, cache);
+      fileOps.deleteFileName(name);
+      readLocks.deleteOrReleaseReadLock(name);
       if (log.isDebugEnabled()) {
          log.debug("Removed file: {0} from index: {1}", name, indexName);
       }
@@ -184,17 +213,13 @@
       // rename metadata first
       boolean batching = cache.startBatch();
       FileCacheKey fromKey = new FileCacheKey(indexName, from);
-      FileMetadata metadata = (FileMetadata) cache.remove(fromKey);
+      FileMetadata metadata = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(fromKey);
       cache.put(new FileCacheKey(indexName, to), metadata);
-      Set<String> fileList = getFileList();
-      fileList.remove(from);
-      fileList.add(to);
-      cache.put(fileListCacheKey, fileList);
+      fileOps.removeAndAdd(from, to);
       if (batching) cache.endBatch(true);
       
       // now trigger deletion of old file chunks:
-      FileReadLockKey fileFromReadLockKey = new FileReadLockKey(indexName, from);
-      InfinispanIndexInput.releaseReadLock(fileFromReadLockKey, cache);
+      readLocks.deleteOrReleaseReadLock(from);
       if (log.isTraceEnabled()) {
          log.trace("Renamed file from: {0} to: {1} in index {2}", from, to, indexName);
       }
@@ -205,11 +230,7 @@
     */
    public long fileLength(String name) throws IOException {
       checkIsOpen();
-      final FileMetadata file = getFileMetadata(name);
-      if (file == null) {
-         throw new FileNotFoundException(name);
-      }
-      return file.getSize();
+      return fileOps.getFileMetadata(name).getSize();
    }
 
    /**
@@ -217,27 +238,10 @@
     */
    public IndexOutput createOutput(String name) throws IOException {
       final FileCacheKey key = new FileCacheKey(indexName, name);
-      FileMetadata newFileMetadata = new FileMetadata();
-      FileMetadata previous = (FileMetadata) cache.putIfAbsent(key, newFileMetadata);
-      if (previous == null) {
-         // creating new file
-         Set<String> fileList = getFileList();
-         fileList.add(name);
-         cache.put(fileListCacheKey, fileList);
-         return new InfinispanIndexOutput(cache, key, chunkSize, newFileMetadata);
-      } else {
-         return new InfinispanIndexOutput(cache, key, chunkSize, previous);
-      }
+      // creating new file, metadata is added on flush() or close() of IndexOutput
+      return new InfinispanIndexOutput(cache, key, chunkSize, fileOps);
    }
 
-   @SuppressWarnings("unchecked")
-   private Set<String> getFileList() {
-      Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
-      if (fileList == null)
-         fileList = new ConcurrentHashSet<String>();
-      return fileList;
-   }
-
    /**
     * {@inheritDoc}
     */
@@ -248,10 +252,16 @@
          throw new FileNotFoundException("Error loading medatada for index file: " + fileKey);
       }
       else if (fileMetadata.getSize() <= chunkSize) {
+         // files not larger than chunkSize don't need a readLock
          return new SingleChunkIndexInput(cache, fileKey, fileMetadata);
       }
       else {
-         return new InfinispanIndexInput(cache, fileKey, chunkSize, fileMetadata);
+         boolean locked = readLocks.aquireReadLock(name);
+         if (!locked) {
+            // safest reaction is to tell this file doesn't exist anymore.
+            throw new FileNotFoundException("Error loading medatada for index file: " + fileKey);
+         }
+         return new InfinispanIndexInput(cache, fileKey, chunkSize, fileMetadata, readLocks);
       }
    }
 
@@ -268,11 +278,6 @@
       }
    }
 
-   private FileMetadata getFileMetadata(String fileName) {
-      FileCacheKey key = new FileCacheKey(indexName, fileName);
-      return (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(key);
-   }
-
    @Override
    public String toString() {
       return "InfinispanDirectory{" + "indexName='" + indexName + '\'' + '}';
@@ -287,4 +292,8 @@
       return list();
    }
    
+   private static LockFactory makeDefaultLockFactory(Cache cache, String indexName) {
+      return new BaseLockFactory(cache, indexName);
+   }
+   
 }

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -27,6 +27,7 @@
 import org.apache.lucene.store.IndexInput;
 import org.infinispan.AdvancedCache;
 import org.infinispan.context.Flag;
+import org.infinispan.lucene.readlocks.SegmentReadLocker;
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
@@ -47,8 +48,9 @@
    private final FileMetadata file;
    private final FileCacheKey fileKey;
    private final int chunkSize;
-   private final FileReadLockKey readLockKey;
+   private final SegmentReadLocker readLocks;
    private final boolean trace;
+   private final String filename;
 
    private int currentBufferSize;
    private byte[] buffer;
@@ -57,118 +59,19 @@
 
    private boolean isClone;
 
-   public InfinispanIndexInput(AdvancedCache cache, FileCacheKey fileKey, int chunkSize, FileMetadata fileMetadata) throws FileNotFoundException {
+   public InfinispanIndexInput(AdvancedCache cache, FileCacheKey fileKey, int chunkSize, FileMetadata fileMetadata, SegmentReadLocker readLocks) throws FileNotFoundException {
       this.cache = cache;
       this.fileKey = fileKey;
       this.chunkSize = chunkSize;
       this.file = fileMetadata;
-      final String filename = fileKey.getFileName();
-      this.readLockKey = new FileReadLockKey(fileKey.getIndexName(), filename);
-      aquireReadLock();
+      this.readLocks = readLocks;
+      this.filename = fileKey.getFileName();
       trace = log.isTraceEnabled();
       if (trace) {
          log.trace("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
       }
    }
 
-   private void releaseReadLock() {
-      releaseReadLock(readLockKey, cache);
-   }
-   
-   /**
-    * Releases a read-lock for this file, so that if it was marked as deleted and
-    * no other {@link InfinispanIndexInput} instances are using it, then it will
-    * be effectively deleted.
-    * 
-    * @see #aquireReadLock()
-    * @see InfinispanDirectory#deleteFile(String)
-    * 
-    * @param readLockKey the key pointing to the reference counter value
-    * @param cache The cache containing the reference counter value
-    */
-   static void releaseReadLock(FileReadLockKey readLockKey, AdvancedCache cache) {
-      int newValue = 0;
-      // spinning as we currently don't mandate transactions, so no proper lock support available
-      boolean done = false;
-      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
-      while (done == false) {
-         if (lockValue == null) {
-            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(0));
-            done = (null == lockValue);
-         }
-         else {
-            int refCount = (Integer) lockValue;
-            newValue = refCount - 1;
-            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
-            if (!done) {
-               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
-            }
-         }
-      }
-      if (newValue == 0) {
-         realFileDelete(readLockKey, cache);
-      }
-   }
-   
-   /**
-    * The {@link InfinispanDirectory#deleteFile(String)} is not deleting the elements from the cache
-    * but instead flagging the file as deletable.
-    * This method will really remove the elements from the cache; should be invoked only
-    * by {@link #releaseReadLock(FileReadLockKey, AdvancedCache)} after having verified that there
-    * are no users left in need to read these chunks.
-    * 
-    * @param readLockKey the key representing the values to be deleted
-    * @param cache the cache containing the elements to be deleted
-    */
-   static void realFileDelete(FileReadLockKey readLockKey, AdvancedCache cache) {
-      final String indexName = readLockKey.getIndexName();
-      final String filename = readLockKey.getFileName();
-      int i = 0;
-      Object removed;
-      ChunkCacheKey chunkKey = new ChunkCacheKey(indexName, filename, i);
-      do {
-         removed = cache.withFlags(Flag.SKIP_LOCKING).remove(chunkKey);
-         chunkKey = new ChunkCacheKey(indexName, filename, ++i);
-      } while (removed != null);
-      FileCacheKey key = new FileCacheKey(indexName, filename);
-//      boolean batch = cache.startBatch(); //FIXME when enabling batch, org.infinispan.lucene.profiling.CacheStoreStressTest fails to remove the readLockKey
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).remove(readLockKey);
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(key);
-//      if (batch) cache.endBatch(true);
-   }
-
-   /**
-    * Acquires a readlock on all chunks for this file, to make sure chunks are not deleted while
-    * iterating on the group. This is needed to avoid an eager lock on all elements.
-    * 
-    * @see #releaseReadLock(FileReadLockKey, AdvancedCache)
-    */
-   private void aquireReadLock() throws FileNotFoundException {
-      // spinning as we currently don't mandate transactions, so no proper lock support is available
-      boolean done = false;
-      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
-      while (done == false) {
-         if (lockValue != null) {
-            int refCount = (Integer) lockValue;
-            if (refCount == 0) {
-               // too late: in case refCount==0 the delete process already triggered chunk deletion.
-               // safest reaction is to tell this file doesn't exist anymore.
-               throw new FileNotFoundException("segment file was deleted");
-            }
-            Integer newValue = Integer.valueOf(refCount + 1);
-            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
-            if (!done) {
-               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
-            }
-         } else {
-            // readLocks are not stored, so if there's no value assume it's ==1, which means
-            // existing file, not deleted, nobody else owning a read lock
-            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(2));
-            done = (null == lockValue);
-         }
-      }
-   }
-
    @Override
    public byte readByte() throws IOException {
       if (bufferPosition >= currentBufferSize) {
@@ -204,9 +107,9 @@
       currentLoadedChunk = -1;
       buffer = null;
       if (isClone) return;
-      releaseReadLock();
+      readLocks.deleteOrReleaseReadLock(filename);
       if (trace) {
-         log.trace("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+         log.trace("Closed IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
       }
    }
 
@@ -230,7 +133,7 @@
    }
 
    private void setBufferToCurrentChunk() throws IOException {
-      ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+      ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), filename, currentLoadedChunk);
       buffer = (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
       if (buffer == null) {
          throw new IOException("Chunk value could not be found for key " + key);
@@ -241,7 +144,7 @@
    // Lucene might try seek(pos) using an illegal pos value
    // RAMDirectory teaches to position the cursor to the end of previous chunk in this case
    private void setBufferToCurrentChunkIfPossible() throws IOException {
-      ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+      ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), filename, currentLoadedChunk);
       buffer = (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
       if (buffer == null) {
          currentLoadedChunk--;

Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -48,21 +48,24 @@
    private final AdvancedCache cache;
    private final FileMetadata file;
    private final FileCacheKey fileKey;
+   private final FileListOperations fileOps;
    private final boolean trace;
 
    private byte[] buffer;
    private int positionInBuffer = 0;
    private long filePosition = 0;
    private int currentChunkNumber = 0;
+   private boolean needsAddingToFileList = true;
 
    private boolean transactionRunning = false;
 
-   public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
+   public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileListOperations fileList) throws IOException {
       this.cache = cache;
       this.fileKey = fileKey;
       this.bufferSize = bufferSize;
+      this.fileOps = fileList;
       this.buffer = new byte[this.bufferSize];
-      this.file = fileMetadata;
+      this.file = new FileMetadata();
       trace = log.isTraceEnabled();
       if (trace) {
          log.trace("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
@@ -75,7 +78,7 @@
       if (readBuffer==null) {
          return new byte[bufferSize];
       }
-      else if (readBuffer.length==bufferSize) {
+      else if (readBuffer.length == bufferSize) {
          return readBuffer;
       }
       else {
@@ -145,9 +148,6 @@
       // size changed, apply change to file header
       file.touch();
       resizeFileIfNeeded();
-      if (file.getSize() < filePosition) {
-         file.setSize(filePosition);
-      }
       byte[] bufferToFlush = buffer;
       if (isWritingOnLastChunk()) {
          int newBufferSize = (int) (file.getSize() % bufferSize);
@@ -157,16 +157,25 @@
          }
       }
       boolean microbatch = false;
+      // we don't want to spawn a nested transaction, just to write in batch:
       if ( ! transactionRunning) {
          microbatch = cache.startBatch();
       }
       // add chunk to cache
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, bufferToFlush);
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(key, bufferToFlush);
       // override existing file header with new size and last time access
-      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileKey, file);
+      registerToFileListIfNeeded();
       if (microbatch) cache.endBatch(true);
    }
 
+   private void registerToFileListIfNeeded() {
+      if (needsAddingToFileList) {
+         fileOps.addFileName(this.fileKey.getFileName());
+         needsAddingToFileList = false;
+      }
+   }
+
    private void resizeFileIfNeeded() {
       if (file.getSize() < filePosition) {
          file.setSize(filePosition);
@@ -215,7 +224,6 @@
    public long length() throws IOException {
       return file.getSize();
    }
-
    
    private boolean isWritingOnLastChunk() {
       int lastChunkNumber = (int) (file.getSize() / bufferSize);

Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.Cache;
+import org.infinispan.context.Flag;
+import org.infinispan.lucene.ChunkCacheKey;
+import org.infinispan.lucene.FileCacheKey;
+import org.infinispan.lucene.FileMetadata;
+import org.infinispan.lucene.FileReadLockKey;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.InfinispanIndexInput;
+
+/**
+ * <p>DistributedSegmentReadLocker stores reference counters in the cache
+ * to keep track of the number of clients still needing to be able
+ * to read a segment. It makes extensive usage of Infinispan's atomic
+ * operations.</p>
+ * <p>Locks stored this way are not optimally performing as they might spin
+ * on remote invocations, and might fail to clean up some garbage
+ * in case a node is disconnected without having released the readlock.</p>
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class DistributedSegmentReadLocker implements SegmentReadLocker {
+   
+   private final AdvancedCache cache;
+   private final String indexName;
+   private final int chunkSize;
+   
+   public DistributedSegmentReadLocker(Cache cache, String indexName, int chunkSize) {
+      if (cache == null)
+         throw new IllegalArgumentException("Cache must not be null");
+      if (indexName == null)
+         throw new IllegalArgumentException("index name must not be null");
+      if (chunkSize <= 0)
+         throw new IllegalArgumentException("chunkSize must be a non-null positive integer");
+      this.chunkSize = chunkSize;
+      this.indexName = indexName;
+      this.cache = cache.getAdvancedCache();
+   }
+
+   /**
+    * Deletes or releases a read-lock for the specified filename, so that if it was marked as deleted and
+    * no other {@link InfinispanIndexInput} instances are reading from it, then it will
+    * be effectively deleted.
+    * 
+    * For removal of readLockKey the SKIP_CACHE_STORE is not used to make sure even
+    * values eventually stored by a rehash are cleaned up.
+    * 
+    * @see #aquireReadLock(String)
+    * @see InfinispanDirectory#deleteFile(String)
+    */
+   @Override
+   public void deleteOrReleaseReadLock(String filename) {
+      FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+      int newValue = 0;
+      boolean done = false;
+      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+      while (done == false) {
+         if (lockValue == null) {
+            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(0));
+            done = (null == lockValue);
+         }
+         else {
+            int refCount = (Integer) lockValue;
+            newValue = refCount - 1;
+            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
+            if (!done) {
+               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+            }
+         }
+      }
+      if (newValue == 0) {
+         realFileDelete(readLockKey, cache, chunkSize);
+      }
+   }
+   
+   /**
+    * Acquires a readlock on all chunks for this file, to make sure chunks are not deleted while
+    * iterating on the group. This is needed to avoid an eager lock on all elements.
+    * 
+    * If no value is found in the cache, a disambiguation procedure is needed: no value
+    * might mean either "existing, no readlocks, no deletions in progress" or "non-existent file".
+    * The first possibility is coded as no value to avoid storing readlocks in a permanent store,
+    * which would unnecessarily slow down and provide unwanted long term storage of the lock;
+    * so the value is treated as one if not found, but obviously it's also not found for non-existent
+    * or concurrently deleted files.
+    * 
+    * @param filename the name of the "file" for which a readlock is requested
+    * 
+    * @see #deleteOrReleaseReadLock(String)
+    */
+   public boolean aquireReadLock(String filename) {
+      FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+      Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+      boolean done = false;
+      while (done == false) {
+         if (lockValue != null) {
+            int refCount = (Integer) lockValue;
+            if (refCount == 0) {
+               // too late: in case refCount==0 the delete is being performed
+               return false;
+            }
+            Integer newValue = Integer.valueOf(refCount + 1);
+            done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, lockValue, newValue);
+            if ( ! done) {
+               lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+            }
+         } else {
+            // readLocks are not stored, so if there's no value assume it's ==1, which means
+            // existing file, not deleted, nobody else owning a read lock. but check for ambiguity
+            lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(2));
+            done = (null == lockValue);
+            if (done) {
+               // have to check now that the fileKey still exists to prevent the race condition of 
+               // T1 fileKey exists - T2 delete file and remove readlock - T1 putIfAbsent(readlock, 2)
+               final FileCacheKey fileKey = new FileCacheKey(indexName, filename);
+               if (cache.get(fileKey) == null) {
+                  cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(readLockKey);
+                  return false;
+               }
+            }
+         }
+      }
+      return true;
+   }
+   
+   /**
+    * The {@link InfinispanDirectory#deleteFile(String)} is not deleting the elements from the cache
+    * but instead flagging the file as deletable.
+    * This method will really remove the elements from the cache; should be invoked only
+    * by {@link #deleteOrReleaseReadLock(String)} after having verified that there
+    * are no users left in need to read these chunks.
+    * 
+    * @param readLockKey the key representing the values to be deleted
+    * @param cache the cache containing the elements to be deleted
+    */
+   static void realFileDelete(FileReadLockKey readLockKey, AdvancedCache cache, int chunkSize) {
+      final String indexName = readLockKey.getIndexName();
+      final String filename = readLockKey.getFileName();
+      FileCacheKey key = new FileCacheKey(indexName, filename);
+      boolean batch = cache.startBatch();
+      FileMetadata file = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).remove(key);
+      int numChunks = (int) (file.getSize() / chunkSize);
+      for (int i = 0; i <= numChunks; i++) {
+         ChunkCacheKey chunkKey = new ChunkCacheKey(indexName, filename, i);
+         cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(chunkKey);  
+      }
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(key);
+      //last operation, as being set as value==0 it prevents others from using it during the deletion process:
+      cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(readLockKey);
+      if (batch) {
+         cache.endBatch(true);
+      }
+   }
+
+}

Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.infinispan.Cache;
+import org.infinispan.lucene.InfinispanDirectory;
+
+/**
+ * LocalLockMergingSegmentReadLocker decorates the {@link DistributedSegmentReadLocker} to minimize
+ * remote operations in case several IndexReaders are opened on the same {@link InfinispanDirectory}.
+ * It keeps track of locks which were already acquired for a specific filename from another request on
+ * the same node and merges the request so that the different clients share the same remote lock.
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class LocalLockMergingSegmentReadLocker implements SegmentReadLocker {
+
+   private final ConcurrentHashMap<String, LocalReadLock> localLocks = new ConcurrentHashMap<String, LocalReadLock>();
+   private final DistributedSegmentReadLocker delegate;
+
+   /**
+    * Create a new LocalLockMergingSegmentReadLocker.
+    * 
+    * @param cache
+    * @param indexName
+    */
+   public LocalLockMergingSegmentReadLocker(Cache cache, String indexName, int chunkSize) {
+      this.delegate = new DistributedSegmentReadLocker(cache, indexName, chunkSize);
+   }
+
+   public boolean aquireReadLock(String name) {
+      LocalReadLock localReadLock = getLocalLockByName(name);
+      boolean aquired = localReadLock.aquire();
+      if (aquired) {
+         return true;
+      }
+      else {
+         // cleanup
+         localLocks.remove(name);
+         return false;
+      }
+   }
+   
+   private LocalReadLock getLocalLockByName(String name) {
+      LocalReadLock localReadLock = localLocks.get(name);
+      if (localReadLock == null) {
+         LocalReadLock newReadLock = new LocalReadLock(name);
+         LocalReadLock prevReadLock = localLocks.putIfAbsent(name, newReadLock);
+         localReadLock = prevReadLock == null ? newReadLock : prevReadLock;
+      }
+      return localReadLock;
+   }
+
+   public void deleteOrReleaseReadLock(String name) {
+      getLocalLockByName(name).release();
+   }
+   
+   private class LocalReadLock {
+      private final String name;
+      private int value = 0;
+
+      LocalReadLock(String name) {
+         this.name = name;
+      }
+
+      /**
+       * @return true if the lock was acquired, false if it's too late: the file
+       * was deleted and this LocalReadLock should be removed too.
+       */
+      synchronized boolean aquire() {
+         if (value == 0) {
+            boolean haveIt = delegate.aquireReadLock(name);
+            if (haveIt) {
+               value = 1;
+               return true;
+            } else {
+               value = -1;
+               return false;
+            }
+         } else if (value == -1) {
+            // a previous aquire() already found the file deleted (value was set to -1 above)
+            return false;
+         } else {
+            value++;
+            return true;
+         }
+      }
+      
+      synchronized void release() {
+         value--;
+         if (value <= 0) {
+            localLocks.remove(name);
+            delegate.deleteOrReleaseReadLock(name);
+         }
+      }
+   }
+
+}

Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+/**
+ * NoopSegmentReadLocker ignores requests to apply a readlock, but also ignores requests to delete files.
+ * It might be a good choice for read-only indexes, or cases in which leaving unused segments in the index is
+ * not considered a problem.
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class NoopSegmentReadLocker implements SegmentReadLocker {
+
+   /**
+    * doesn't do anything and returns true
+    */
+   @Override
+   public boolean aquireReadLock(String filename) {
+      return true;
+   }
+
+   /**
+    * doesn't do anything
+    */
+   @Override
+   public void deleteOrReleaseReadLock(String filename) {
+      return;
+   }
+
+}

Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import org.infinispan.lucene.InfinispanDirectory;
+
+/**
+ * <p>SegmentReadLocker implementations have to make sure that segments are not deleted while they are
+ * being used by an IndexReader.</p>
+ * <p>When an {@link org.infinispan.lucene.InfinispanIndexInput} is opened on a file which is split in smaller chunks,
+ * {@link #aquireReadLock(String)} is invoked; then the {@link #deleteOrReleaseReadLock(String)} is
+ * invoked when the stream is closed.</p>
+ * <p>The same {@link #deleteOrReleaseReadLock(String)} is invoked when a file is deleted, so if this invocation is not balancing
+ * a lock acquire this implementation must delete all segment chunks and the associated metadata.</p>
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public interface SegmentReadLocker {
+
+   /**
+    * It will release a previously acquired readLock, or
+    * if no readLock was acquired it will mark the file to be deleted as soon
+    * as all pending locks are released.
+    * If it's invoked on a file without pending locks the file is deleted.
+    * 
+    * @param fileName of the file to release or delete
+    * @see InfinispanDirectory#deleteFile(String)
+    */
+   void deleteOrReleaseReadLock(String fileName);
+
+   /**
+    * Acquires a readlock, in order to prevent other invocations to {@link #deleteOrReleaseReadLock(String)}
+    * from deleting the file.
+    * 
+    * @param filename
+    * @return true if the lock was acquired, false if the implementation
+    * detects the file does not exist, or that it's being deleted by some other thread.
+    * @see InfinispanDirectory#openInput(String)
+    */
+   boolean aquireReadLock(String filename);
+
+}

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -51,6 +51,10 @@
     *           The name of the unique index stored in the cache
     */
    public static void verifyDirectoryStructure(Cache cache, String indexName) {
+      verifyDirectoryStructure(cache, indexName, false);
+   }
+
+   public static void verifyDirectoryStructure(Cache cache, String indexName, boolean wasAStressTest) {
       Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
       Assert.assertNotNull(fileList);
       int fileListCacheKeyInstances = 0;
@@ -59,7 +63,10 @@
             ChunkCacheKey existingChunkKey = (ChunkCacheKey) key;
             String filename = existingChunkKey.getFileName();
             Assert.assertEquals(existingChunkKey.getIndexName(), indexName);
-            Assert.assertTrue(fileList.contains(filename));
+            // the chunk must either match an entry in fileList or have a pending readLock:
+//            if (fileList.contains(filename) == false) {
+//               verifyReadlockExists(cache, indexName, filename);
+//            }
             Object value = cache.get(existingChunkKey);
             Assert.assertNotNull(value);
             Assert.assertTrue(value instanceof byte[]);
@@ -68,7 +75,12 @@
          } else if (key instanceof FileCacheKey) {
             FileCacheKey fileCacheKey = (FileCacheKey) key;
             Assert.assertEquals(fileCacheKey.getIndexName(), indexName);
-            Assert.assertTrue(fileList.contains(fileCacheKey.getFileName()), fileCacheKey + " should not have existed");
+            String filename = fileCacheKey.getFileName();
+//            if (fileList.contains(filename) == false) {
+//               // if the file is not registered, assert that a readlock prevented it from being
+//               // deleted:
+//               verifyReadlockExists(cache, indexName, filename);
+//            }
             Object value = cache.get(fileCacheKey);
             Assert.assertNotNull(value);
             Assert.assertTrue(value instanceof FileMetadata);
@@ -76,24 +88,36 @@
             long totalFileSize = metadata.getSize();
             long actualFileSize = deepCountFileSize(fileCacheKey, cache);
             Assert.assertEquals(actualFileSize, totalFileSize);
+            Assert.assertTrue(fileList.contains(fileCacheKey.getFileName()), fileCacheKey + " should not have existed");
          } else if (key instanceof FileListCacheKey) {
             fileListCacheKeyInstances++;
             Assert.assertEquals(1, fileListCacheKeyInstances);
          } else if (key instanceof FileReadLockKey) {
             FileReadLockKey readLockKey = (FileReadLockKey) key;
             Assert.assertEquals(readLockKey.getIndexName(), indexName);
-            Assert.assertTrue(fileList.contains(readLockKey.getFileName()), readLockKey + " should not have existed");
             Object value = cache.get(readLockKey);
-            Assert.assertNotNull(value);
-            Assert.assertTrue(value instanceof Integer);
-            int readLockCount = (Integer) value;
-            Assert.assertEquals(readLockCount, 1, " for FileReadLockKey " + readLockKey);
+            // we verify that a ReadLock exists only for existing files
+            Assert.assertTrue(cache.get(new FileCacheKey(indexName, readLockKey.getFileName())) != null, key + " left over from deleted "
+                     + readLockKey.getFileName());
+            Assert.assertTrue(cache.get(new ChunkCacheKey(indexName, readLockKey.getFileName(), 0)) != null);
+            Assert.assertTrue(fileList.contains(readLockKey.getFileName()), "readlock still exists but the file was deleted: "
+                     + readLockKey);
+            Assert.assertTrue(value == null || value.equals(1));
          } else {
             Assert.fail("an unexpected key was found in the cache having key type " + key.getClass() + " toString:" + key);
          }
       }
    }
 
+   private static void verifyReadlockExists(Cache cache, String indexName, String filename) {
+      FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+      Object readLockValue = cache.get(readLockKey);
+      Assert.assertNotNull(readLockValue);
+      Assert.assertTrue(readLockValue instanceof Integer);
+      int v = (Integer) readLockValue;
+      Assert.assertTrue(v > 1, "readlock exists for unregistered file of unexpected value: " + v + " for file: " + filename);
+   }
+
    /**
     * For a given FileCacheKey return the total size of all chunks related to the file.
     * 
@@ -117,5 +141,44 @@
          }
       }
    }
+   
+   public static void assertFileNotExists(Cache cache, String indexName, String fileName) {
+      Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
+      Assert.assertNotNull(fileList);
+      Assert.assertFalse(fileList.contains(fileName));
+      Assert.assertNull(cache.get(new FileCacheKey(indexName, fileName)), "metadata found for deleted file");
+      for (int i = 0; i < 10; i++) {
+         ChunkCacheKey key = new ChunkCacheKey(indexName, fileName, i);
+         Assert.assertNull(cache.get(key), "a chunk was found for a deleted file: " + key);
+      }
+   }
 
+   /**
+    * Verifies that the file exists and has the specified value for readLock;
+    * Consider that null should be interpreted as value 1;
+    */
+   public static void assertFileExistsHavingRLCount(Cache cache, String fileName, String indexName, int expectedReadcount, int chunkSize, boolean expectRegisteredInFat) {
+      Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
+      Assert.assertNotNull(fileList);
+      Assert.assertTrue(fileList.contains(fileName) == expectRegisteredInFat);
+      FileMetadata metadata = (FileMetadata) cache.get(new FileCacheKey(indexName, fileName));
+      Assert.assertNotNull(metadata);
+      long totalFileSize = metadata.getSize();
+      int chunkNumbers = (int)(totalFileSize / chunkSize);
+      for (int i = 0; i < chunkNumbers; i++) {
+         Assert.assertNotNull(cache.get(new ChunkCacheKey(indexName, fileName, i)));
+      }
+      FileReadLockKey readLockKey = new FileReadLockKey(indexName,fileName);
+      Object value = cache.get(readLockKey);
+      if (expectedReadcount == 1) {
+         Assert.assertTrue(value == null || Integer.valueOf(1).equals(value), "readlock value is " + value);
+      }
+      else {
+         Assert.assertNotNull(value);
+         Assert.assertTrue(value instanceof Integer);
+         int v = (Integer)value;
+         Assert.assertEquals(v, expectedReadcount);
+      }
+   }
+
 }

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -74,7 +74,7 @@
       assert cache!=null;
       InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
       PerformanceCompareStressTest.stressTestDirectory(dir, "InfinispanClusteredWith-Store");
-      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName, true);
    }
    
 }

Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -97,7 +97,7 @@
       // TestingUtil.setDelayForCache(cache, 0, 0);
       InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:0");
-      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+      verifyDirectoryState();
    }
 
    @Test
@@ -105,7 +105,7 @@
       TestingUtil.setDelayForCache(cache, 4, 4);
       InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:4");
-      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+      verifyDirectoryState();
    }
 
    @Test
@@ -113,17 +113,17 @@
       TestingUtil.setDelayForCache(cache, 40, 40);
       InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
       stressTestDirectory(dir, "InfinispanClustered-delayedIO:40");
-      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+      verifyDirectoryState();
    }
 
    @Test
    public void profileInfinispanLocalDirectory() throws InterruptedException, IOException {
       CacheContainer cacheContainer = CacheTestSupport.createLocalCacheManager();
       try {
-         Cache cache = cacheContainer.getCache();
+         cache = cacheContainer.getCache();
          InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
          stressTestDirectory(dir, "InfinispanLocal");
-         DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+         verifyDirectoryState();
       } finally {
          cacheContainer.stop();
       }
@@ -166,5 +166,9 @@
       TestingUtil.killCacheManagers(cacheFactory);
       TestingUtil.recursiveFileRemove(indexName);
    }
+   
+   private void verifyDirectoryState() {
+      DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName, true);
+   }
 
 }

Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.DirectoryIntegrityCheck;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * DistributedSegmentReadLockerTest represents a quick check on the functionality
+ * of {@link org.infinispan.lucene.readlocks.DistributedSegmentReadLocker}
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "lucene.readlocks.DistributedSegmentReadLockerTest")
+public class DistributedSegmentReadLockerTest extends MultipleCacheManagersTest {
+   
+   /** The Index name */
+   protected static final String INDEX_NAME = "indexName";
+   /** The cache name */
+   protected static final String CACHE_NAME = "lucene";
+   /** Chunk Size **/
+   protected static final int CHUNK_SIZE = 6;
+   /** The name of the test file **/
+   protected static final String filename = "readme.txt";
+
+   protected Cache cache0;
+   protected Cache cache1;
+   protected Directory dirA;
+   protected Directory dirB;
+
+   @Override
+   protected void createCacheManagers() throws Throwable {
+      Configuration configuration = CacheTestSupport.createTestConfiguration();
+      createClusteredCaches(2, CACHE_NAME, configuration);
+   }
+   
+   @BeforeMethod
+   protected void prepare() throws IOException {
+      cache0 = cache(0, CACHE_NAME);
+      cache1 = cache(1, CACHE_NAME);
+      dirA = createDirectory(cache0);
+      dirB = createDirectory(cache1);
+      CacheTestSupport.initializeDirectory(dirA);
+   }
+   
+   @Test
+   @SuppressWarnings("unchecked")
+   public void testIndexWritingAndFinding() throws IOException {
+      verifyBoth(cache0,cache1);
+      IndexOutput indexOutput = dirA.createOutput(filename);
+      indexOutput.writeString("no need to write, nobody ever will read this");
+      indexOutput.flush();
+      indexOutput.close();
+      assertFileExistsHavingRLCount(filename, 1, true);
+      IndexInput openInput = dirB.openInput(filename);
+      assertFileExistsHavingRLCount(filename, 2, true);
+      dirA.deleteFile(filename);
+      assertFileExistsHavingRLCount(filename, 1, false);
+      //Lucene does use clone() - lock implementation ignores it as a clone is
+      //cast on locked segments and released before the close on the parent object
+      IndexInput clone = (IndexInput) openInput.clone();
+      assertFileExistsHavingRLCount(filename, 1, false);
+      clone.close();
+      assertFileExistsHavingRLCount(filename, 1, false);
+      openInput.close();
+      assertFileNotExists(filename);
+      dirA.close();
+      dirB.close();
+      verifyBoth(cache0, cache1);
+   }
+
+   void assertFileNotExists(String fileName) {
+      DirectoryIntegrityCheck.assertFileNotExists(cache0, INDEX_NAME, fileName);
+      DirectoryIntegrityCheck.assertFileNotExists(cache1, INDEX_NAME, fileName);
+   }
+
+   void assertFileExistsHavingRLCount(String fileName, int expectedReadcount, boolean expectRegisteredInFat) {
+      DirectoryIntegrityCheck.assertFileExistsHavingRLCount(cache0, fileName, INDEX_NAME, expectedReadcount, CHUNK_SIZE, expectRegisteredInFat);
+      DirectoryIntegrityCheck.assertFileExistsHavingRLCount(cache1, fileName, INDEX_NAME, expectedReadcount, CHUNK_SIZE, expectRegisteredInFat);
+   }
+   
+   Directory createDirectory(Cache cache) {
+      return new InfinispanDirectory(cache, INDEX_NAME, CHUNK_SIZE,
+               new DistributedSegmentReadLocker(cache, INDEX_NAME, CHUNK_SIZE));
+   }
+
+   void verifyBoth(Cache cache0, Cache cache1) {
+      DirectoryIntegrityCheck.verifyDirectoryStructure(cache0, INDEX_NAME);
+      DirectoryIntegrityCheck.verifyDirectoryStructure(cache1, INDEX_NAME);
+   }
+
+}

Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java	                        (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java	2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.infinispan.Cache;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.testng.annotations.Test;
+
+/**
+ * LocalLockMergingSegmentReadLockerTest represents a quick check on the functionality
+ * of {@link org.infinispan.lucene.readlocks.LocalLockMergingSegmentReadLocker}
+ * 
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+@Test(groups = "functional", testName = "lucene.readlocks.LocalLockMergingSegmentReadLockerTest")
+public class LocalLockMergingSegmentReadLockerTest extends DistributedSegmentReadLockerTest {
+   
+   @Test @Override
+   public void testIndexWritingAndFinding() throws IOException {
+      verifyBoth(cache0,cache1);
+      IndexOutput indexOutput = dirA.createOutput(filename);
+      indexOutput.writeString("no need to write, nobody ever will read this");
+      indexOutput.flush();
+      indexOutput.close();
+      assertFileExistsHavingRLCount(filename, 1, true);
+      IndexInput firstOpenOnB = dirB.openInput(filename);
+      assertFileExistsHavingRLCount(filename, 2, true);
+      dirA.deleteFile(filename);
+      assertFileExistsHavingRLCount(filename, 1, false);
+      //Lucene does use clone() - lock implementation ignores it as a clone is
+      //cast on locked segments and released before the close on the parent object
+      IndexInput cloneOfFirstOpenOnB = (IndexInput) firstOpenOnB.clone();
+      assertFileExistsHavingRLCount(filename, 1, false);
+      cloneOfFirstOpenOnB.close();
+      assertFileExistsHavingRLCount(filename, 1, false);
+      IndexInput firstOpenOnA = dirA.openInput(filename);
+      assertFileExistsHavingRLCount(filename, 2, false);
+      IndexInput secondOpenOnA = dirA.openInput(filename);
+      assertFileExistsHavingRLCount(filename, 2, false);
+      firstOpenOnA.close();
+      assertFileExistsHavingRLCount(filename, 2, false);
+      secondOpenOnA.close();
+      assertFileExistsHavingRLCount(filename, 1, false);
+      firstOpenOnB.close();
+      assertFileNotExists(filename);
+      dirA.close();
+      dirB.close();
+      verifyBoth(cache0, cache1);
+   }
+   
+   @Override
+   Directory createDirectory(Cache cache) {
+      return new InfinispanDirectory(cache, INDEX_NAME, CHUNK_SIZE,
+               new LocalLockMergingSegmentReadLocker(cache, INDEX_NAME, CHUNK_SIZE));
+   }
+
+}



More information about the infinispan-commits mailing list