[hibernate-commits] Hibernate SVN: r14933 - search/trunk/src/java/org/hibernate/search/store.

hibernate-commits at lists.jboss.org hibernate-commits at lists.jboss.org
Tue Jul 15 17:42:16 EDT 2008


Author: sannegrinovero
Date: 2008-07-15 17:42:16 -0400 (Tue, 15 Jul 2008)
New Revision: 14933

Modified:
   search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
   search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
Log:
HSEARCH-189 : visibility issues in concurrent code for Master/Slave DirectoryProviders

Modified: search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java	2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSMasterDirectoryProvider.java	2008-07-15 21:42:16 UTC (rev 14933)
@@ -6,6 +6,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.io.File;
 import java.io.IOException;
@@ -32,15 +33,17 @@
 public class FSMasterDirectoryProvider implements DirectoryProvider<FSDirectory> {
 	
 	private final Logger log = LoggerFactory.getLogger( FSMasterDirectoryProvider.class );
+	private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
 	
+	private volatile int current;
+	
+	//variables having visibility granted by a read of "current"
 	private FSDirectory directory;
-	private int current;
 	private String indexName;
-	private Timer timer;
 	private SearchFactoryImplementor searchFactory;
 	private long copyChunkSize;
 
-	//variables needed between initialize and start
+	//variables needed between initialize and start (used by same thread: no special care needed)
 	private File sourceDir;
 	private File indexDir;
 	private String directoryProviderName;
@@ -63,40 +66,44 @@
 		}
 		copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
 		this.searchFactory = searchFactoryImplementor;
+		current = 0; //write to volatile to publish all state
 	}
 
 	public void start() {
-		long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+		int currentLocal = 0;
 		try {
 			//copy to source
 			if ( new File( sourceDir, "current1").exists() ) {
-				current = 2;
+				currentLocal = 2;
 			}
 			else if ( new File( sourceDir, "current2").exists() ) {
-				current = 1;
+				currentLocal = 1;
 			}
 			else {
 				log.debug( "Source directory for '{}' will be initialized", indexName);
-				current = 1;
+				currentLocal = 1;
 			}
-			String currentString = Integer.valueOf( current ).toString();
+			String currentString = Integer.valueOf( currentLocal ).toString();
 			File subDir = new File( sourceDir, currentString );
 			FileHelper.synchronize( indexDir, subDir, true, copyChunkSize );
 			new File( sourceDir, "current1 ").delete();
 			new File( sourceDir, "current2" ).delete();
 			//TODO small hole, no file can be found here
 			new File( sourceDir, "current" + currentString ).createNewFile();
-			log.debug( "Current directory: {}", current );
+			log.debug( "Current directory: {}", currentLocal );
 		}
 		catch (IOException e) {
 			throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
 		}
-		timer = new Timer( true ); //daemon thread, the copy algorithm is robust
 		TimerTask task = new FSMasterDirectoryProvider.TriggerTask( indexDir, sourceDir, this );
+		long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
 		timer.scheduleAtFixedRate( task, period, period );
+		this.current = currentLocal; //write to volatile to publish all state
 	}
 
 	public FSDirectory getDirectory() {
+		@SuppressWarnings("unused")
+		int readCurrentState = current; //Unneeded value, needed to ensure visibility of state protected by memory barrier
 		return directory;
 	}
 
@@ -107,7 +114,12 @@
 		// after initialize call
 		if ( obj == this ) return true;
 		if ( obj == null || !( obj instanceof FSMasterDirectoryProvider ) ) return false;
-		return indexName.equals( ( (FSMasterDirectoryProvider) obj ).indexName );
+		FSMasterDirectoryProvider other = (FSMasterDirectoryProvider)obj;
+		//break both memory barriers by reading volatile variables:
+		@SuppressWarnings("unused")
+		int readCurrentState = other.current;
+		readCurrentState = this.current;
+		return indexName.equals( other.indexName );
 	}
 
 	@Override
@@ -115,13 +127,15 @@
 		// this code is actually broken since the value change after initialize call
 		// but from a practical POV this is fine since we only call this method
 		// after initialize call
+		@SuppressWarnings("unused")
+		int readCurrentState = current; //Unneeded value, to ensure visibility of state protected by memory barrier
 		int hash = 11;
 		return 37 * hash + indexName.hashCode();
 	}
 
-
-
 	public void stop() {
+		@SuppressWarnings("unused")
+		int readCurrentState = current; //Another unneeded value, to ensure visibility of state protected by memory barrier
 		timer.cancel();
 		try {
 			directory.close();
@@ -131,18 +145,18 @@
 		}
 	}
 
-	class TriggerTask extends TimerTask {
+	private class TriggerTask extends TimerTask {
 
 		private final Executor executor;
 		private final FSMasterDirectoryProvider.CopyDirectory copyTask;
 
-		public TriggerTask(File source, File destination, DirectoryProvider directoryProvider) {
+		public TriggerTask(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
 			executor = Executors.newSingleThreadExecutor();
 			copyTask = new FSMasterDirectoryProvider.CopyDirectory( source, destination, directoryProvider );
 		}
 
 		public void run() {
-			if ( ! copyTask.inProgress ) {
+			if ( copyTask.inProgress.compareAndSet( false, true ) ) {
 				executor.execute( copyTask );
 			}
 			else {
@@ -151,33 +165,25 @@
 		}
 	}
 
-	class CopyDirectory implements Runnable {
+	private class CopyDirectory implements Runnable {
 		private final File source;
 		private final File destination;
-		private volatile boolean inProgress;
-		private Lock directoryProviderLock;
-		private DirectoryProvider directoryProvider;
+		private final AtomicBoolean inProgress = new AtomicBoolean( false );
+		private final Lock directoryProviderLock;
 
-		public CopyDirectory(File source, File destination, DirectoryProvider directoryProvider) {
+		public CopyDirectory(File source, File destination, DirectoryProvider<FSDirectory> directoryProvider) {
 			this.source = source;
 			this.destination = destination;
-			this.directoryProvider = directoryProvider;
+			this.directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
 		}
 
 		public void run() {
 			//TODO get rid of current and use the marker file instead?
-			long start = System.currentTimeMillis();
-			inProgress = true;
-			if ( directoryProviderLock == null ) {
-				directoryProviderLock = searchFactory.getDirectoryProviderLock( directoryProvider );
-				directoryProvider = null;
-				searchFactory = null; //get rid of any useless link (help hot redeployment?)
-			}
+			directoryProviderLock.lock();
 			try {
-				directoryProviderLock.lock();
+				long start = System.currentTimeMillis();//keep time after lock is acquired for correct measure
 				int oldIndex = current;
-				int index = current == 1 ? 2 : 1;
-
+				int index = oldIndex == 1 ? 2 : 1;
 				File destinationFile = new File( destination, Integer.valueOf(index).toString() );
 				try {
 					log.trace( "Copying {} into {}", source, destinationFile );
@@ -187,7 +193,6 @@
 				catch (IOException e) {
 					//don't change current
 					log.error( "Unable to synchronize source of " + indexName, e );
-					inProgress = false;
 					return;
 				}
 				if ( ! new File( destination, "current" + oldIndex ).delete() ) {
@@ -199,12 +204,12 @@
 				catch( IOException e ) {
 					log.warn( "Unable to create current marker in source of " + indexName, e );
 				}
+				log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
 			}
 			finally {
 				directoryProviderLock.unlock();
-				inProgress = false;
+				inProgress.set( false );
 			}
-			log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
 		}
 	}
 }

Modified: search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java
===================================================================
--- search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java	2008-07-15 21:27:03 UTC (rev 14932)
+++ search/trunk/src/java/org/hibernate/search/store/FSSlaveDirectoryProvider.java	2008-07-15 21:42:16 UTC (rev 14933)
@@ -6,6 +6,7 @@
 import java.util.TimerTask;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.io.File;
 import java.io.IOException;
 
@@ -32,15 +33,17 @@
 public class FSSlaveDirectoryProvider implements DirectoryProvider<FSDirectory> {
 	
 	private final Logger log = LoggerFactory.getLogger( FSSlaveDirectoryProvider.class );
+	private final Timer timer = new Timer( true ); //daemon thread, the copy algorithm is robust
 	
+	private volatile int current; //used also as memory barrier of all other values, which are set once.
+	
+	//variables having visibility granted by a read of "current"
 	private FSDirectory directory1;
 	private FSDirectory directory2;
-	private int current;
 	private String indexName;
-	private Timer timer;
 	private long copyChunkSize;
 	
-	//variables needed between initialize and start
+	//variables needed between initialize and start (used by same thread: no special care needed)
 	private File sourceIndexDir;
 	private File indexDir;
 	private String directoryProviderName;
@@ -64,29 +67,31 @@
 			throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
 		}
 		copyChunkSize = DirectoryProviderHelper.getCopyBufferSize( directoryProviderName, properties );
+		current = 0; //publish all state to other threads
 	}
 
 	public void start() {
-		long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
+		int readCurrentState = current; //Unneeded value, but ensure visibility of state protected by memory barrier
+		int currentToBe = 0;
 		try {
-			directory1 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "1") );
-			directory2 = DirectoryProviderHelper.createFSIndex( new File(indexDir, "2") );
+			directory1 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "1" ) );
+			directory2 = DirectoryProviderHelper.createFSIndex( new File( indexDir, "2" ) );
 			File currentMarker = new File( indexDir, "current1" );
 			File current2Marker = new File( indexDir, "current2" );
 			if ( currentMarker.exists() ) {
-				current = 1;
+				currentToBe = 1;
 				if ( current2Marker.exists() ) {
 					current2Marker.delete(); //TODO or throw an exception?
 				}
 			}
 			else if ( current2Marker.exists() ) {
-				current = 2;
+				currentToBe = 2;
 			}
 			else {
 				//no default
 				log.debug( "Setting directory 1 as current");
-				current = 1;
-				File destinationFile = new File( indexDir, Integer.valueOf( current ).toString() );
+				currentToBe = 1;
+				File destinationFile = new File( indexDir, Integer.valueOf( readCurrentState ).toString() );
 				int sourceCurrent;
 				if ( new File( sourceIndexDir, "current1").exists() ) {
 					sourceCurrent = 1;
@@ -98,7 +103,7 @@
 					throw new AssertionFailure( "No current file marker found in source directory: " + sourceIndexDir.getPath() );
 				}
 				try {
-					FileHelper.synchronize( new File( sourceIndexDir, String.valueOf(sourceCurrent) ),
+					FileHelper.synchronize( new File( sourceIndexDir, String.valueOf( sourceCurrent ) ),
 							destinationFile, true, copyChunkSize );
 				}
 				catch (IOException e) {
@@ -108,26 +113,27 @@
 					throw new SearchException( "Unable to create the directory marker file: " + indexName );
 				}
 			}
-			log.debug( "Current directory: {}", current);
+			log.debug( "Current directory: {}", currentToBe);
 		}
 		catch (IOException e) {
 			throw new SearchException( "Unable to initialize index: " + directoryProviderName, e );
 		}
-		timer = new Timer(true); //daemon thread, the copy algorithm is robust
 		TimerTask task = new TriggerTask( sourceIndexDir, indexDir );
+		long period = DirectoryProviderHelper.getRefreshPeriod( properties, directoryProviderName );
 		timer.scheduleAtFixedRate( task, period, period );
+		this.current = currentToBe;
 	}
 
-	//FIXME this is Thread-Unsafe! A memory barrier is missing.
 	public FSDirectory getDirectory() {
-		if (current == 1) {
+		int readState = current;// to have the read consistent in the next two "if"s.
+		if (readState == 1) {
 			return directory1;
 		}
-		else if (current == 2) {
+		else if (readState == 2) {
 			return directory2;
 		}
 		else {
-			throw new AssertionFailure( "Illegal current directory: " + current );
+			throw new AssertionFailure( "Illegal current directory: " + readState );
 		}
 	}
 
@@ -138,7 +144,12 @@
 		// after initialize call
 		if ( obj == this ) return true;
 		if ( obj == null || !( obj instanceof FSSlaveDirectoryProvider ) ) return false;
-		return indexName.equals( ( (FSSlaveDirectoryProvider) obj ).indexName );
+		FSSlaveDirectoryProvider other = (FSSlaveDirectoryProvider)obj;
+		//need to break memory barriers on both instances:
+		@SuppressWarnings("unused")
+		int readCurrentState = this.current; //unneeded value, but ensures visibility of indexName
+		readCurrentState = other.current; //another unneeded value, but ensures visibility of indexName
+		return indexName.equals( other.indexName );
 	}
 
 	@Override
@@ -146,6 +157,8 @@
 		// this code is actually broken since the value change after initialize call
 		// but from a practical POV this is fine since we only call this method
 		// after initialize call
+		@SuppressWarnings("unused")
+		int readCurrentState = current; //unneeded value, but ensures visibility of indexName
 		int hash = 11;
 		return 37 * hash + indexName.hashCode();
 	}
@@ -161,11 +174,15 @@
 		}
 
 		public void run() {
-			if ( ! copyTask.inProgress ) {
+			if ( copyTask.inProgress.compareAndSet( false, true ) ) {
 				executor.execute( copyTask );
 			}
 			else {
-				log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+				if (log.isTraceEnabled()) {
+					@SuppressWarnings("unused")
+					int unneeded = current;//ensure visibility of indexName in Timer threads.
+					log.trace( "Skipping directory synchronization, previous work still in progress: {}", indexName);
+				}
 			}
 		}
 	}
@@ -173,7 +190,7 @@
 	class CopyDirectory implements Runnable {
 		private final File source;
 		private final File destination;
-		private volatile boolean inProgress;
+		private final AtomicBoolean inProgress = new AtomicBoolean( false );
 
 		public CopyDirectory(File sourceIndexDir, File destination) {
 			this.source = sourceIndexDir;
@@ -183,9 +200,8 @@
 		public void run() {
 			long start = System.currentTimeMillis();
 			try {
-				inProgress = true;
 				int oldIndex = current;
-				int index = current == 1 ? 2 : 1;
+				int index = oldIndex == 1 ? 2 : 1;
 				File sourceFile;
 				if ( new File( source, "current1" ).exists() ) {
 					sourceFile = new File(source, "1");
@@ -195,10 +211,8 @@
 				}
 				else {
 					log.error( "Unable to determine current in source directory" );
-					inProgress = false;
 					return;
 				}
-
 				File destinationFile = new File( destination, Integer.valueOf( index ).toString() );
 				try {
 					log.trace( "Copying {} into {}", sourceFile, destinationFile );
@@ -208,7 +222,6 @@
 				catch (IOException e) {
 					//don't change current
 					log.error( "Unable to synchronize " + indexName, e);
-					inProgress = false;
 					return;
 				}
 				if ( ! new File( indexName, "current" + oldIndex ).delete() ) {
@@ -222,13 +235,15 @@
 				}
 			}
 			finally {
-				inProgress = false;
+				inProgress.set( false );
 			}
 			log.trace( "Copy for {} took {} ms", indexName, (System.currentTimeMillis() - start) );
 		}
 	}
 
 	public void stop() {
+		@SuppressWarnings("unused")
+		int readCurrentState = current; //unneeded value, but ensures visibility of state protected by memory barrier
 		timer.cancel();
 		try {
 			directory1.close();




More information about the hibernate-commits mailing list