Author: hardy.ferentschik
Date: 2008-11-05 15:14:41 -0500 (Wed, 05 Nov 2008)
New Revision: 15523
Modified:
search/trunk/src/java/org/hibernate/search/reader/SharingBufferReaderProvider.java
search/trunk/src/test/org/hibernate/search/test/reader/functionality/SharingBufferIndexProviderTest.java
search/trunk/src/test/org/hibernate/search/test/reader/functionality/TestableSharingBufferReaderProvider.java
Log:
HSEARCH-250
* Refactored SharingBufferReaderProvider to cache readers per directory rather than per directory
provider, since some providers have more than one directory (e.g. FSSlaveDirectoryProvider).
As a consequence of the refactoring, not all directories/readers can be initialized in the
initialize() method anymore. Some readers can only be initialized once the provider switches to a
new directory.
* Updated tests
Modified:
search/trunk/src/java/org/hibernate/search/reader/SharingBufferReaderProvider.java
===================================================================
---
search/trunk/src/java/org/hibernate/search/reader/SharingBufferReaderProvider.java 2008-11-05
20:06:43 UTC (rev 15522)
+++
search/trunk/src/java/org/hibernate/search/reader/SharingBufferReaderProvider.java 2008-11-05
20:14:41 UTC (rev 15523)
@@ -2,8 +2,6 @@
package org.hibernate.search.reader;
import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -14,6 +12,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.store.Directory;
import org.slf4j.Logger;
import org.hibernate.annotations.common.AssertionFailure;
@@ -23,33 +22,35 @@
import org.hibernate.search.util.LoggerFactory;
/**
- * This ReaderProvider shares IndexReaders as long as they are "current";
+ * This <code>ReaderProvider</code> shares IndexReaders as long as they are
"current";
* main difference with SharedReaderProvider is the way to update the Readers when
needed:
* this uses IndexReader.reopen() which should improve performance on larger indexes
* as it shares buffers with previous IndexReader generation for the segments which
didn't change.
- *
+ *
* @author Sanne Grinovero
*/
public class SharingBufferReaderProvider implements ReaderProvider {
- private static final Logger log = LoggerFactory.make();
+ private static final Logger log = LoggerFactory.make();
/**
- * contains all Readers (most current per DP and all unclosed old)
+ * contains all Readers (most current per DP and all unclosed old)
*/
//TODO ConcurrentHashMap's constructor could benefit from some hints as arguments.
- protected final Map<IndexReader,ReaderUsagePair> allReaders = new
ConcurrentHashMap<IndexReader,ReaderUsagePair>();
-
+ protected final Map<IndexReader, ReaderUsagePair> allReaders = new
ConcurrentHashMap<IndexReader, ReaderUsagePair>();
+
/**
* contains last updated Reader; protected by lockOnOpenLatest (in the values)
*/
- protected Map<DirectoryProvider,PerDirectoryLatestReader> currentReaders;
+ protected final Map<Directory, PerDirectoryLatestReader> currentReaders = new
ConcurrentHashMap<Directory, PerDirectoryLatestReader>();
public void closeReader(IndexReader multiReader) {
- if ( multiReader == null ) return;
+ if ( multiReader == null ) {
+ return;
+ }
IndexReader[] readers;
if ( multiReader instanceof MultiReader ) {
- readers = ReaderProviderHelper.getSubReadersFromMultiReader( (MultiReader) multiReader
);
+ readers = ReaderProviderHelper.getSubReadersFromMultiReader( ( MultiReader )
multiReader );
}
else {
throw new AssertionFailure( "Everything should be wrapped in a MultiReader"
);
@@ -57,33 +58,55 @@
log.debug( "Closing MultiReader: {}", multiReader );
for ( IndexReader reader : readers ) {
ReaderUsagePair container = allReaders.get( reader );
- container.close();//virtual
+ container.close(); //virtual
}
log.trace( "IndexReader closed." );
}
public void initialize(Properties props, SearchFactoryImplementor
searchFactoryImplementor) {
- Map<DirectoryProvider,PerDirectoryLatestReader> map = new
HashMap<DirectoryProvider,PerDirectoryLatestReader>();
Set<DirectoryProvider<?>> providers =
searchFactoryImplementor.getDirectoryProviders();
+
+ // create the readers for the known providers. Unfortunately, it is not possible to
+ // create all readers in initalize since some providers have more than one directory
(eg
+ // FSSlaveDirectoryProvider). See also HSEARCH-250.
for ( DirectoryProvider provider : providers ) {
- try {
- map.put( provider, new PerDirectoryLatestReader( provider ) );
- } catch (IOException e) {
- throw new SearchException( "Unable to open Lucene IndexReader", e );
- }
+ createReader( provider.getDirectory() );
}
- //FIXME I'm not convinced this non-final fields are safe without locks, but I may
be wrong.
- currentReaders = Collections.unmodifiableMap( map );
}
+ /**
+ * Thread safe creation of <code>PerDirectoryLatestReader</code>.
+ *
+ * @param directory The Lucene directory for which to create the reader.
+ * @return either the cached instance for the specified
<code>Directory</code> or a newly created one.
+ * @see <a
href="http://opensource.atlassian.com/projects/hibernate/browse/HSEA...
+ */
+ private synchronized PerDirectoryLatestReader createReader(Directory directory) {
+ PerDirectoryLatestReader reader = currentReaders.get( directory );
+ if ( reader != null ) {
+ return reader;
+ }
+
+ try {
+ reader = new PerDirectoryLatestReader( directory );
+ currentReaders.put( directory, reader );
+ return reader;
+ }
+ catch ( IOException e ) {
+ throw new SearchException( "Unable to open Lucene IndexReader", e );
+ }
+ }
+
public void destroy() {
IndexReader[] readers = allReaders.keySet().toArray( new IndexReader[allReaders.size()]
);
- for (IndexReader reader : readers) {
- ReaderUsagePair usage = allReaders.get( reader );
+ for ( IndexReader reader : readers ) {
+ ReaderUsagePair usage = allReaders.get( reader );
usage.close();
}
- if ( allReaders.size() != 0 ) log.warn( "ReaderProvider contains readers not
properly closed at destroy time" );
+ if ( allReaders.size() != 0 ) {
+ log.warn( "ReaderProvider contains readers not properly closed at destroy
time" );
+ }
}
public IndexReader openReader(DirectoryProvider... directoryProviders) {
@@ -95,7 +118,10 @@
if ( log.isTraceEnabled() ) {
log.trace( "Opening IndexReader from {}", directoryProvider.getDirectory()
);
}
- PerDirectoryLatestReader directoryLatestReader = currentReaders.get( directoryProvider
);
+ PerDirectoryLatestReader directoryLatestReader = currentReaders.get(
directoryProvider.getDirectory() );
+ if ( directoryLatestReader == null ) { // might eg happen for
FSSlaveDirectoryProvider
+ directoryLatestReader = createReader( directoryProvider.getDirectory() );
+ }
readers[index] = directoryLatestReader.refreshAndGet();
}
// don't use ReaderProviderHelper.buildMultiReader as we need our own cleanup.
@@ -106,7 +132,7 @@
try {
return new CacheableMultiReader( readers );
}
- catch (Exception e) {
+ catch ( Exception e ) {
//Lucene 2.2 used to throw IOExceptions here
for ( IndexReader ir : readers ) {
ReaderUsagePair readerUsagePair = allReaders.get( ir );
@@ -116,17 +142,17 @@
}
}
}
-
+
//overridable method for testability:
- protected IndexReader readerFactory(DirectoryProvider provider) throws IOException {
- return IndexReader.open( provider.getDirectory() );
+ protected IndexReader readerFactory(Directory directory) throws IOException {
+ return IndexReader.open( directory );
}
/**
* Container for the couple IndexReader,UsageCounter.
*/
protected final class ReaderUsagePair {
-
+
public final IndexReader reader;
/**
* When reaching 0 (always test on change) the reader should be really
@@ -136,60 +162,64 @@
* additionally when creating it will be used (+1)
*/
protected final AtomicInteger usageCounter = new AtomicInteger( 2 );
-
+
ReaderUsagePair(IndexReader r) {
reader = r;
}
-
+
/**
- * closes the IndexReader if no other resource is using it;
- * in this case the reference to this container will also be removed.
+ * Closes the <code>IndexReader</code> if no other resource is using it
+ * in which case the reference to this container will also be removed.
*/
public void close() {
int refCount = usageCounter.decrementAndGet();
- if ( refCount==0 ) {
+ if ( refCount == 0 ) {
//TODO I've been experimenting with the idea of an async-close: didn't appear
to have an interesting benefit,
//so discarded the code. should try with bigger indexes to see if the effect gets
more impressive.
ReaderUsagePair removed = allReaders.remove( reader );//remove ourself
try {
reader.close();
- } catch (IOException e) {
+ }
+ catch ( IOException e ) {
log.warn( "Unable to close Lucene IndexReader", e );
}
assert removed != null;
}
- else if ( refCount<0 ) {
+ else if ( refCount < 0 ) {
//doesn't happen with current code, could help spotting future bugs?
- throw new AssertionFailure( "Closing an IndexReader for which you didn't own
a lock-token, or somebody else which didn't own closed already." );
+ throw new AssertionFailure(
+ "Closing an IndexReader for which you didn't own a lock-token, or somebody
else which didn't own closed already."
+ );
}
}
-
- public String toString(){
+
+ public String toString() {
return "Reader:" + this.hashCode() + " ref.count=" +
usageCounter.get();
}
-
+
}
-
+
/**
* An instance for each DirectoryProvider,
* establishing the association between "current" ReaderUsagePair
* for a DirectoryProvider and it's lock.
*/
protected final class PerDirectoryLatestReader {
-
+
/**
* Reference to the most current IndexReader for a DirectoryProvider;
* guarded by lockOnReplaceCurrent;
*/
public ReaderUsagePair current; //guarded by lockOnReplaceCurrent
private final Lock lockOnReplaceCurrent = new ReentrantLock();
-
+
/**
- * @param provider The DirectoryProvider for which we manage the IndexReader.
+ * @param directory The <code>Directory</code> for which we manage the
<code>IndexReader</code>.
+ *
* @throws IOException when the index initialization fails.
*/
- public PerDirectoryLatestReader(DirectoryProvider provider) throws IOException {
- IndexReader reader = readerFactory( provider );
+ public PerDirectoryLatestReader(Directory directory) throws IOException {
+ IndexReader reader = readerFactory( directory );
ReaderUsagePair initialPair = new ReaderUsagePair( reader );
initialPair.usageCounter.set( 1 );//a token to mark as active (preventing real
close).
lockOnReplaceCurrent.lock();//no harm, just ensuring safe publishing.
@@ -201,6 +231,7 @@
/**
* Gets an updated IndexReader for the current DirectoryProvider;
* the index status will be checked.
+ *
* @return the current IndexReader if it's in sync with underlying index, a new one
otherwise.
*/
public IndexReader refreshAndGet() {
@@ -211,7 +242,8 @@
IndexReader beforeUpdateReader = current.reader;
try {
updatedReader = beforeUpdateReader.reopen();
- } catch (IOException e) {
+ }
+ catch ( IOException e ) {
throw new SearchException( "Unable to reopen IndexReader", e );
}
if ( beforeUpdateReader == updatedReader ) {
@@ -226,7 +258,8 @@
current = newPair;
allReaders.put( updatedReader, newPair );//unfortunately still needs lock
}
- } finally {
+ }
+ finally {
lockOnReplaceCurrent.unlock();
}
// doesn't need lock:
@@ -235,7 +268,5 @@
}
return updatedReader;
}
-
}
-
}
Modified:
search/trunk/src/test/org/hibernate/search/test/reader/functionality/SharingBufferIndexProviderTest.java
===================================================================
---
search/trunk/src/test/org/hibernate/search/test/reader/functionality/SharingBufferIndexProviderTest.java 2008-11-05
20:06:43 UTC (rev 15522)
+++
search/trunk/src/test/org/hibernate/search/test/reader/functionality/SharingBufferIndexProviderTest.java 2008-11-05
20:14:41 UTC (rev 15523)
@@ -62,10 +62,10 @@
}
}
- private DirectoryProvider[] getRandomEvailableDPs() {
+ private DirectoryProvider[] getRandomAvailableDPs() {
int arraySize = random.nextInt( readerProvider.manipulators.size() - 1 ) + 1;
DirectoryProvider[] array = new DirectoryProvider[arraySize];
- List<DirectoryProvider> availableDPs = new ArrayList<DirectoryProvider>(
readerProvider.manipulators.keySet() );
+ List<DirectoryProvider> availableDPs = new ArrayList<DirectoryProvider>(
readerProvider.directoryProviders );
for (int i=0; i<arraySize; i++){
int chosenDpIndex = random.nextInt( availableDPs.size() );
array[i] = availableDPs.get( chosenDpIndex );
@@ -82,7 +82,7 @@
//manage termination:
return;
}
- IndexReader fakeOpenReader = readerProvider.openReader( getRandomEvailableDPs() );
+ IndexReader fakeOpenReader = readerProvider.openReader( getRandomAvailableDPs() );
Thread.yield();
readerProvider.closeReader( fakeOpenReader );
countDoneSearches.incrementAndGet();
@@ -93,9 +93,9 @@
public void run() {
super.run();
Thread.yield();
- DirectoryProvider[] randomEvailableDPs = getRandomEvailableDPs();
+ DirectoryProvider[] randomEvailableDPs = getRandomAvailableDPs();
for ( DirectoryProvider dp : randomEvailableDPs ) {
- TestManipulatorPerDP testManipulatorPerDP = readerProvider.manipulators.get( dp );
+ TestManipulatorPerDP testManipulatorPerDP = readerProvider.manipulators.get(
dp.getDirectory() );
testManipulatorPerDP.setIndexChanged();
}
countDoneIndexmods.incrementAndGet();
Modified:
search/trunk/src/test/org/hibernate/search/test/reader/functionality/TestableSharingBufferReaderProvider.java
===================================================================
---
search/trunk/src/test/org/hibernate/search/test/reader/functionality/TestableSharingBufferReaderProvider.java 2008-11-05
20:06:43 UTC (rev 15522)
+++
search/trunk/src/test/org/hibernate/search/test/reader/functionality/TestableSharingBufferReaderProvider.java 2008-11-05
20:14:41 UTC (rev 15523)
@@ -3,12 +3,12 @@
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,6 +22,8 @@
import org.apache.lucene.index.TermFreqVector;
import org.apache.lucene.index.TermPositions;
import org.apache.lucene.index.TermVectorMapper;
+import org.apache.lucene.store.Directory;
+
import org.hibernate.search.SearchException;
import org.hibernate.search.engine.SearchFactoryImplementor;
import org.hibernate.search.reader.ReaderProviderHelper;
@@ -33,127 +35,133 @@
* @author Sanne Grinovero
*/
public class TestableSharingBufferReaderProvider extends SharingBufferReaderProvider {
-
+
private static final int NUM_DIRECTORY_PROVIDERS = 4;
private final Vector<MockIndexReader> createdReadersHistory = new
Vector<MockIndexReader>( 500 );
- final Map<DirectoryProvider,TestManipulatorPerDP> manipulators = new
ConcurrentHashMap<DirectoryProvider,TestManipulatorPerDP>();
+ final Map<Directory, TestManipulatorPerDP> manipulators = new
ConcurrentHashMap<Directory, TestManipulatorPerDP>();
+ final List<DirectoryProvider> directoryProviders =
Collections.synchronizedList(new ArrayList<DirectoryProvider>());
public TestableSharingBufferReaderProvider() {
- for (int i=0; i<NUM_DIRECTORY_PROVIDERS; i++) {
+ for ( int i = 0; i < NUM_DIRECTORY_PROVIDERS; i++ ) {
TestManipulatorPerDP tm = new TestManipulatorPerDP( i );
- manipulators.put( tm.dp, tm );
+ manipulators.put( tm.dp.getDirectory(), tm );
+ directoryProviders.add( tm.dp );
}
}
-
+
public static class TestManipulatorPerDP {
private final AtomicBoolean isIndexReaderCurrent = new AtomicBoolean( false );//starts
at true, see MockIndexReader contructor
private final AtomicBoolean isReaderCreated = new AtomicBoolean( false );
private final DirectoryProvider dp = new RAMDirectoryProvider();
-
- public TestManipulatorPerDP( int seed ) {
+
+ public TestManipulatorPerDP(int seed) {
dp.initialize( "dp" + seed, null, null );
dp.start();
}
-
+
public void setIndexChanged() {
isIndexReaderCurrent.set( false );
}
-
+
}
-
+
public boolean isReaderCurrent(MockIndexReader reader) {
//avoid usage of allReaders or test would be useless
- for (PerDirectoryLatestReader latest : super.currentReaders.values() ) {
+ for ( PerDirectoryLatestReader latest : currentReaders.values() ) {
IndexReader latestReader = latest.current.reader;
- if ( latestReader == reader) {
+ if ( latestReader == reader ) {
return true;
}
}
return false;
}
-
+
@Override
- protected IndexReader readerFactory(DirectoryProvider provider) {
- TestManipulatorPerDP manipulatorPerDP = manipulators.get( provider );
- if ( ! manipulatorPerDP.isReaderCreated.compareAndSet( false, true ) ) {
+ protected IndexReader readerFactory(Directory directory) {
+ TestManipulatorPerDP manipulatorPerDP = manipulators.get( directory );
+ if ( !manipulatorPerDP.isReaderCreated.compareAndSet( false, true ) ) {
throw new IllegalStateException( "IndexReader1 created twice" );
}
else {
return new MockIndexReader( manipulatorPerDP.isIndexReaderCurrent );
}
}
-
+
@Override
public void initialize(Properties props, SearchFactoryImplementor
searchFactoryImplementor) {
- Map<DirectoryProvider,PerDirectoryLatestReader> map = new
HashMap<DirectoryProvider,PerDirectoryLatestReader>();
try {
- for ( DirectoryProvider dp : manipulators.keySet() ) {
- map.put( dp, new PerDirectoryLatestReader( dp ) );
+ for ( Directory directory : manipulators.keySet() ) {
+ currentReaders.put( directory, new PerDirectoryLatestReader( directory ) );
}
- } catch (IOException e) {
+ }
+ catch ( IOException e ) {
throw new SearchException( "Unable to open Lucene IndexReader", e );
}
- currentReaders = Collections.unmodifiableMap( map );
}
-
+
public boolean areAllOldReferencesGone() {
- int numReferencesReaders = super.allReaders.size();
+ int numReferencesReaders = allReaders.size();
int numExpectedActiveReaders = manipulators.size();
return numReferencesReaders == numExpectedActiveReaders;
}
-
- public List<MockIndexReader> getCreatedIndexReaders(){
+
+ public List<MockIndexReader> getCreatedIndexReaders() {
return createdReadersHistory;
}
-
+
public MockIndexReader getCurrentMockReaderPerDP(DirectoryProvider dp) {
- IndexReader[] indexReaders = ReaderProviderHelper.getSubReadersFromMultiReader(
(MultiReader) super.openReader( new DirectoryProvider[]{ dp } ) );
- if ( indexReaders.length != 1 ){
+ IndexReader[] indexReaders = ReaderProviderHelper.getSubReadersFromMultiReader(
+ ( MultiReader ) super.openReader(
+ new DirectoryProvider[] { dp }
+ )
+ );
+ if ( indexReaders.length != 1 ) {
throw new IllegalStateException( "Expecting one reader" );
}
- return (MockIndexReader) indexReaders[0];
+ return ( MockIndexReader ) indexReaders[0];
}
-
+
public class MockIndexReader extends IndexReader {
-
+
private final AtomicBoolean closed = new AtomicBoolean( false );
private final AtomicBoolean hasAlreadyBeenReOpened = new AtomicBoolean( false );
private final AtomicBoolean isIndexReaderCurrent;
-
+
MockIndexReader(AtomicBoolean isIndexReaderCurrent) {
this.isIndexReaderCurrent = isIndexReaderCurrent;
- if ( ! isIndexReaderCurrent.compareAndSet(false, true) ) {
+ if ( !isIndexReaderCurrent.compareAndSet( false, true ) ) {
throw new IllegalStateException( "Unnecessarily reopened" );
}
createdReadersHistory.add( this );
}
-
+
public final boolean isClosed() {
return closed.get();
}
-
+
@Override
protected void doClose() throws IOException {
boolean okToClose = closed.compareAndSet( false, true );
- if ( ! okToClose ) {
+ if ( !okToClose ) {
throw new IllegalStateException( "Attempt to close a closed IndexReader"
);
}
- if ( ! hasAlreadyBeenReOpened.get() ) {
+ if ( !hasAlreadyBeenReOpened.get() ) {
throw new IllegalStateException( "Attempt to close the most current
IndexReader" );
}
}
-
+
@Override
- public synchronized IndexReader reopen(){
+ public synchronized IndexReader reopen() {
if ( isIndexReaderCurrent.get() ) {
return this;
}
else {
- if ( hasAlreadyBeenReOpened.compareAndSet( false, true) ) {
+ if ( hasAlreadyBeenReOpened.compareAndSet( false, true ) ) {
return new MockIndexReader( isIndexReaderCurrent );
}
- else
+ else {
throw new IllegalStateException( "Attempt to reopen an old IndexReader more
than once" );
+ }
}
}
@@ -169,7 +177,7 @@
@Override
protected void doSetNorm(int doc, String field, byte value) {
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException();
}
@Override
@@ -261,7 +269,7 @@
public TermEnum terms(Term t) throws IOException {
throw new UnsupportedOperationException();
}
-
+
}
}