[jboss-svn-commits] JBL Code SVN: r27390 - in labs/jbosstm/trunk/ArjunaCore/arjuna: classes/com/arjuna/ats/arjuna/coordinator and 3 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Jul 1 11:37:19 EDT 2009
Author: adinn
Date: 2009-07-01 11:37:19 -0400 (Wed, 01 Jul 2009)
New Revision: 27390
Added:
labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java
Modified:
labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
Log:
ensured all recovery threads start and stop cleanly -- fixes JBTM-569
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml 2009-07-01 15:37:19 UTC (rev 27390)
@@ -52,6 +52,9 @@
<run.tests.macro>
<tests><fileset dir="tests/classes" includes="**/PersistenceUnitTest.java"/></tests>
</run.tests.macro>
+ <run.tests.macro>
+ <tests><fileset dir="tests/classes" includes="**/RecoveryManagerStartStopTest.java"/></tests>
+ </run.tests.macro>
</target>
-</project>
\ No newline at end of file
+</project>
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -65,10 +65,16 @@
{
public void run()
{
- if (transactionStatusManager != null)
+ // guard against simultaneous user-initiated shutdown
+ // synchronize on the class since the shutdown method on TxControl is static synchronized
+ synchronized (TxControl.class) {
+ // check that this hook is still active
+ if (_shutdownHook == this && transactionStatusManager != null)
{
- transactionStatusManager.finalize();
+ transactionStatusManager.shutdown();
+ transactionStatusManager = null;
}
+ }
}
};
@@ -298,11 +304,11 @@
{
Runtime.getRuntime().removeShutdownHook(_shutdownHook);
- _shutdownHook = null;
-
+ _shutdownHook = null;
+
if (transactionStatusManager != null)
{
- transactionStatusManager.finalize();
+ transactionStatusManager.shutdown();
transactionStatusManager = null;
}
}
@@ -326,6 +332,9 @@
static int numberOfTransactions = 100;
+ /**
+ * flag which is tue if TxControl is enabled and false if it is disabled
+ */
static boolean enable = true;
private static TransactionStatusManager transactionStatusManager = null;
@@ -336,6 +345,12 @@
static int _defaultTimeout = 60; // 60 seconds
+ /**
+ * flag which is true if enable and disable operations, respectively, start and stop the transaction status
+ * manager and false if they do not perform a start and stop. this flag is true by default and can only be
+ * set to false by setting property {@link#com.arjuna.ats.arjuna.common.TRANSACTION_STATUS_MANAGER_ENABLE}
+ * to value "NO"
+ */
static boolean _enableTSM = true;
static boolean beforeCompletionWhenRollbackOnly = false;
@@ -515,7 +530,11 @@
if ("NO".equalsIgnoreCase(enableTSM))
_enableTSM = false;
+ // TODO -- add this check to respect the environment setting for Environment.START_DISABLED?
+ // TODO -- is this feature actually needed (it appears not to be used internally)
+ // if (enable) {
createTransactionStatusManager();
+ // }
}
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -141,9 +141,10 @@
}
/*
- * TODO: why do we need to check this? No nulls should
- * ever be sent and the connection is blocking!
- */
+ * check for null - in theory we get this from readLine when EOF has been reached, although in practice
+ * since we are reading from a socket we will probably get an IOException in which case we will still
+ * see null
+ */
if ((transactionType == null) && (strUid == null))
return;
@@ -445,7 +446,6 @@
*/
private static ObjectStore _objectStore = null;
-
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -262,6 +262,9 @@
// does nothing when running embedded.
/**
+ * wait for the recovery thread to be shutdown. n.b. this will not return unless and until shutdown
+ * is called.
+ *
* @throws IllegalStateException if the recovery manager has been shutdown.
*/
@@ -516,6 +519,10 @@
System.out.println("Ready");
}
+ // this is never going to return because it only returns when shutdon is called and
+ // there is nothing which is going to call shutdown. we probably oght to provide a
+ // clean way of terminating this process.
+
manager.waitForTermination();
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -33,6 +33,17 @@
import java.io.* ;
+/**
+ * a service is used to serve one or more requests from an input stream and post results on an output stream.
+ * when the input sream is closed it is expected to close its output stream.</p>
+ *
+ * note that a single service instance may be requested to process incoming requests from multiple input
+ * streams in parallel.</p>
+ *
+ * note also that the service should be resilient to closure of the input and output streams during request
+ * processing which can happen in resposne to asynchronous dispatch of a shutdown request to the object which
+ * invoked the service.
+ */
public interface Service
{
public void doWork ( InputStream in, OutputStream out )
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -88,8 +88,10 @@
/**
* The work item to be executed.
+ *
+ * this must be private as it should only be called once. otherwise we leak listener threads
*/
- public void addService( Service service, ServerSocket serverSocket )
+ private void addService( Service service, ServerSocket serverSocket )
{
try
{
@@ -111,11 +113,14 @@
}
}
- /**
+ /*
* Removes the TransactionStatusManager from the object store
* and closes down the listener thread.
- */
+ * this will never work as a finalizer because the listener thread is always running and keeping this
+ * instance from being garbage collected. we need a proper shutdonw method which closes the
+ * listener socket causing the thread to shut down
+ *
// TODO consider adding a shutdown operation (signature change)
public void finalize()
{
@@ -127,6 +132,7 @@
TransactionStatusManagerItem.removeThis( Utility.getProcessUid() ) ;
}
}
+ */
/**
* Create service and Transaction status manager item.
@@ -187,6 +193,14 @@
}
}
+ public void shutdown()
+ {
+ if (_listener != null) {
+ _listener.stopListener() ;
+ TransactionStatusManagerItem.removeThis( Utility.getProcessUid() ) ;
+ _listener = null;
+ }
+ }
/**
* Lookup the listener port for the transaction manager
* @param defValue the value to use if no valid port number can be found
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -43,12 +43,21 @@
{
/**
* Takes socket and service to execute.
- *
- * @message com.arjuna.ats.internal.arjuna.recovery.Connection_2 [com.arjuna.ats.internal.arjuna.recovery.Connection_2] - Setting timeout exception.
*/
- public Connection( Socket server_socket, Service service )
+ public Connection( Socket server_socket, Service service)
{
+ this(server_socket, service, null);
+ }
+
+ /**
+ * Takes socket and service to execute and a callback to run when processing of the connection has completed
+ *
+ * @message com.arjuna.ats.internal.arjuna.recovery.Connection_2 [com.arjuna.ats.internal.arjuna.recovery.Connection_2] - Setting timeout exception.
+ */
+
+ public Connection( Socket server_socket, Service service, Callback callback )
+ {
super( "Server.Connection:" + server_socket.getInetAddress().getHostAddress() + ":" + server_socket.getPort() );
_server_socket = server_socket;
@@ -64,6 +73,8 @@
}
_service = service;
+
+ _callback = callback;
}
/**
@@ -87,11 +98,29 @@
if (tsLogger.arjLoggerI18N.isWarnEnabled())
tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.Connection_1");
}
+ finally
+ {
+
+ // run the callback to notify completion of processing for this connection
+
+ if (_callback != null) {
+ _callback.run();
+ }
+ }
}
// What client (RecoveryManager) talks to.
private Socket _server_socket;
// What Service is provided to the client(RecoveryManager).
- private Service _service;
+ private Service _service;
+
+ private Callback _callback;
+
+ // abstract class instantiated by clients to allow notification that a connection has been closed
+
+ public static abstract class Callback
+ {
+ abstract void run();
+ }
}
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -72,52 +72,24 @@
public class ExpiredEntryMonitor extends Thread
{
- /**
+ /**
* Start the monitor thread, if the properties make it appropriate
*/
- public static boolean startUp()
+ public static synchronized boolean startUp()
{
// ensure singleton
- if ( _started )
+ if ( _theInstance != null )
{
return false;
}
+
+ // process system properties -- n.b. we only do this once!
+
+ if (!initialised) {
+ initialise();
+ }
- /*
- * Read the system properties to set the configurable options
- */
-
- String scanIntervalString =
- arjPropertyManager.propertyManager.getProperty( RecoveryEnvironment.EXPIRY_SCAN_INTERVAL );
-
- if ( scanIntervalString != null )
- {
- try
- {
- Integer scanIntervalInteger = new Integer(scanIntervalString);
- // convert to seconds
- _scanIntervalSeconds = scanIntervalInteger.intValue() * 60 * 60;
-
- if (tsLogger.arjLoggerI18N.debugAllowed())
- {
- tsLogger.arjLoggerI18N.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
- FacilityCode.FAC_CRASH_RECOVERY,
- "com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_1",
- new Object[]{Integer.toString(_scanIntervalSeconds)});
- }
- }
- catch ( NumberFormatException e )
- {
- if (tsLogger.arjLoggerI18N.isWarnEnabled())
- {
- tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_11",
- new Object[]{RecoveryEnvironment.EXPIRY_SCAN_INTERVAL,
- scanIntervalString});
- }
- }
- }
-
if ( _scanIntervalSeconds == 0 )
{
// no scanning wanted
@@ -131,16 +103,7 @@
return false;
}
-
- // is it being used to skip the first time
- if ( _scanIntervalSeconds < 0 )
- {
- notSkipping = false;
- _scanIntervalSeconds = - _scanIntervalSeconds;
- }
-
- loadScanners();
-
+
if ( _expiryScanners.size() == 0 )
{
// nothing to do
@@ -156,23 +119,34 @@
}
// create, and thus launch the monitor
- _theInstance = new ExpiredEntryMonitor();
+
+ _theInstance = new ExpiredEntryMonitor(_skipFirst);
+
+ _theInstance.start();
- return _started;
+ return true;
}
-
- public static void shutdown()
+
+ /**
+ * terminate any currently active monitor thread, cancelling any further scans but waiting for the
+ * thread to exit before returning
+ */
+ public synchronized static void shutdown()
{
- _started = false;
- _expiryScanners = new Vector();
- _scanIntervalSeconds = 12 * 60 * 60;
- notSkipping = true;
-
- _theInstance.interrupt();
+ if (_theInstance != null) {
+ _theInstance.terminate();
+ // now wait for it to finish
+ try {
+ _theInstance.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
_theInstance = null;
}
- private ExpiredEntryMonitor()
+ private ExpiredEntryMonitor(boolean skipFirst)
{
if (tsLogger.arjLoggerI18N.isDebugEnabled())
{
@@ -180,16 +154,14 @@
FacilityCode.FAC_CRASH_RECOVERY,
"com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_4");
}
-
- _started = true;
+ _skipNext = skipFirst;
+ _stop = false;
this.setDaemon(true);
-
- start();
}
/**
- * Start the background thread to perform the periodic scans
+ * performs periodic scans until a shutdwn is notified
*/
public void run()
{
@@ -201,7 +173,18 @@
_theTimestamper.format(new Date()) + "----" );
}
- if ( notSkipping )
+ if (_skipNext)
+ {
+ // make sure we skip at most one scan
+
+ _skipNext = false;
+
+ if (tsLogger.arjLoggerI18N.isInfoEnabled())
+ {
+ tsLogger.arjLoggerI18N.info("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_5");
+ }
+ }
+ else
{
Enumeration scanners = _expiryScanners.elements();
@@ -209,6 +192,15 @@
{
ExpiryScanner m = (ExpiryScanner)scanners.nextElement();
+ // check for a shutdown request before starting a scan
+ synchronized (this) {
+ if (_stop) {
+ break;
+ }
+ }
+
+ // ok go ahead and scan
+
m.scan();
if (tsLogger.arjLogger.isDebugEnabled())
@@ -217,37 +209,95 @@
FacilityCode.FAC_CRASH_RECOVERY," ");
// bit of space if detailing
}
- }
+ }
}
- else
- {
- if (tsLogger.arjLoggerI18N.isInfoEnabled())
- {
- tsLogger.arjLoggerI18N.info("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_5");
- }
-
- notSkipping = true;
- }
-
+
// wait for a bit to avoid catching (too many) transactions etc. that
// are really progressing quite happily
try
{
- Thread.sleep( _scanIntervalSeconds * 1000 );
+ // check for shutdown request before we sleep
+ synchronized (this) {
+ if (_stop) {
+ break;
+ }
+ wait( _scanIntervalSeconds * 1000 );
+ // check if we were woken because of a shutdown
+ if (_stop) {
+ break;
+ }
+ }
}
catch ( InterruptedException e1 )
{
- break;
+ // we should only get shut down by a shutdown request so ignore interrupts
}
-
- if ( !_started )
- return;
}
}
-
+
+ private synchronized void terminate()
+ {
+ _stop = true;
+ notify();
+ }
+
+ private static void initialise()
+ {
+ /*
+ * Read the system properties to set the configurable options
+ */
+
+ String scanIntervalString =
+ arjPropertyManager.propertyManager.getProperty( RecoveryEnvironment.EXPIRY_SCAN_INTERVAL );
+
+ if ( scanIntervalString != null )
+ {
+ try
+ {
+ Integer scanIntervalInteger = new Integer(scanIntervalString);
+ // convert to seconds
+ _scanIntervalSeconds = scanIntervalInteger.intValue() * 60 * 60;
+
+ if (tsLogger.arjLoggerI18N.debugAllowed())
+ {
+ tsLogger.arjLoggerI18N.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+ FacilityCode.FAC_CRASH_RECOVERY,
+ "com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_1",
+ new Object[]{Integer.toString(_scanIntervalSeconds)});
+ }
+ }
+ catch ( NumberFormatException e )
+ {
+ if (tsLogger.arjLoggerI18N.isWarnEnabled())
+ {
+ tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_11",
+ new Object[]{RecoveryEnvironment.EXPIRY_SCAN_INTERVAL,
+ scanIntervalString});
+ }
+ }
+ }
+
+ if (_scanIntervalSeconds != 0)
+ {
+
+ // is it being used to skip the first time
+ if ( _scanIntervalSeconds < 0 )
+ {
+ _skipFirst = true;
+ _scanIntervalSeconds = - _scanIntervalSeconds;
+ }
+
+ loadScanners();
+ }
+
+ initialised = true;
+ }
+
private static void loadScanners()
{
+ _expiryScanners = new Vector();
+
// search our properties
Properties properties = arjPropertyManager.propertyManager.getProperties();
@@ -261,7 +311,7 @@
if ( propertyName.startsWith(RecoveryEnvironment.SCANNER_PROPERTY_PREFIX) )
{
- loadScanner( properties.getProperty(propertyName) );
+ loadScanner( properties.getProperty(propertyName));
}
}
}
@@ -346,21 +396,46 @@
}
}
}
-
- private static boolean _started = false;
-
- private static Vector _expiryScanners = new Vector();
-
+
+ /**
+ * flag which causes the next scan to be skipped if it is true. this is set from _skipFirst when a
+ * monitor is created and rest to false each time a scan is considered.
+ */
+ private boolean _skipNext;
+
+ /**
+ * flag which causes the monitor thread to stop running when it is set to true
+ */
+ private boolean _stop;
+
+ /**
+ * list of scanners to be invoked by the monitor thread in order to check for expired log entries
+ */
+ private static Vector _expiryScanners;
+
+ /**
+ * flag which guards processing of properties ensuirng it is only performed once
+ */
+ private static boolean initialised = false;
+
+ /**
+ * the default scanning interval if the property file does not supply one
+ */
private static int _scanIntervalSeconds = 12 * 60 * 60;
-
+
+ /**
+ * a date format used to log the time for a scan
+ */
private static SimpleDateFormat _theTimestamper = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss");
-
- private static boolean notSkipping = true;
-
- private static ExpiredEntryMonitor _theInstance = null;
-
-}
+ /**
+ * a flag which if true causes the scanner to perform a scan when it is first starts or if false skip this
+ * first scan. it can be set to true by supplying a negative scan interval in the property file.
+ */
+ private static boolean _skipFirst = false;
-
-
+ /**
+ * the currently active monitor instance or null if no scanner is active
+ */
+ private static ExpiredEntryMonitor _theInstance = null;
+}
\ No newline at end of file
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -33,6 +33,8 @@
import java.io.*;
import java.net.*;
+import java.util.List;
+import java.util.LinkedList;
import com.arjuna.ats.arjuna.common.*;
import com.arjuna.ats.arjuna.logging.FacilityCode;
@@ -61,6 +63,8 @@
if (Listener.setTimeout)
_listener_socket.setSoTimeout( _listener_socket_timeout_in_msecs );
+
+ connections = new LinkedList<Socket>();
}
/**
@@ -80,14 +84,18 @@
if (Listener.setTimeout)
_listener_socket.setSoTimeout( _listener_socket_timeout_in_msecs );
+
+ connections = new LinkedList<Socket>();
}
-
- /**
+
+ /*
* Close down the socket.
*
* @message com.arjuna.ats.internal.arjuna.recovery.Listener_1 [com.arjuna.ats.internal.arjuna.recovery.Listener_1] - failed to close listener socket
- */
+ * this is pointless because this instance is a thread so never gets garbage collected until it has stopped running.
+ * but that means shutdown will have been called making the call to close in this method redundant.
+
public void finalize()
{
stopListener();
@@ -101,6 +109,7 @@
tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.Listener_1");
}
}
+ */
/**
* Loops waiting for connection requests from client,
@@ -115,9 +124,16 @@
{
try
{
- Socket conn = _listener_socket.accept();
+ final Socket conn = _listener_socket.accept();
+ addConnection(conn);
+ Connection.Callback callback = new Connection.Callback() {
+ private Socket _conn = conn;
+ public void run() {
+ removeConnection(_conn);
+ }
+ };
- Connection new_conn = new Connection( conn, _listener_service );
+ Connection new_conn = new Connection( conn, _listener_service, callback );
if (tsLogger.arjLogger.debugAllowed())
{
@@ -139,6 +155,7 @@
}
catch (final SocketException ex)
{
+ // we get this if the socket is closed under a call to shutdown
if (tsLogger.arjLogger.debugAllowed())
{
tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_CRASH_RECOVERY,
@@ -156,23 +173,61 @@
}
}
}
-
+
+ public synchronized void addConnection(Socket conn)
+ {
+ if (!_stop_listener) {
+ connections.add(conn);
+ }
+ }
+
+ public synchronized void removeConnection(Socket conn)
+ {
+ connections.remove(conn);
+ notifyAll();
+ }
+
/**
* Halts running of the listener thread.
*/
- public void stopListener()
+ public synchronized void stopListener()
{
_stop_listener = true;
- this.interrupt();
+
+ try
+ {
+ _listener_socket.close(); // in case we're still in accept
+ }
+ catch (final Exception ex)
+ {
+ }
+ // there is no need for this as the close will interrupt any i/o that is in progress
+ // this.interrupt();
+
+ // ok, closing a connection socket will cause the connection thread to remove it from the list as it
+ // exits so we keep on closing them and waiting until the list is empty
+
+ while(connections.size() > 0) {
+ Socket conn = connections.get(0);
+ try {
+ conn.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
- try
- {
- _listener_socket.close(); // in case we're still in accept
- }
- catch (final Exception ex)
- {
- }
+ // make sure this listener thread has exited before we return
+
+ try {
+ this.join();
+ } catch (InterruptedException ie) {
+ }
}
// Socket & port which client(RecoveryManager) connects to.
@@ -188,6 +243,8 @@
// The work item to execute.
private Service _listener_service;
+ private List<Socket> connections;
+
private static boolean setTimeout = false;
static
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -81,38 +81,23 @@
{
/***** public API *****/
-/*
- * TODO uncomment for JDK 1.5.
- *
- public static enum Status
- {
- INACTIVE, SCANNING
- }
-
- public static enum Mode
- {
- ENABLED, SUSPENDED, TERMINATED
- }
-*/
/**
* state values indicating whether or not some thread is currently scanning. used to define values of field
* {@link PeriodicRecovery#_currentStatus}
*/
- public class Status
- {
- /**
- * state value indicating that no thread is scanning
- */
- public static final int INACTIVE = 0;
- /**
- * state value indicating that some thread is scanning.
- * n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
- */
- public static final int SCANNING = 1;
+ public static enum Status
+ {
+ /**
+ * state value indicating that no thread is scanning
+ */
+ INACTIVE,
+ /**
+ * state value indicating that some thread is scanning.
+ * n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
+ */
+ SCANNING
+ }
- private Status() { }
- }
-
/**
* state values indicating operating mode of scanning process for ad hoc threads and controlling behaviour of
* singleton periodic recovery thread. used to define values of field {@link PeriodicRecovery#_currentMode}
@@ -123,25 +108,23 @@
* {@link PeriodicRecovery#_currentStatus} may (temporarily) remain in state SCANNING before transitioning
* to state INACTIVE.
*/
- public class Mode
- {
- /**
- * state value indicating that new scans may proceed
- */
- public static final int ENABLED = 0;
- /**
- * state value indicating that new scans may not proceed and the periodic recovery thread should suspend
- */
- public static final int SUSPENDED = 1;
- /**
- * state value indicating that new scans may not proceed and that the singleton
- * PeriodicRecovery thread instance should exit if it is still running
- */
- public static final int TERMINATED = 2;
+ public static enum Mode
+ {
+ /**
+ * state value indicating that new scans may proceed
+ */
+ ENABLED,
+ /**
+ * state value indicating that new scans may not proceed and the periodic recovery thread should suspend
+ */
+ SUSPENDED,
+ /**
+ * state value indicating that new scans may not proceed and that the singleton
+ * PeriodicRecovery thread instance should exit if it is still running
+ */
+ TERMINATED
+ }
- private Mode() { }
- }
-
/**
*
*
@@ -150,6 +133,8 @@
*/
public PeriodicRecovery (boolean threaded, boolean useListener)
{
+ super("Periodic Recovery");
+
initialise();
// Load the recovery modules that actually do the work.
@@ -191,7 +176,7 @@
start();
}
- if(useListener)
+ if(useListener && _listener != null)
{
if (tsLogger.arjLogger.isDebugEnabled())
{
@@ -212,6 +197,11 @@
*/
public void shutdown (boolean async)
{
+ // stop the lsitener from adding threads which can exercise the worker
+ if (_listener != null) {
+ _listener.stopListener();
+ }
+
synchronized (_stateLock) {
if (getMode() != Mode.TERMINATED) {
if (tsLogger.arjLogger.isDebugEnabled())
@@ -369,7 +359,7 @@
} else {
// status == INACTIVE so we can go ahead and scan if scanning is enabled
switch (getMode()) {
- case Mode.ENABLED:
+ case ENABLED:
// ok grab our chance to be the scanning thread
if (tsLogger.arjLogger.isDebugEnabled())
{
@@ -381,7 +371,7 @@
_stateLock.notifyAll();
workToDo = true;
break;
- case Mode.SUSPENDED:
+ case SUSPENDED:
// we need to wait while we are suspended
if (tsLogger.arjLogger.isDebugEnabled())
{
@@ -392,7 +382,7 @@
// we come out of here with the lock and either ENABLED or TERMINATED
finished = (getMode() == Mode.TERMINATED);
break;
- case Mode.TERMINATED:
+ case TERMINATED:
finished = true;
break;
}
@@ -438,6 +428,12 @@
}
} while (!finished);
+ // make sure the worker thread is not wedged waiting for a scan to complete
+
+ synchronized(_stateLock) {
+ notifyWorker();
+ }
+
if (tsLogger.arjLogger.isDebugEnabled())
{
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
@@ -545,7 +541,7 @@
tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: listener worker interrupts background thread");
}
- this.interrupt();
+ _stateLock.notifyAll();
}
}
}
@@ -614,7 +610,7 @@
* <b>Caveats:</b> must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
* @return INACTIVE if no scan is in progress or SCANNING if some thread is performing a scan
*/
- private int getStatus ()
+ private Status getStatus ()
{
return _currentStatus;
}
@@ -625,7 +621,7 @@
* <b>Caveats:</b> must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
* @return the current recovery operation mode
*/
- private int getMode ()
+ private Mode getMode ()
{
return _currentMode;
}
@@ -634,7 +630,7 @@
* set the current activity status
* @param status the new status to be used
*/
- private void setStatus (int status)
+ private void setStatus (Status status)
{
_currentStatus = status;
}
@@ -643,7 +639,7 @@
* set the current recovery operation mode
* @param mode the new mode to be used
*/
- private void setMode (int mode)
+ private void setMode (Mode mode)
{
_currentMode = mode;
}
@@ -829,7 +825,7 @@
}
if(_workerService != null)
{
- _workerService.signalDone();
+ _workerService.notifyDone();
}
_workerScanRequested = false;
}
@@ -1033,12 +1029,12 @@
/**
* activity status indicating whether we IDLING or some thread is SCANNING
*/
- private static int _currentStatus;
+ private static Status _currentStatus;
/**
* operating mode indicating whether scanning is ENABLED, SUSPENDED or TERMINATED
*/
- private static int _currentMode;
+ private static Mode _currentMode;
/**
* flag indicating whether the listener has prodded the recovery thread
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -243,11 +243,11 @@
*/
public void stop (boolean async)
{
- _periodicRecovery.shutdown(async);
+ // must ensure we clean up dependent threads
- // TODO why?
+ ExpiredEntryMonitor.shutdown();
- // ExpiredEntryMonitor.shutdown();
+ _periodicRecovery.shutdown(async);
}
/**
@@ -272,7 +272,10 @@
{
stop(true);
}
-
+
+ /**
+ * wait for the recovery implementation to be shut down.
+ */
public void waitForTermination ()
{
try
Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java 2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -50,7 +50,7 @@
_periodicRecovery = pr;
}
- public synchronized void doWork (InputStream is, OutputStream os) throws IOException
+ public void doWork (InputStream is, OutputStream os) throws IOException
{
BufferedReader in = new BufferedReader(new InputStreamReader(is));
PrintWriter out = new PrintWriter(new OutputStreamWriter(os));
@@ -66,12 +66,29 @@
else
if (request.equals("SCAN") || (request.equals("ASYNC_SCAN")))
{
+ // hmm, we need to synchronize on the periodic recovery object in order to wake it up via notify.
+ // but the periodic recovery object has to synchronize on this object and then call notify
+ // in order to tell it that the last requested scan has completed. i.e. we have a two way
+ // wakeup here. so we have to be careful to avoid a deadlock.
+
+ if (request.equals("SCAN")) {
+ // do this before kicking the periodic recovery thread
+ synchronized (this) {
+ doWait = true;
+ }
+ }
+ // now we only need to hold one lock
_periodicRecovery.wakeUp();
tsLogger.arjLogger.info("com.arjuna.ats.internal.arjuna.recovery.WorkerService_3");
if (request.equals("SCAN"))
{
+ synchronized (this) {
+ if (doWait) {
+ // ok, the periodic recovery thread cannot have finished responding to the last scan request
+ // so it is safe to wait. if we delivered the request while the last scan was still going
+ // then it will have been ignored but that is ok.
try
{
wait();
@@ -80,8 +97,9 @@
{
tsLogger.arjLogger.info("com.arjuna.ats.internal.arjuna.recovery.WorkerService_4");
}
+ }
+ }
}
-
out.println("DONE");
}
else
@@ -103,18 +121,20 @@
}
}
- public synchronized void signalDone ()
+ public synchronized void notifyDone()
{
try
{
notifyAll();
+ doWait = false;
}
catch (Exception ex)
{
}
}
-
+
private PeriodicRecovery _periodicRecovery = null;
+ private boolean doWait = false;
}
Added: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java 2009-07-01 15:37:19 UTC (rev 27390)
@@ -0,0 +1,196 @@
+package com.hp.mwtests.ts.arjuna.recovery;
+
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import com.arjuna.ats.arjuna.recovery.RecoveryManager;
+import com.arjuna.ats.internal.arjuna.recovery.PeriodicRecovery;
+import com.arjuna.ats.internal.arjuna.recovery.Listener;
+
+import java.net.Socket;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * test to ensure that the recovery manager cleans up all its threads when terminated
+ */
+public class RecoveryManagerStartStopTest extends TestCase
+{
+ public static Test suite()
+ {
+ return new TestSuite(RecoveryManagerStartStopTest.class);
+ }
+
+ public void testStartStop() throws Exception
+ {
+ // check how many threads there are running
+
+ ThreadGroup thg = Thread.currentThread().getThreadGroup();
+ int activeCount = thg.activeCount();
+
+ dumpThreadGroup(thg, "Before recovery manager create");
+
+ RecoveryManager.delayRecoveryManagerThread() ;
+ RecoveryManager manager = RecoveryManager.manager(RecoveryManager.INDIRECT_MANAGEMENT);
+
+ dumpThreadGroup(thg, "Before recovery manager initialize");
+
+ manager.initialize();
+
+ // give threads a chance to start
+
+ Thread.sleep(2000);
+
+ dumpThreadGroup(thg, "Before recovery manager start periodic recovery thread");
+
+ manager.startRecoveryManagerThread();
+
+ dumpThreadGroup(thg, "Before recovery manager client create");
+
+ // we need to open several connections to the recovery manager listener service and then
+ // ensure they get closed dowm
+
+ addRecoveryClient();
+ addRecoveryClient();
+
+ Thread.sleep(5000);
+
+ dumpThreadGroup(thg, "Before recovery manager terminate");
+
+ manager.terminate();
+
+ // ensure the client threads get killed
+
+ ensureRecoveryClientsTerminated();
+
+ // ensure there are no extra threads running
+
+ Thread.sleep(2000);
+
+ dumpThreadGroup(thg, "After recovery manager terminate");
+
+ int newActiveCount = thg.activeCount();
+
+ assertEquals(activeCount, newActiveCount);
+ }
+
+ private void ensureRecoveryClientsTerminated() {
+ // check that any threads added to talk to the reocvery listener get their sockets closed
+
+ for (RecoveryManagerStartStopTestThread client : clients) {
+ try {
+ client.join();
+ } catch (InterruptedException e) {
+ // do nothing
+ }
+ assertFalse(client.failed());
+ }
+ }
+
+ private void addRecoveryClient() {
+ // open a connection to the recovery listener service in a new thread and ensure that the
+ // thread is terminated by habing its socket closed.
+
+ RecoveryManagerStartStopTestThread client = new RecoveryManagerStartStopTestThread();
+ clients.add(client);
+ client.start();
+ }
+
+ private void dumpThreadGroup(ThreadGroup thg, String header)
+ {
+ int activeCount = thg.activeCount();
+ Thread[] threads = new Thread[activeCount];
+ int reported = thg.enumerate(threads);
+
+ System.out.println(header);
+ System.out.println("Thread count == " + activeCount);
+ for (int i = 0; i < reported ; i++) {
+ System.out.println("Thread[" + i + "] == " + threads[i].getName());
+ }
+
+ System.out.flush();
+ }
+
+ private List<RecoveryManagerStartStopTestThread> clients = new ArrayList<RecoveryManagerStartStopTestThread>();
+
+ private static class RecoveryManagerStartStopTestThread extends Thread
+ {
+ private boolean failed = true;
+
+ public RecoveryManagerStartStopTestThread()
+ {
+ super("Recovery Listener Client");
+ }
+
+ public boolean failed()
+ {
+ return failed;
+ }
+
+ public void run() {
+ BufferedReader fromServer = null;
+ Socket connectorSocket = null;
+ // get a socket connected to the listener
+ // don't write anything just sit on a read until the socket is closed
+ try {
+ ServerSocket socket = PeriodicRecovery.getServerSocket();
+ InetAddress address;
+ String host;
+ int port;
+
+ address = socket.getInetAddress();
+
+ host = InetAddress.getLocalHost().getHostName();
+
+ port = PeriodicRecovery.getServerSocket().getLocalPort();
+
+ System.out.println("client atempting to connect to host " + host + " port " + port);
+ System.out.flush();
+
+ connectorSocket = new Socket(host, port);
+
+ System.out.println("connected!!!");
+ System.out.flush();
+
+ fromServer = new BufferedReader(new InputStreamReader(connectorSocket.getInputStream())) ;
+ } catch (Exception e) {
+ System.out.println("Failed to set up listener input stream!!!");
+ e.printStackTrace();
+ System.out.flush();
+
+ return;
+ }
+
+ try {
+ String result = fromServer.readLine();
+ if (result == null || result.equals("")) {
+ System.out.flush();
+ System.out.println("Recovery Listener Client got empty string from readline() as expected");
+ failed = false;
+ }
+ } catch (IOException e) {
+ if (!connectorSocket.isClosed()) {
+ System.out.println("Recovery Listener Client got IO exception without socket being closed");
+ System.out.flush();
+ e.printStackTrace();
+ try {
+ connectorSocket.close();
+ } catch (IOException e1) {
+ // ignore
+ }
+ } else {
+ System.out.flush();
+ System.out.println("Recovery Listener Client got IO exception under readline() as expected");
+ failed = false;
+ }
+ } catch (Exception e) {
+ System.out.println("Recovery Listener Client got non IO exception");
+ e.printStackTrace();
+ System.out.flush();
+ }
+ }
+ }
+}
More information about the jboss-svn-commits
mailing list