[hibernate-commits] Hibernate SVN: r14933 - search/trunk/src/java/org/hibernate/search/store.
hibernate-commits at lists.jboss.org
hibernate-commits at lists.jboss.org
Tue Jul 15 17:42:16 EDT 2008
Author: sannegrinovero
Date: 2008-07-15 17:42:16 -0400 (Tue, 15 Jul 2008)
New Revision: 14933
Modified:
search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
Log:
HSEARCH-189 : visibility issues in concurrent code for Master/Slave DirectoryProviders
Modified: search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java 2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java 2008-07-15 21:42:16 UTC (rev 14933)
@@ -6,6 +6,7 @@
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.io.File;
import java.io.IOException;
@@ -32,15 +33,17 @@
public class FSMasterDirectoryProvider implements DirectoryProvider<FSDirectory> {
private final Logger log = LoggerFactory.getLogger( FSMasterDirectoryProvider.class );
+ private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
+ private volatile int current;
+
+ //variables having visibility granted by a read of "current"
private FSDirectory directory;
- private int current;
private String indexName;
- private Timer timer;
private SearchFactoryImplementor searchFactory;
private long copyChunkSize;
- //variables needed between initialize and start
+ //variables needed between initialize and start (used by same thread: no special care needed)
private File sourceDir;
private File indexDir;
private String directoryProviderName;
@@ -63,40 +66,44 @@
}
copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
this.searchFactory = searchFactoryImplementor;
+ current = 0; //write to volatile to publish all state
}
public void start() {
- long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+ int currentLocal = 0;
try {
//copy to source
if ( new File( sourceDir, "current1").exists() ) {
- current = 2;
+ currentLocal = 2;
}
else if ( new File( sourceDir, "current2").exists() ) {
- current = 1;
+ currentLocal = 1;
}
else {
log.debug( "Source directory for '{}' will be initialized", indexName);
- current = 1;
+ currentLocal = 1;
}
- String currentString = Integer.valueOf( current ).toString();
+ String currentString = Integer.valueOf( currentLocal ).toString();
File subDir = new File( sourceDir, currentString );
FileHelper.synchronize( indexDir, subDir, true, copyChunkSize );
new File( sourceDir, "current1 ").delete();
new File( sourceDir, "current2" ).delete();
//TODO small hole, no file can be found here
new File( sourceDir, "current" + currentString ).createNewFile();
- log.debug( "Current directory: {}", current );
+ log.debug( "Current directory: {}", currentLocal );
}
catch (IOException e) {
throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
}
- timer = new Timer( true ); //daemon thread, the copy algorithm is robust
TimerTask task = new FSMasterDirectoryProvider.TriggerTask( indexDir, sourceDir, this );
+ long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
timer.scheduleAtFixedRate( task, period, period );
+ this.current = currentLocal; //write to volatile to publish all state
}
public FSDirectory getDirectory() {
+ @SuppressWarnings("unused")
+ int readCurrentState = current; //Unneeded value, needed to ensure visibility of state protected by memory barrier
return directory;
}
@@ -107,7 +114,12 @@
// after initialize call
if ( obj == this ) return true;
if ( obj == null || !( obj instanceof FSMasterDirectoryProvider ) ) return false;
- return indexName.equals( ( (FSMasterDirectoryProvider) obj ).indexName );
+ FSMasterDirectoryProvider other = (FSMasterDirectoryProvider)obj;
+ //break both memory barriers by reading volatile variables:
+ @SuppressWarnings("unused")
+ int readCurrentState = other.current;
+ readCurrentState = this.current;
+ return indexName.equals( other.indexName );
}
@Override
@@ -115,13 +127,15 @@
// this code is actually broken since the value change after initialize call
// but from a practical POV this is fine since we only call this method
// after initialize call
+ @SuppressWarnings("unused")
+ int readCurrentState = current; //Unneeded value, to ensure visibility of state protected by memory barrier
int hash = 11;
return 37 * hash + indexName.hashCode();
}
-
-
public void stop() {
+ @SuppressWarnings("unused")
+ int readCurrentState = current; //Another unneeded value, to ensure visibility of state protected by memory barrier
timer.cancel();
try {
directory.close();
@@ -131,18 +145,18 @@
}
}
- class TriggerTask extends TimerTask {
+ private class TriggerTask extends TimerTask {
private final Executor executor;
private final FSMasterDirectoryProvider.CopyDirectory copyTask;
- public TriggerTask(File source, File destination, DirectoryProvider directoryProvider) {
+ public TriggerTask(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
executor = Executors.newSingleThreadExecutor();
copyTask = new FSMasterDirectoryProvider.CopyDirectory( source, destination, directoryProvider );
}
public void run() {
- if ( ! copyTask.inProgress ) {
+ if ( copyTask.inProgress.compareAndSet( false, true ) ) {
executor.execute( copyTask );
}
else {
@@ -151,33 +165,25 @@
}
}
- class CopyDirectory implements Runnable {
+ private class CopyDirectory implements Runnable {
private final File source;
private final File destination;
- private volatile boolean inProgress;
- private Lock directoryProviderLock;
- private DirectoryProvider directoryProvider;
+ private final AtomicBoolean inProgress = new AtomicBoolean( false );
+ private final Lock directoryProviderLock;
- public CopyDirectory(File source, File destination, DirectoryProvider directoryProvider) {
+ public CopyDirectory(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
this.source = source;
this.destination = destination;
- this.directoryProvider = directoryProvider;
+ this.directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
}
public void run() {
//TODO get rid of current and use the marker file instead?
- long start = System.currentTimeMillis();
- inProgress = true;
- if ( directoryProviderLock == null ) {
- directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
- directoryProvider = null;
- searchFactory = null; //get rid of any useless link (help hot redeployment?)
- }
+ directoryProviderLock.lock();
try {
- directoryProviderLock.lock();
+ long start = System.currentTimeMillis();//keep time after lock is acquired for correct measure
int oldIndex = current;
- int index = current == 1 ? 2 : 1;
-
+ int index = oldIndex == 1 ? 2 : 1;
File destinationFile = new File( destination, Integer.valueOf(index).toString() );
try {
log.trace( "Copying {} into {}", source, destinationFile );
@@ -187,7 +193,6 @@
catch (IOException e) {
//don't change current
log.error( "Unable to synchronize source of " + indexName, e );
- inProgress = false;
return;
}
if ( ! new File( destination, "current" + oldIndex ).delete() ) {
@@ -199,12 +204,12 @@
catch( IOException e ) {
log.warn( "Unable to create current marker in source of " + indexName, e );
}
+ log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
}
finally {
directoryProviderLock.unlock();
- inProgress = false;
+ inProgress.set( false );
}
- log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
}
}
}
Modified: search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java 2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java 2008-07-15 21:42:16 UTC (rev 14933)
@@ -6,6 +6,7 @@
import java.util.TimerTask;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.io.File;
import java.io.IOException;
@@ -32,15 +33,17 @@
public class FSSlaveDirectoryProvider implements DirectoryProvider<FSDirectory> {
private final Logger log = LoggerFactory.getLogger( FSSlaveDirectoryProvider.class );
+ private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
+ private volatile int current; //used also as memory barrier of all other values, which are set once.
+
+ //variables having visibility granted by a read of "current"
private FSDirectory directory1;
private FSDirectory directory2;
- private int current;
private String indexName;
- private Timer timer;
private long copyChunkSize;
- //variables needed between initialize and start
+ //variables needed between initialize and start (used by same thread: no special care needed)
private File sourceIndexDir;
private File indexDir;
private String directoryProviderName;
@@ -64,29 +67,31 @@
throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
}
copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
+ current = 0; //publish all state to other threads
}
public void start() {
- long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+ int readCurrentState = current; //Unneeded value, but ensure visibility of state protected by memory barrier
+ int currentToBe = 0;
try {
- directory1 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "1") );
- directory2 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "2") );
+ directory1 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "1" ) );
+ directory2 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "2" ) );
File currentMarker = new File( indexDir, "current1" );
File current2Marker = new File( indexDir, "current2" );
if ( currentMarker.exists() ) {
- current = 1;
+ currentToBe = 1;
if ( current2Marker.exists() ) {
current2Marker.delete(); //TODO or throw an exception?
}
}
else if ( current2Marker.exists() ) {
- current = 2;
+ currentToBe = 2;
}
else {
//no default
log.debug( "Setting directory 1 as current");
- current = 1;
- File destinationFile = new File( indexDir, Integer.valueOf( current ).toString() );
+ currentToBe = 1;
+ File destinationFile = new File( indexDir, Integer.valueOf( readCurrentState ).toString() );
int sourceCurrent;
if ( new File( sourceIndexDir, "current1").exists() ) {
sourceCurrent = 1;
@@ -98,7 +103,7 @@
throw new AssertionFailure( "No current file marker found in source directory: " + sourceIndexDir.getPath() );
}
try {
- FileHelper.synchronize( new File( sourceIndexDir, String.valueOf(sourceCurrent) ),
+ FileHelper.synchronize( new File( sourceIndexDir, String.valueOf( sourceCurrent ) ),
destinationFile, true, copyChunkSize );
}
catch (IOException e) {
@@ -108,26 +113,27 @@
throw new SearchException( "Unable to create the directory marker file: " + indexName );
}
}
- log.debug( "Current directory: {}", current);
+ log.debug( "Current directory: {}", currentToBe);
}
catch (IOException e) {
throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
}
- timer = new Timer(true); //daemon thread, the copy algorithm is robust
TimerTask task = new TriggerTask( sourceIndexDir, indexDir );
+ long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
timer.scheduleAtFixedRate( task, period, period );
+ this.current = currentToBe;
}
- //FIXME this is Thread-Unsafe! A memory barrier is missing.
public FSDirectory getDirectory() {
- if (current == 1) {
+ int readState = current;// to have the read consistent in the next two "if"s.
+ if (readState == 1) {
return directory1;
}
- else if (current == 2) {
+ else if (readState == 2) {
return directory2;
}
else {
- throw new AssertionFailure( "Illegal current directory: " + current );
+ throw new AssertionFailure( "Illegal current directory: " + readState );
}
}
@@ -138,7 +144,12 @@
// after initialize call
if ( obj == this ) return true;
if ( obj == null || !( obj instanceof FSSlaveDirectoryProvider ) ) return false;
- return indexName.equals( ( (FSSlaveDirectoryProvider) obj ).indexName );
+ FSSlaveDirectoryProvider other = (FSSlaveDirectoryProvider)obj;
+ //need to break memory barriers on both instances:
+ @SuppressWarnings("unused")
+ int readCurrentState = this.current; //unneded value, but ensure visibility of indexName
+ readCurrentState = other.current; //another unneded value, but ensure visibility of indexName
+ return indexName.equals( other.indexName );
}
@Override
@@ -146,6 +157,8 @@
// this code is actually broken since the value change after initialize call
// but from a practical POV this is fine since we only call this method
// after initialize call
+ @SuppressWarnings("unused")
+ int readCurrentState = current; //unneded value, but ensure visibility of indexName
int hash = 11;
return 37 * hash + indexName.hashCode();
}
@@ -161,11 +174,15 @@
}
public void run() {
- if ( ! copyTask.inProgress ) {
+ if ( copyTask.inProgress.compareAndSet( false, true ) ) {
executor.execute( copyTask );
}
else {
- log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+ if (log.isTraceEnabled()) {
+ @SuppressWarnings("unused")
+ int unneeded = current;//ensure visibility of indexName in Timer threads.
+ log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+ }
}
}
}
@@ -173,7 +190,7 @@
class CopyDirectory implements Runnable {
private final File source;
private final File destination;
- private volatile boolean inProgress;
+ private final AtomicBoolean inProgress = new AtomicBoolean( false );
public CopyDirectory(File sourceIndexDir, File destination) {
this.source = sourceIndexDir;
@@ -183,9 +200,8 @@
public void run() {
long start = System.currentTimeMillis();
try {
- inProgress = true;
int oldIndex = current;
- int index = current == 1 ? 2 : 1;
+ int index = oldIndex == 1 ? 2 : 1;
File sourceFile;
if ( new File( source, "current1" ).exists() ) {
sourceFile = new File(source, "1");
@@ -195,10 +211,8 @@
}
else {
log.error( "Unable to determine current in source directory" );
- inProgress = false;
return;
}
-
File destinationFile = new File( destination, Integer.valueOf( index ).toString() );
try {
log.trace( "Copying {} into {}", sourceFile, destinationFile );
@@ -208,7 +222,6 @@
catch (IOException e) {
//don't change current
log.error( "Unable to synchronize " + indexName, e);
- inProgress = false;
return;
}
if ( ! new File( indexName, "current" + oldIndex ).delete() ) {
@@ -222,13 +235,15 @@
}
}
finally {
- inProgress = false;
+ inProgress.set( false );
}
log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
}
}
public void stop() {
+ @SuppressWarnings("unused")
+ int readCurrentState = current; //unneded value, but ensure visibility of state protected by memory barrier
timer.cancel();
try {
directory1.close();
More information about the hibernate-commits mailing list