[infinispan-commits] Infinispan SVN: r2220 - in branches/4.1.x/lucene-directory/src: main/java/org/infinispan/lucene/readlocks and 3 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sat Aug 14 17:43:49 EDT 2010
Author: sannegrinovero
Date: 2010-08-14 17:43:48 -0400 (Sat, 14 Aug 2010)
New Revision: 2220
Added:
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java
Modified:
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
Log:
[ISPN-603] (Extract Lucene readlock logic to an overridable component, provide alternative implementations) - branch 4.1
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/ChunkCacheKey.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -30,7 +30,7 @@
* @author Lukasz Moren
* @author Sanne Grinovero
*/
-final class ChunkCacheKey implements Serializable {
+public final class ChunkCacheKey implements Serializable {
/** The serialVersionUID */
private static final long serialVersionUID = 4429712073623290126L;
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListCacheKey.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -30,7 +30,7 @@
* @author Lukasz Moren
* @author Sanne Grinovero
*/
-final class FileListCacheKey implements Serializable {
+public final class FileListCacheKey implements Serializable {
/** The serialVersionUID */
private static final long serialVersionUID = 8965108175527988255L;
Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileListOperations.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene;
+
+import java.io.FileNotFoundException;
+import java.util.Set;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.context.Flag;
+import org.infinispan.util.concurrent.ConcurrentHashSet;
+
+/**
+ * Collects operations on the existing fileList, stored as a Set<String> having key
+ * of type FileListCacheKey(indexName).
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+class FileListOperations {
+
+ private final FileListCacheKey fileListCacheKey;
+ private final AdvancedCache cache;
+ private final String indexName;
+
+ FileListOperations(AdvancedCache cache, String indexName){
+ this.cache = cache;
+ this.indexName = indexName;
+ this.fileListCacheKey = new FileListCacheKey(indexName);
+ }
+
+ /**
+ * @return the current list of files being part of the index
+ */
+ Set<String> getFileList() {
+ Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
+ if (fileList == null)
+ fileList = new ConcurrentHashSet<String>();
+ return fileList;
+ }
+
+ /**
+ * Deleted a file from the list of files actively part of the index
+ * @param fileName
+ */
+ void deleteFileName(String fileName) {
+ Set<String> fileList = getFileList();
+ boolean done = fileList.remove(fileName);
+ if (done) {
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+ }
+ }
+
+ /**
+ * Adds a new fileName in the list of files making up this index
+ * @param fileName
+ */
+ void addFileName(String fileName) {
+ Set<String> fileList = getFileList();
+ boolean done = fileList.add(fileName);
+ if (done) {
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+ }
+ }
+
+ /**
+ * @param fileName
+ * @return the FileMetadata associated with the fileName
+ * @throws FileNotFoundException if the metadata was not found
+ */
+ FileMetadata getFileMetadata(String fileName) throws FileNotFoundException {
+ FileCacheKey key = new FileCacheKey(indexName, fileName);
+ FileMetadata metadata = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(key);
+ if (metadata == null) {
+ throw new FileNotFoundException(fileName);
+ }
+ return metadata;
+ }
+
+ /**
+ * Optimized implementation to perform both a remove and an add
+ * @param toRemove
+ * @param toAdd
+ */
+ void removeAndAdd(String toRemove, String toAdd) {
+ Set<String> fileList = getFileList();
+ boolean doneAdd = fileList.add(toAdd);
+ boolean doneRemove = fileList.remove(toRemove);
+ if (doneAdd || doneRemove) {
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileListCacheKey, fileList);
+ }
+ }
+
+}
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileMetadata.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -31,7 +31,7 @@
* @author Lukasz Moren
* @see org.infinispan.lucene.FileCacheKey
*/
-final class FileMetadata implements Serializable {
+public final class FileMetadata implements Serializable {
/** The serialVersionUID */
private static final long serialVersionUID = -2605615719808221213L;
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/FileReadLockKey.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -32,7 +32,7 @@
* @author Sanne Grinovero
* @since 4.0
*/
-final class FileReadLockKey implements Serializable {
+public final class FileReadLockKey implements Serializable {
/** The serialVersionUID */
private static final long serialVersionUID = 7789410500198851940L;
@@ -41,7 +41,7 @@
private final String fileName;
private final int hashCode;
- FileReadLockKey(String indexName, String fileName) {
+ public FileReadLockKey(String indexName, String fileName) {
if (indexName == null)
throw new IllegalArgumentException("indexName shall not be null");
if (fileName == null)
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanDirectory.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -34,24 +34,49 @@
import org.infinispan.Cache;
import org.infinispan.context.Flag;
import org.infinispan.lucene.locking.BaseLockFactory;
-import org.infinispan.util.concurrent.ConcurrentHashSet;
+import org.infinispan.lucene.readlocks.DistributedSegmentReadLocker;
+import org.infinispan.lucene.readlocks.SegmentReadLocker;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
/**
- * Implementation that uses Infinispan to store Lucene indices.
+ * An implementation of Lucene's {@link org.apache.lucene.store.Directory} which uses Infinispan to store Lucene indices.
+ * As the RAMDirectory the data is stored in memory, but provides some additional flexibility:
+ * <p><b>Passivation, LRU or LIRS</b> Bigger indexes can be configured to passivate cleverly selected chunks of data to a cache store.
+ * This can be a local filesystem, a network filesystem, a database or custom cloud stores like S3. See Infinispan's core documentation for a full list of available implementations, or {@link org.infinispan.loaders.CacheStore} to implement more.</p>
+ * <p><b>Non-volatile memory</b> The contents of the index can be stored in it's entirety in such a store, so that on shutdown or crash of the system data is not lost.
+ * A copy of the index will be copied to the store in sync or async depending on configuration; In case you enable
+ * Infinispan's clustering even in case of async the segments are always duplicated synchronosly to other nodes, so you can
+ * benefit from good reliability even while choosing the asynchronous mode to write the index to the slowest store implementations.</p>
+ * <p><b>Real-time change propagation</b> All changes done on a node are propagated at low latency to other nodes of the cluster; this was designed especially for
+ * interactive usage of Lucene, so that after an IndexWriter commits on one node new IndexReaders opened on any node of the cluster
+ * will be able to deliver updated search results.</p>
+ * <p><b>Distributed heap</b> Infinispan acts as a shared heap for the purpose of total memory consumption, so you can avoid hitting the slower disks even
+ * if the total size of the index can't fit in the memory of a single node: network is faster than disks, especially if the index
+ * is bigger than the memory available to cache it.</p>
+ * <p><b>Distributed locking</b>
+ * As default Lucene Directory implementations a global lock needs to protect the index from having more than an IndexWriter open; in case of a
+ * replicated or distributed index you need to enable a cluster-wide {@link org.apache.lucene.store.LockFactory}.
+ * This implementation uses by default {@link org.infinispan.lucene.locking.BaseLockFactory}; in case you want to apply changes during a JTA transaction
+ * see also {@link org.infinispan.lucene.locking.TransactionalLockFactory}.
+ * </p>
+ * <p><b>Combined store patterns</b> It's possible to combine different stores and passivation policies, so that each nodes shares the index changes
+ * quickly to other nodes, offloads less frequently used data to a per-node local filesystem, and the cluster also coordinates to keeps a safe copy on a shared store.</p>
*
- * Directory locking is assured with {@link org.infinispan.lucene.locking.TransactionalSharedLuceneLock}
- *
* @since 4.0
+ * @author Sanne Grinovero
* @author Lukasz Moren
- * @author Sanne Grinovero
+ * @see org.apache.lucene.store.Directory
+ * @see org.apache.lucene.store.LockFactory
+ * @see org.infinispan.lucene.locking.BaseLockFactory
* @see org.infinispan.lucene.locking.TransactionalLockFactory
*/
public class InfinispanDirectory extends Directory {
- // used as default chunk size if not provided in conf
- // each Lucene index segment is splitted into parts with default size defined here
+ /**
+ * used as default chunk size, can be overriden at construction time
+ * each Lucene index segment is splitted into parts with default size defined here
+ */
public final static int DEFAULT_BUFFER_SIZE = 16 * 1024;
private static final Log log = LogFactory.getLog(InfinispanDirectory.class);
@@ -67,34 +92,47 @@
// size per dir
private final int chunkSize;
- private final FileListCacheKey fileListCacheKey;
+ private final FileListOperations fileOps;
+ private final SegmentReadLocker readLocks;
- public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize) {
+ public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize, SegmentReadLocker readLocker) {
if (cache == null)
throw new IllegalArgumentException("Cache must not be null");
if (indexName == null)
throw new IllegalArgumentException("index name must not be null");
if (lf == null)
throw new IllegalArgumentException("LockFactory must not be null");
- if (chunkSize<=0)
- throw new IllegalArgumentException("chunkSize must be a non-null positive integer");
+ if (readLocker == null)
+ throw new IllegalArgumentException("SegmentReadLocker must not be null");
+ if (chunkSize <= 0)
+ throw new IllegalArgumentException("chunkSize must be a positive integer");
this.cache = cache.getAdvancedCache();
this.indexName = indexName;
this.setLockFactory(lf);
this.chunkSize = chunkSize;
- this.fileListCacheKey = new FileListCacheKey(indexName);
+ this.fileOps = new FileListOperations(this.cache, indexName);
+ this.readLocks = readLocker;
}
+
+ public InfinispanDirectory(Cache cache, String indexName, LockFactory lf, int chunkSize) {
+ this(cache, indexName, lf, chunkSize,
+ new DistributedSegmentReadLocker(cache, indexName, chunkSize));
+ }
+ public InfinispanDirectory(Cache cache, String indexName, int chunkSize, SegmentReadLocker readLocker) {
+ this(cache, indexName, makeDefaultLockFactory(cache, indexName), chunkSize, readLocker);
+ }
+
public InfinispanDirectory(Cache cache, String indexName, LockFactory lf) {
this(cache, indexName, lf, DEFAULT_BUFFER_SIZE);
}
public InfinispanDirectory(Cache cache, String indexName, int chunkSize) {
- this(cache, indexName, new BaseLockFactory(cache, indexName), chunkSize);
+ this(cache, indexName, makeDefaultLockFactory(cache, indexName), chunkSize);
}
public InfinispanDirectory(Cache cache, String indexName) {
- this(cache, indexName, new BaseLockFactory(cache, indexName), DEFAULT_BUFFER_SIZE);
+ this(cache, indexName, makeDefaultLockFactory(cache, indexName), DEFAULT_BUFFER_SIZE);
}
public InfinispanDirectory(Cache cache) {
@@ -106,7 +144,7 @@
*/
public String[] list() throws IOException {
checkIsOpen();
- Set<String> filesList = getFileList();
+ Set<String> filesList = fileOps.getFileList();
String[] array = filesList.toArray(new String[0]);
return array;
}
@@ -116,7 +154,7 @@
*/
public boolean fileExists(String name) throws IOException {
checkIsOpen();
- return cache.withFlags(Flag.SKIP_LOCKING).containsKey(new FileCacheKey(indexName, name));
+ return fileOps.getFileList().contains(name);
}
/**
@@ -124,11 +162,7 @@
*/
public long fileModified(String name) throws IOException {
checkIsOpen();
- FileMetadata file = getFileMetadata(name);
- if (file == null) {
- throw new FileNotFoundException(name);
- }
- return file.getLastModified();
+ return fileOps.getFileMetadata(name).getLastModified();
}
/**
@@ -150,13 +184,8 @@
*/
public void deleteFile(String name) throws IOException {
checkIsOpen();
- Set<String> fileList = getFileList();
- boolean deleted = fileList.remove(name);
- if (deleted) {
- cache.put(fileListCacheKey, fileList);
- }
- FileReadLockKey fileReadLockKey = new FileReadLockKey(indexName, name);
- InfinispanIndexInput.releaseReadLock(fileReadLockKey, cache);
+ fileOps.deleteFileName(name);
+ readLocks.deleteOrReleaseReadLock(name);
if (log.isDebugEnabled()) {
log.debug("Removed file: {0} from index: {1}", name, indexName);
}
@@ -184,17 +213,13 @@
// rename metadata first
boolean batching = cache.startBatch();
FileCacheKey fromKey = new FileCacheKey(indexName, from);
- FileMetadata metadata = (FileMetadata) cache.remove(fromKey);
+ FileMetadata metadata = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(fromKey);
cache.put(new FileCacheKey(indexName, to), metadata);
- Set<String> fileList = getFileList();
- fileList.remove(from);
- fileList.add(to);
- cache.put(fileListCacheKey, fileList);
+ fileOps.removeAndAdd(from, to);
if (batching) cache.endBatch(true);
// now trigger deletion of old file chunks:
- FileReadLockKey fileFromReadLockKey = new FileReadLockKey(indexName, from);
- InfinispanIndexInput.releaseReadLock(fileFromReadLockKey, cache);
+ readLocks.deleteOrReleaseReadLock(from);
if (log.isTraceEnabled()) {
log.trace("Renamed file from: {0} to: {1} in index {2}", from, to, indexName);
}
@@ -205,11 +230,7 @@
*/
public long fileLength(String name) throws IOException {
checkIsOpen();
- final FileMetadata file = getFileMetadata(name);
- if (file == null) {
- throw new FileNotFoundException(name);
- }
- return file.getSize();
+ return fileOps.getFileMetadata(name).getSize();
}
/**
@@ -217,27 +238,10 @@
*/
public IndexOutput createOutput(String name) throws IOException {
final FileCacheKey key = new FileCacheKey(indexName, name);
- FileMetadata newFileMetadata = new FileMetadata();
- FileMetadata previous = (FileMetadata) cache.putIfAbsent(key, newFileMetadata);
- if (previous == null) {
- // creating new file
- Set<String> fileList = getFileList();
- fileList.add(name);
- cache.put(fileListCacheKey, fileList);
- return new InfinispanIndexOutput(cache, key, chunkSize, newFileMetadata);
- } else {
- return new InfinispanIndexOutput(cache, key, chunkSize, previous);
- }
+ // creating new file, metadata is added on flush() or close() of IndexOutPut
+ return new InfinispanIndexOutput(cache, key, chunkSize, fileOps);
}
- @SuppressWarnings("unchecked")
- private Set<String> getFileList() {
- Set<String> fileList = (Set<String>) cache.withFlags(Flag.SKIP_LOCKING).get(fileListCacheKey);
- if (fileList == null)
- fileList = new ConcurrentHashSet<String>();
- return fileList;
- }
-
/**
* {@inheritDoc}
*/
@@ -248,10 +252,16 @@
throw new FileNotFoundException("Error loading medatada for index file: " + fileKey);
}
else if (fileMetadata.getSize() <= chunkSize) {
+ //files smaller than chunkSize don't need a readLock
return new SingleChunkIndexInput(cache, fileKey, fileMetadata);
}
else {
- return new InfinispanIndexInput(cache, fileKey, chunkSize, fileMetadata);
+ boolean locked = readLocks.aquireReadLock(name);
+ if (!locked) {
+ // safest reaction is to tell this file doesn't exist anymore.
+ throw new FileNotFoundException("Error loading medatada for index file: " + fileKey);
+ }
+ return new InfinispanIndexInput(cache, fileKey, chunkSize, fileMetadata, readLocks);
}
}
@@ -268,11 +278,6 @@
}
}
- private FileMetadata getFileMetadata(String fileName) {
- FileCacheKey key = new FileCacheKey(indexName, fileName);
- return (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).get(key);
- }
-
@Override
public String toString() {
return "InfinispanDirectory{" + "indexName='" + indexName + '\'' + '}';
@@ -287,4 +292,8 @@
return list();
}
+ private static LockFactory makeDefaultLockFactory(Cache cache, String indexName) {
+ return new BaseLockFactory(cache, indexName);
+ }
+
}
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexInput.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -27,6 +27,7 @@
import org.apache.lucene.store.IndexInput;
import org.infinispan.AdvancedCache;
import org.infinispan.context.Flag;
+import org.infinispan.lucene.readlocks.SegmentReadLocker;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -47,8 +48,9 @@
private final FileMetadata file;
private final FileCacheKey fileKey;
private final int chunkSize;
- private final FileReadLockKey readLockKey;
+ private final SegmentReadLocker readLocks;
private final boolean trace;
+ private final String filename;
private int currentBufferSize;
private byte[] buffer;
@@ -57,118 +59,19 @@
private boolean isClone;
- public InfinispanIndexInput(AdvancedCache cache, FileCacheKey fileKey, int chunkSize, FileMetadata fileMetadata) throws FileNotFoundException {
+ public InfinispanIndexInput(AdvancedCache cache, FileCacheKey fileKey, int chunkSize, FileMetadata fileMetadata, SegmentReadLocker readLocks) throws FileNotFoundException {
this.cache = cache;
this.fileKey = fileKey;
this.chunkSize = chunkSize;
this.file = fileMetadata;
- final String filename = fileKey.getFileName();
- this.readLockKey = new FileReadLockKey(fileKey.getIndexName(), filename);
- aquireReadLock();
+ this.readLocks = readLocks;
+ this.filename = fileKey.getFileName();
trace = log.isTraceEnabled();
if (trace) {
log.trace("Opened new IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
}
}
- private void releaseReadLock() {
- releaseReadLock(readLockKey, cache);
- }
-
- /**
- * Releases a read-lock for this file, so that if it was marked as deleted and
- * no other {@link InfinispanIndexInput} instances are using it, then it will
- * be effectively deleted.
- *
- * @see #aquireReadLock()
- * @see InfinispanDirectory#deleteFile(String)
- *
- * @param readLockKey the key pointing to the reference counter value
- * @param cache The cache containing the reference counter value
- */
- static void releaseReadLock(FileReadLockKey readLockKey, AdvancedCache cache) {
- int newValue = 0;
- // spinning as we currently don't mandate transactions, so no proper lock support available
- boolean done = false;
- Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
- while (done == false) {
- if (lockValue == null) {
- lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(0));
- done = (null == lockValue);
- }
- else {
- int refCount = (Integer) lockValue;
- newValue = refCount - 1;
- done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
- if (!done) {
- lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
- }
- }
- }
- if (newValue == 0) {
- realFileDelete(readLockKey, cache);
- }
- }
-
- /**
- * The {@link InfinispanDirectory#deleteFile(String)} is not deleting the elements from the cache
- * but instead flagging the file as deletable.
- * This method will really remove the elements from the cache; should be invoked only
- * by {@link #releaseReadLock(FileReadLockKey, AdvancedCache)} after having verified that there
- * are no users left in need to read these chunks.
- *
- * @param readLockKey the key representing the values to be deleted
- * @param cache the cache containing the elements to be deleted
- */
- static void realFileDelete(FileReadLockKey readLockKey, AdvancedCache cache) {
- final String indexName = readLockKey.getIndexName();
- final String filename = readLockKey.getFileName();
- int i = 0;
- Object removed;
- ChunkCacheKey chunkKey = new ChunkCacheKey(indexName, filename, i);
- do {
- removed = cache.withFlags(Flag.SKIP_LOCKING).remove(chunkKey);
- chunkKey = new ChunkCacheKey(indexName, filename, ++i);
- } while (removed != null);
- FileCacheKey key = new FileCacheKey(indexName, filename);
-// boolean batch = cache.startBatch(); //FIXME when enabling batch, org.infinispan.lucene.profiling.CacheStoreStressTest fails to remove the readLockKey
- cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).remove(readLockKey);
- cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).remove(key);
-// if (batch) cache.endBatch(true);
- }
-
- /**
- * Acquires a readlock on all chunks for this file, to make sure chunks are not deleted while
- * iterating on the group. This is needed to avoid an eager lock on all elements.
- *
- * @see #releaseReadLock(FileReadLockKey, AdvancedCache)
- */
- private void aquireReadLock() throws FileNotFoundException {
- // spinning as we currently don't mandate transactions, so no proper lock support is available
- boolean done = false;
- Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
- while (done == false) {
- if (lockValue != null) {
- int refCount = (Integer) lockValue;
- if (refCount == 0) {
- // too late: in case refCount==0 the delete process already triggered chunk deletion.
- // safest reaction is to tell this file doesn't exist anymore.
- throw new FileNotFoundException("segment file was deleted");
- }
- Integer newValue = Integer.valueOf(refCount + 1);
- done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
- if (!done) {
- lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
- }
- } else {
- // readLocks are not stored, so if there's no value assume it's ==1, which means
- // existing file, not deleted, nobody else owning a read lock
- lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(2));
- done = (null == lockValue);
- }
- }
- }
-
@Override
public byte readByte() throws IOException {
if (bufferPosition >= currentBufferSize) {
@@ -204,9 +107,9 @@
currentLoadedChunk = -1;
buffer = null;
if (isClone) return;
- releaseReadLock();
+ readLocks.deleteOrReleaseReadLock(filename);
if (trace) {
- log.trace("Closed IndexInput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
+ log.trace("Closed IndexInput for file:{0} in index: {1}", filename, fileKey.getIndexName());
}
}
@@ -230,7 +133,7 @@
}
private void setBufferToCurrentChunk() throws IOException {
- ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+ ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), filename, currentLoadedChunk);
buffer = (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
if (buffer == null) {
throw new IOException("Chunk value could not be found for key " + key);
@@ -241,7 +144,7 @@
// Lucene might try seek(pos) using an illegal pos value
// RAMDirectory teaches to position the cursor to the end of previous chunk in this case
private void setBufferToCurrentChunkIfPossible() throws IOException {
- ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), fileKey.getFileName(), currentLoadedChunk);
+ ChunkCacheKey key = new ChunkCacheKey(fileKey.getIndexName(), filename, currentLoadedChunk);
buffer = (byte[]) cache.withFlags(Flag.SKIP_LOCKING).get(key);
if (buffer == null) {
currentLoadedChunk--;
Modified: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/InfinispanIndexOutput.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -48,21 +48,24 @@
private final AdvancedCache cache;
private final FileMetadata file;
private final FileCacheKey fileKey;
+ private final FileListOperations fileOps;
private final boolean trace;
private byte[] buffer;
private int positionInBuffer = 0;
private long filePosition = 0;
private int currentChunkNumber = 0;
+ private boolean needsAddingToFileList = true;
private boolean transactionRunning = false;
- public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileMetadata fileMetadata) throws IOException {
+ public InfinispanIndexOutput(AdvancedCache cache, FileCacheKey fileKey, int bufferSize, FileListOperations fileList) throws IOException {
this.cache = cache;
this.fileKey = fileKey;
this.bufferSize = bufferSize;
+ this.fileOps = fileList;
this.buffer = new byte[this.bufferSize];
- this.file = fileMetadata;
+ this.file = new FileMetadata();
trace = log.isTraceEnabled();
if (trace) {
log.trace("Opened new IndexOutput for file:{0} in index: {1}", fileKey.getFileName(), fileKey.getIndexName());
@@ -75,7 +78,7 @@
if (readBuffer==null) {
return new byte[bufferSize];
}
- else if (readBuffer.length==bufferSize) {
+ else if (readBuffer.length == bufferSize) {
return readBuffer;
}
else {
@@ -145,9 +148,6 @@
// size changed, apply change to file header
file.touch();
resizeFileIfNeeded();
- if (file.getSize() < filePosition) {
- file.setSize(filePosition);
- }
byte[] bufferToFlush = buffer;
if (isWritingOnLastChunk()) {
int newBufferSize = (int) (file.getSize() % bufferSize);
@@ -157,16 +157,25 @@
}
}
boolean microbatch = false;
+ // we don't want to spawn a nested transaction, just to write in batch:
if ( ! transactionRunning) {
microbatch = cache.startBatch();
}
// add chunk to cache
- cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(key, bufferToFlush);
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(key, bufferToFlush);
// override existing file header with new size and last time access
- cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).put(fileKey, file);
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP, Flag.SKIP_LOCKING).put(fileKey, file);
+ registerToFileListIfNeeded();
if (microbatch) cache.endBatch(true);
}
+ private void registerToFileListIfNeeded() {
+ if (needsAddingToFileList) {
+ fileOps.addFileName(this.fileKey.getFileName());
+ needsAddingToFileList = false;
+ }
+ }
+
private void resizeFileIfNeeded() {
if (file.getSize() < filePosition) {
file.setSize(filePosition);
@@ -215,7 +224,6 @@
public long length() throws IOException {
return file.getSize();
}
-
private boolean isWritingOnLastChunk() {
int lastChunkNumber = (int) (file.getSize() / bufferSize);
Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLocker.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import org.infinispan.AdvancedCache;
+import org.infinispan.Cache;
+import org.infinispan.context.Flag;
+import org.infinispan.lucene.ChunkCacheKey;
+import org.infinispan.lucene.FileCacheKey;
+import org.infinispan.lucene.FileMetadata;
+import org.infinispan.lucene.FileReadLockKey;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.lucene.InfinispanIndexInput;
+
+/**
+ * <p>DistributedSegmentReadLocker stores reference counters in the cache
+ * to keep track of the number of clients still needing to be able
+ * to read a segment. It makes extensive usage of Infinispan's atomic
+ * operations.</p>
+ * <p>Locks stored this way are not optimally performing as it might spin
+ * on remote invocations, and might fail to cleanup some garbage
+ * in case a node is disconnected without having released the readlock.</p>
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class DistributedSegmentReadLocker implements SegmentReadLocker {
+
+ private final AdvancedCache cache;
+ private final String indexName;
+ private final int chunkSize;
+
+ public DistributedSegmentReadLocker(Cache cache, String indexName, int chunkSize) {
+ if (cache == null)
+ throw new IllegalArgumentException("Cache must not be null");
+ if (indexName == null)
+ throw new IllegalArgumentException("index name must not be null");
+ if (chunkSize <= 0)
+ throw new IllegalArgumentException("chunkSize must be a non-null positive integer");
+ this.chunkSize = chunkSize;
+ this.indexName = indexName;
+ this.cache = cache.getAdvancedCache();
+ }
+
+ /**
+ * Deletes or releases a read-lock for the specified filename, so that if it was marked as deleted and
+ * no other {@link InfinispanIndexInput} instances are reading from it, then it will
+ * be effectively deleted.
+ *
+ * For removal of readLockKey the SKIP_CACHE_STORE is not used to make sure even
+ * values eventually stored by a rehash are cleaned up.
+ *
+ * @see #aquireReadLock(String)
+ * @see InfinispanDirectory#deleteFile(String)
+ */
+ @Override
+ public void deleteOrReleaseReadLock(String filename) {
+ FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+ int newValue = 0;
+ boolean done = false;
+ Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+ while (done == false) {
+ if (lockValue == null) {
+ lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(0));
+ done = (null == lockValue);
+ }
+ else {
+ int refCount = (Integer) lockValue;
+ newValue = refCount - 1;
+ done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, refCount, newValue);
+ if (!done) {
+ lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+ }
+ }
+ }
+ if (newValue == 0) {
+ realFileDelete(readLockKey, cache, chunkSize);
+ }
+ }
+
+ /**
+ * Acquires a readlock on all chunks for this file, to make sure chunks are not deleted while
+ * iterating on the group. This is needed to avoid an eager lock on all elements.
+ *
+ * If no value is found in the cache, a disambiguation procedure is needed: not value
+ * might mean both "existing, no readlocks, no deletions in progress", but also "not existent file".
+ * The first possibility is coded as no value to avoid storing readlocks in a permanent store,
+ * which would unnecessarily slow down and provide unwanted long term storage of the lock;
+ * so the value is treated as one if not found, but obviously it's also not found for non-existent
+ * or concurrently deleted files.
+ *
+ * @param name the name of the "file" for which a readlock is requested
+ *
+ * @see #deleteOrReleaseReadLock(String)
+ */
+ public boolean aquireReadLock(String filename) {
+ FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+ Object lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+ boolean done = false;
+ while (done == false) {
+ if (lockValue != null) {
+ int refCount = (Integer) lockValue;
+ if (refCount == 0) {
+ // too late: in case refCount==0 the delete is being performed
+ return false;
+ }
+ Integer newValue = Integer.valueOf(refCount + 1);
+ done = cache.withFlags(Flag.SKIP_CACHE_STORE).replace(readLockKey, lockValue, newValue);
+ if ( ! done) {
+ lockValue = cache.withFlags(Flag.SKIP_LOCKING, Flag.SKIP_CACHE_STORE).get(readLockKey);
+ }
+ } else {
+ // readLocks are not stored, so if there's no value assume it's ==1, which means
+ // existing file, not deleted, nobody else owning a read lock. but check for ambiguity
+ lockValue = cache.withFlags(Flag.SKIP_CACHE_STORE).putIfAbsent(readLockKey, Integer.valueOf(2));
+ done = (null == lockValue);
+ if (done) {
+ // have to check now that the fileKey still exists to prevent the race condition of
+ // T1 fileKey exists - T2 delete file and remove readlock - T1 putIfAbsent(readlock, 2)
+ final FileCacheKey fileKey = new FileCacheKey(indexName, filename);
+ if (cache.get(fileKey) == null) {
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(readLockKey);
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * The {@link InfinispanDirectory#deleteFile(String)} is not deleting the elements from the cache
+ * but instead flagging the file as deletable.
+ * This method will really remove the elements from the cache; should be invoked only
+ * by {@link #releaseReadLock(FileReadLockKey, AdvancedCache)} after having verified that there
+ * are no users left in need to read these chunks.
+ *
+ * @param readLockKey the key representing the values to be deleted
+ * @param cache the cache containing the elements to be deleted
+ */
+ static void realFileDelete(FileReadLockKey readLockKey, AdvancedCache cache, int chunkSize) {
+ final String indexName = readLockKey.getIndexName();
+ final String filename = readLockKey.getFileName();
+ FileCacheKey key = new FileCacheKey(indexName, filename);
+ boolean batch = cache.startBatch();
+ FileMetadata file = (FileMetadata) cache.withFlags(Flag.SKIP_LOCKING).remove(key);
+ int numChunks = (int) (file.getSize() / chunkSize);
+ for (int i = 0; i <= numChunks; i++) {
+ ChunkCacheKey chunkKey = new ChunkCacheKey(indexName, filename, i);
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(chunkKey);
+ }
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(key);
+ //last operation, as being set as value==0 it prevents others from using it during the deletion process:
+ cache.withFlags(Flag.SKIP_REMOTE_LOOKUP).removeAsync(readLockKey);
+ if (batch) {
+ cache.endBatch(true);
+ }
+ }
+
+}
Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLocker.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,120 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.infinispan.Cache;
+import org.infinispan.lucene.InfinispanDirectory;
+
+/**
+ * LocalLockMergingSegmentReadLocker decorates the {@link DistributedSegmentReadLocker} to minimize
+ * remote operations in case several IndexReaders are opened on the same {@link InfinispanDirectory}.
+ * It keeps track of locks which where already acquired for a specific filename from another request on
+ * the same node and merges the request so that the different clients share the same remote lock.
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class LocalLockMergingSegmentReadLocker implements SegmentReadLocker {
+
+ private final ConcurrentHashMap<String, LocalReadLock> localLocks = new ConcurrentHashMap<String, LocalReadLock>();
+ private final DistributedSegmentReadLocker delegate;
+
+ /**
+ * Create a new ReadLockContainer.
+ *
+ * @param cache
+ * @param indexName
+ */
+ public LocalLockMergingSegmentReadLocker(Cache cache, String indexName, int chunkSize) {
+ this.delegate = new DistributedSegmentReadLocker(cache, indexName, chunkSize);
+ }
+
+ public boolean aquireReadLock(String name) {
+ LocalReadLock localReadLock = getLocalLockByName(name);
+ boolean aquired = localReadLock.aquire();
+ if (aquired) {
+ return true;
+ }
+ else {
+ // cleanup
+ localLocks.remove(name);
+ return false;
+ }
+ }
+
+ private LocalReadLock getLocalLockByName(String name) {
+ LocalReadLock localReadLock = localLocks.get(name);
+ if (localReadLock == null) {
+ LocalReadLock newReadLock = new LocalReadLock(name);
+ LocalReadLock prevReadLock = localLocks.putIfAbsent(name, newReadLock);
+ localReadLock = prevReadLock == null ? newReadLock : prevReadLock;
+ }
+ return localReadLock;
+ }
+
+ public void deleteOrReleaseReadLock(String name) {
+ getLocalLockByName(name).release();
+ }
+
+ private class LocalReadLock {
+ private final String name;
+ private int value = 0;
+
+ LocalReadLock(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return true if the lock was acquired, false if it's too late: the file
+ * was deleted and this LocalReadLock should be removed too.
+ */
+ synchronized boolean aquire() {
+ if (value == 0) {
+ boolean haveIt = delegate.aquireReadLock(name);
+ if (haveIt) {
+ value = 1;
+ return true;
+ } else {
+ value = -1;
+ return false;
+ }
+ } else if (value == -1) {
+ // it was deleted just a two lines ago
+ return false;
+ } else {
+ value++;
+ return true;
+ }
+ }
+
+ synchronized void release() {
+ value--;
+ if (value <= 0) {
+ localLocks.remove(name);
+ delegate.deleteOrReleaseReadLock(name);
+ }
+ }
+ }
+
+}
Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/NoopSegmentReadLocker.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,50 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+/**
+ * NoopSegmentReadLocker ignores requests to apply a readlock, but also ignores requests to delete files.
+ * It might be a good choice for read-only indexes, or cases in which leaving unused segments in the index is
+ * not considered a problem.
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public class NoopSegmentReadLocker implements SegmentReadLocker {
+
+ /**
+ * doesn't do anything and returns true
+ */
+ @Override
+ public boolean aquireReadLock(String filename) {
+ return true;
+ }
+
+ /**
+ * doesn't do anything
+ */
+ @Override
+ public void deleteOrReleaseReadLock(String filename) {
+ return;
+ }
+
+}
Added: branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java
===================================================================
--- branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/main/java/org/infinispan/lucene/readlocks/SegmentReadLocker.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,62 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import org.infinispan.lucene.InfinispanDirectory;
+
+/**
+ * <p>SegmentReadLocker implementations have to make sure that segments are not deleted while they are
+ * being used by an IndexReader.</p>
+ * <p>When an {@link org.infinispan.lucene.InfinispanIndexInput} is opened on a file which is split in smaller chunks,
+ * {@link #aquireReadLock(String)} is invoked; then the {@link #deleteOrReleaseReadLock(String)} is
+ * invoked when the stream is closed.</p>
+ * <p>The same {@link #deleteOrReleaseReadLock(String)} is invoked when a file is deleted, so if this invocation is not balancing
+ * a lock acquire this implementation must delete all segment chunks and the associated metadata.</p>
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+public interface SegmentReadLocker {
+
+ /**
+ * It will release a previously acquired readLock, or
+ * if no readLock was acquired it will mark the file to be deleted as soon
+ * as all pending locks are releases.
+ * If it's invoked on a file without pending locks the file is deleted.
+ *
+ * @param fileName of the file to release or delete
+ * @see InfinispanDirectory#deleteFile(String)
+ */
+ void deleteOrReleaseReadLock(String fileName);
+
+ /**
+ * Acquires a readlock, in order to prevent other invocations to {@link #deleteOrReleaseReadLock(String)}
+ * from deleting the file.
+ *
+ * @param filename
+ * @return true if the lock was acquired, false if the implementation
+ * detects the file does not exist, or that it's being deleted by some other thread.
+ * @see InfinispanDirectory#openInput(String)
+ */
+ boolean aquireReadLock(String filename);
+
+}
Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/DirectoryIntegrityCheck.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -51,6 +51,10 @@
* The name of the unique index stored in the cache
*/
public static void verifyDirectoryStructure(Cache cache, String indexName) {
+ verifyDirectoryStructure(cache, indexName, false);
+ }
+
+ public static void verifyDirectoryStructure(Cache cache, String indexName, boolean wasAStressTest) {
Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
Assert.assertNotNull(fileList);
int fileListCacheKeyInstances = 0;
@@ -59,7 +63,10 @@
ChunkCacheKey existingChunkKey = (ChunkCacheKey) key;
String filename = existingChunkKey.getFileName();
Assert.assertEquals(existingChunkKey.getIndexName(), indexName);
- Assert.assertTrue(fileList.contains(filename));
+ // the chunk must either match an entry in fileList or have a pending readLock:
+// if (fileList.contains(filename) == false) {
+// verifyReadlockExists(cache, indexName, filename);
+// }
Object value = cache.get(existingChunkKey);
Assert.assertNotNull(value);
Assert.assertTrue(value instanceof byte[]);
@@ -68,7 +75,12 @@
} else if (key instanceof FileCacheKey) {
FileCacheKey fileCacheKey = (FileCacheKey) key;
Assert.assertEquals(fileCacheKey.getIndexName(), indexName);
- Assert.assertTrue(fileList.contains(fileCacheKey.getFileName()), fileCacheKey + " should not have existed");
+ String filename = fileCacheKey.getFileName();
+// if (fileList.contains(filename) == false) {
+// // if the file is not registered, assert that a readlock prevented it from being
+// // deleted:
+// verifyReadlockExists(cache, indexName, filename);
+// }
Object value = cache.get(fileCacheKey);
Assert.assertNotNull(value);
Assert.assertTrue(value instanceof FileMetadata);
@@ -76,24 +88,36 @@
long totalFileSize = metadata.getSize();
long actualFileSize = deepCountFileSize(fileCacheKey, cache);
Assert.assertEquals(actualFileSize, totalFileSize);
+ Assert.assertTrue(fileList.contains(fileCacheKey.getFileName()), fileCacheKey + " should not have existed");
} else if (key instanceof FileListCacheKey) {
fileListCacheKeyInstances++;
Assert.assertEquals(1, fileListCacheKeyInstances);
} else if (key instanceof FileReadLockKey) {
FileReadLockKey readLockKey = (FileReadLockKey) key;
Assert.assertEquals(readLockKey.getIndexName(), indexName);
- Assert.assertTrue(fileList.contains(readLockKey.getFileName()), readLockKey + " should not have existed");
Object value = cache.get(readLockKey);
- Assert.assertNotNull(value);
- Assert.assertTrue(value instanceof Integer);
- int readLockCount = (Integer) value;
- Assert.assertEquals(readLockCount, 1, " for FileReadLockKey " + readLockKey);
+ // we verify that a ReadLock exists only for existing files
+ Assert.assertTrue(cache.get(new FileCacheKey(indexName, readLockKey.getFileName())) != null, key + " left over from deleted "
+ + readLockKey.getFileName());
+ Assert.assertTrue(cache.get(new ChunkCacheKey(indexName, readLockKey.getFileName(), 0)) != null);
+ Assert.assertTrue(fileList.contains(readLockKey.getFileName()), "readlock still exists but the file was deleted: "
+ + readLockKey);
+ Assert.assertTrue(value == null || value.equals(1));
} else {
Assert.fail("an unexpected key was found in the cache having key type " + key.getClass() + " toString:" + key);
}
}
}
+ private static void verifyReadlockExists(Cache cache, String indexName, String filename) {
+ FileReadLockKey readLockKey = new FileReadLockKey(indexName, filename);
+ Object readLockValue = cache.get(readLockKey);
+ Assert.assertNotNull(readLockValue);
+ Assert.assertTrue(readLockValue instanceof Integer);
+ int v = (Integer) readLockValue;
+ Assert.assertTrue(v > 1, "readlock exists for unregistered file of unexpected value: " + v + " for file: " + filename);
+ }
+
/**
* For a given FileCacheKey return the total size of all chunks related to the file.
*
@@ -117,5 +141,44 @@
}
}
}
+
+ public static void assertFileNotExists(Cache cache, String indexName, String fileName) {
+ Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
+ Assert.assertNotNull(fileList);
+ Assert.assertFalse(fileList.contains(fileName));
+ Assert.assertNull(cache.get(new FileCacheKey(indexName, fileName)), "metadata found for deleted file");
+ for (int i = 0; i < 10; i++) {
+ ChunkCacheKey key = new ChunkCacheKey(indexName, fileName, i);
+ Assert.assertNull(cache.get(key), "a chunk was found for a deleted file: " + key);
+ }
+ }
+ /**
+ * Verified the file exists and has a specified value for readLock;
+ * Consider that null should be interpreted as value 1;
+ */
+ public static void assertFileExistsHavingRLCount(Cache cache, String fileName, String indexName, int expectedReadcount, int chunkSize, boolean expectRegisteredInFat) {
+ Set<String> fileList = (Set<String>) cache.get(new FileListCacheKey(indexName));
+ Assert.assertNotNull(fileList);
+ Assert.assertTrue(fileList.contains(fileName) == expectRegisteredInFat);
+ FileMetadata metadata = (FileMetadata) cache.get(new FileCacheKey(indexName, fileName));
+ Assert.assertNotNull(metadata);
+ long totalFileSize = metadata.getSize();
+ int chunkNumbers = (int)(totalFileSize / chunkSize);
+ for (int i = 0; i < chunkNumbers; i++) {
+ Assert.assertNotNull(cache.get(new ChunkCacheKey(indexName, fileName, i)));
+ }
+ FileReadLockKey readLockKey = new FileReadLockKey(indexName,fileName);
+ Object value = cache.get(readLockKey);
+ if (expectedReadcount == 1) {
+ Assert.assertTrue(value == null || Integer.valueOf(1).equals(value), "readlock value is " + value);
+ }
+ else {
+ Assert.assertNotNull(value);
+ Assert.assertTrue(value instanceof Integer);
+ int v = (Integer)value;
+ Assert.assertEquals(v, expectedReadcount);
+ }
+ }
+
}
Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/CacheStoreStressTest.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -74,7 +74,7 @@
assert cache!=null;
InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
PerformanceCompareStressTest.stressTestDirectory(dir, "InfinispanClusteredWith-Store");
- DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName, true);
}
}
Modified: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java 2010-08-14 21:40:07 UTC (rev 2219)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/profiling/PerformanceCompareStressTest.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -97,7 +97,7 @@
// TestingUtil.setDelayForCache(cache, 0, 0);
InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
stressTestDirectory(dir, "InfinispanClustered-delayedIO:0");
- DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ verifyDirectoryState();
}
@Test
@@ -105,7 +105,7 @@
TestingUtil.setDelayForCache(cache, 4, 4);
InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
stressTestDirectory(dir, "InfinispanClustered-delayedIO:4");
- DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ verifyDirectoryState();
}
@Test
@@ -113,17 +113,17 @@
TestingUtil.setDelayForCache(cache, 40, 40);
InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
stressTestDirectory(dir, "InfinispanClustered-delayedIO:40");
- DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ verifyDirectoryState();
}
@Test
public void profileInfinispanLocalDirectory() throws InterruptedException, IOException {
CacheContainer cacheContainer = CacheTestSupport.createLocalCacheManager();
try {
- Cache cache = cacheContainer.getCache();
+ cache = cacheContainer.getCache();
InfinispanDirectory dir = new InfinispanDirectory(cache, indexName);
stressTestDirectory(dir, "InfinispanLocal");
- DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName);
+ verifyDirectoryState();
} finally {
cacheContainer.stop();
}
@@ -166,5 +166,9 @@
TestingUtil.killCacheManagers(cacheFactory);
TestingUtil.recursiveFileRemove(indexName);
}
+
+ private void verifyDirectoryState() {
+ DirectoryIntegrityCheck.verifyDirectoryStructure(cache, indexName, true);
+ }
}
Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/DistributedSegmentReadLockerTest.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,123 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.infinispan.Cache;
+import org.infinispan.config.Configuration;
+import org.infinispan.lucene.CacheTestSupport;
+import org.infinispan.lucene.DirectoryIntegrityCheck;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.infinispan.test.MultipleCacheManagersTest;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * DistributedSegmentReadLockerTest represents a quick check on the functionality
+ * of {@link org.infinispan.lucene.readlocks.DistributedSegmentReadLocker}
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "lucene.readlocks.DistributedSegmentReadLockerTest")
+public class DistributedSegmentReadLockerTest extends MultipleCacheManagersTest {
+
+ /** The Index name */
+ protected static final String INDEX_NAME = "indexName";
+ /** The cache name */
+ protected static final String CACHE_NAME = "lucene";
+ /** Chunk Size **/
+ protected static final int CHUNK_SIZE = 6;
+ /** The name of the test file **/
+ protected static final String filename = "readme.txt";
+
+ protected Cache cache0;
+ protected Cache cache1;
+ protected Directory dirA;
+ protected Directory dirB;
+
+ @Override
+ protected void createCacheManagers() throws Throwable {
+ Configuration configuration = CacheTestSupport.createTestConfiguration();
+ createClusteredCaches(2, CACHE_NAME, configuration);
+ }
+
+ @BeforeMethod
+ protected void prepare() throws IOException {
+ cache0 = cache(0, CACHE_NAME);
+ cache1 = cache(1, CACHE_NAME);
+ dirA = createDirectory(cache0);
+ dirB = createDirectory(cache1);
+ CacheTestSupport.initializeDirectory(dirA);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testIndexWritingAndFinding() throws IOException {
+ verifyBoth(cache0,cache1);
+ IndexOutput indexOutput = dirA.createOutput(filename);
+ indexOutput.writeString("no need to write, nobody ever will read this");
+ indexOutput.flush();
+ indexOutput.close();
+ assertFileExistsHavingRLCount(filename, 1, true);
+ IndexInput openInput = dirB.openInput(filename);
+ assertFileExistsHavingRLCount(filename, 2, true);
+ dirA.deleteFile(filename);
+ assertFileExistsHavingRLCount(filename, 1, false);
+ //Lucene does use clone() - lock implementation ignores it as a clone is
+ //cast on locked segments and released before the close on the parent object
+ IndexInput clone = (IndexInput) openInput.clone();
+ assertFileExistsHavingRLCount(filename, 1, false);
+ clone.close();
+ assertFileExistsHavingRLCount(filename, 1, false);
+ openInput.close();
+ assertFileNotExists(filename);
+ dirA.close();
+ dirB.close();
+ verifyBoth(cache0, cache1);
+ }
+
+ void assertFileNotExists(String fileName) {
+ DirectoryIntegrityCheck.assertFileNotExists(cache0, INDEX_NAME, fileName);
+ DirectoryIntegrityCheck.assertFileNotExists(cache1, INDEX_NAME, fileName);
+ }
+
+ void assertFileExistsHavingRLCount(String fileName, int expectedReadcount, boolean expectRegisteredInFat) {
+ DirectoryIntegrityCheck.assertFileExistsHavingRLCount(cache0, fileName, INDEX_NAME, expectedReadcount, CHUNK_SIZE, expectRegisteredInFat);
+ DirectoryIntegrityCheck.assertFileExistsHavingRLCount(cache1, fileName, INDEX_NAME, expectedReadcount, CHUNK_SIZE, expectRegisteredInFat);
+ }
+
+ Directory createDirectory(Cache cache) {
+ return new InfinispanDirectory(cache, INDEX_NAME, CHUNK_SIZE,
+ new DistributedSegmentReadLocker(cache, INDEX_NAME, CHUNK_SIZE));
+ }
+
+ void verifyBoth(Cache cache0, Cache cache1) {
+ DirectoryIntegrityCheck.verifyDirectoryStructure(cache0, INDEX_NAME);
+ DirectoryIntegrityCheck.verifyDirectoryStructure(cache1, INDEX_NAME);
+ }
+
+}
Added: branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java
===================================================================
--- branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java (rev 0)
+++ branches/4.1.x/lucene-directory/src/test/java/org/infinispan/lucene/readlocks/LocalLockMergingSegmentReadLockerTest.java 2010-08-14 21:43:48 UTC (rev 2220)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.infinispan.lucene.readlocks;
+
+import java.io.IOException;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.infinispan.Cache;
+import org.infinispan.lucene.InfinispanDirectory;
+import org.testng.annotations.Test;
+
+/**
+ * LocalLockMergingSegmentReadLockerTest represents a quick check on the functionality
+ * of {@link org.infinispan.lucene.readlocks.LocalLockMergingSegmentReadLocker}
+ *
+ * @author Sanne Grinovero
+ * @since 4.1
+ */
+ at Test(groups = "functional", testName = "lucene.readlocks.LocalLockMergingSegmentReadLockerTest")
+public class LocalLockMergingSegmentReadLockerTest extends DistributedSegmentReadLockerTest {
+
+ @Test @Override
+ public void testIndexWritingAndFinding() throws IOException {
+ verifyBoth(cache0,cache1);
+ IndexOutput indexOutput = dirA.createOutput(filename);
+ indexOutput.writeString("no need to write, nobody ever will read this");
+ indexOutput.flush();
+ indexOutput.close();
+ assertFileExistsHavingRLCount(filename, 1, true);
+ IndexInput firstOpenOnB = dirB.openInput(filename);
+ assertFileExistsHavingRLCount(filename, 2, true);
+ dirA.deleteFile(filename);
+ assertFileExistsHavingRLCount(filename, 1, false);
+ //Lucene does use clone() - lock implementation ignores it as a clone is
+ //cast on locked segments and released before the close on the parent object
+ IndexInput cloneOfFirstOpenOnB = (IndexInput) firstOpenOnB.clone();
+ assertFileExistsHavingRLCount(filename, 1, false);
+ cloneOfFirstOpenOnB.close();
+ assertFileExistsHavingRLCount(filename, 1, false);
+ IndexInput firstOpenOnA = dirA.openInput(filename);
+ assertFileExistsHavingRLCount(filename, 2, false);
+ IndexInput secondOpenOnA = dirA.openInput(filename);
+ assertFileExistsHavingRLCount(filename, 2, false);
+ firstOpenOnA.close();
+ assertFileExistsHavingRLCount(filename, 2, false);
+ secondOpenOnA.close();
+ assertFileExistsHavingRLCount(filename, 1, false);
+ firstOpenOnB.close();
+ assertFileNotExists(filename);
+ dirA.close();
+ dirB.close();
+ verifyBoth(cache0, cache1);
+ }
+
+ @Override
+ Directory createDirectory(Cache cache) {
+ return new InfinispanDirectory(cache, INDEX_NAME, CHUNK_SIZE,
+ new LocalLockMergingSegmentReadLocker(cache, INDEX_NAME, CHUNK_SIZE));
+ }
+
+}
More information about the infinispan-commits
mailing list