[jboss-svn-commits] JBL Code SVN: r27390 - in labs/jbosstm/trunk/ArjunaCore/arjuna: classes/com/arjuna/ats/arjuna/coordinator and 3 other directories.

jboss-svn-commits at lists.jboss.org jboss-svn-commits at lists.jboss.org
Wed Jul 1 11:37:19 EDT 2009


Author: adinn
Date: 2009-07-01 11:37:19 -0400 (Wed, 01 Jul 2009)
New Revision: 27390

Added:
   labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java
Modified:
   labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
   labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
Log:
ensured all recovery threads start and stop cleanly -- fixes JBTM-569

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/build.xml	2009-07-01 15:37:19 UTC (rev 27390)
@@ -52,6 +52,9 @@
         <run.tests.macro>
             <tests><fileset dir="tests/classes" includes="**/PersistenceUnitTest.java"/></tests>
         </run.tests.macro>
+        <run.tests.macro>
+            <tests><fileset dir="tests/classes" includes="**/RecoveryManagerStartStopTest.java"/></tests>
+        </run.tests.macro>
     </target>
 
-</project>
\ No newline at end of file
+</project>

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/coordinator/TxControl.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -65,10 +65,16 @@
     {
         public void run()
         {
-            if (transactionStatusManager != null)
+            // guard against simultaneous user-initiated shutdown
+            // synchronize on the class since the shutdown method on TxControl is static synchronized
+            synchronized (TxControl.class) {
+            // check that this hook is still active
+            if (_shutdownHook == this && transactionStatusManager != null)
             {
-                transactionStatusManager.finalize();
+                transactionStatusManager.shutdown();
+                transactionStatusManager = null;
             }
+            }
         }
     };
     
@@ -298,11 +304,11 @@
 	    {
 	        Runtime.getRuntime().removeShutdownHook(_shutdownHook);
 	        
-	        _shutdownHook = null;
-	        
+            _shutdownHook = null;
+
 	        if (transactionStatusManager != null)
 	        {
-	            transactionStatusManager.finalize();
+	            transactionStatusManager.shutdown();
 	            transactionStatusManager = null;
 	        }
 	    }
@@ -326,6 +332,9 @@
 
 	static int numberOfTransactions = 100;
 
+    /**
+     * flag which is true if TxControl is enabled and false if it is disabled
+     */
 	static boolean enable = true;
 
 	private static TransactionStatusManager transactionStatusManager = null;
@@ -336,6 +345,12 @@
 
 	static int _defaultTimeout = 60; // 60 seconds
 
+    /**
+     * flag which is true if enable and disable operations, respectively, start and stop the transaction status
+     * manager and false if they do not perform a start and stop. this flag is true by default and can only be
+     * set to false by setting property {@code com.arjuna.ats.arjuna.common.TRANSACTION_STATUS_MANAGER_ENABLE}
+     * to the value "NO" (compared case-insensitively)
+     */
 	static boolean _enableTSM = true;
     
     static boolean beforeCompletionWhenRollbackOnly = false;
@@ -515,7 +530,11 @@
 		if ("NO".equalsIgnoreCase(enableTSM))
 		    _enableTSM = false;
 
+        // TODO -- add this check to respect the environment setting for Environment.START_DISABLED?
+        // TODO -- is this feature actually needed (it appears not to be used internally)
+        // if (enable) {
 		createTransactionStatusManager();
+        // }
 	}
 
 }

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/ActionStatusService.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -141,9 +141,10 @@
 	       }
 
 	       /*
-		* TODO: why do we need to check this? No nulls should
-		* ever be sent and the connection is blocking!
-		*/
+	        * check for null - in theory we get this from readLine when EOF has been reached, although in practice
+	        * since we are reading from a socket we will probably get an IOException in which case we will still
+	        * see null
+		    */
 
 	       if ((transactionType == null) && (strUid == null))
 		   return;
@@ -445,7 +446,6 @@
     */
 
    private static ObjectStore _objectStore = null;
-
 }
 
 

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/RecoveryManager.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -262,6 +262,9 @@
     // does nothing when running embedded.
 
     /**
+     * wait for the recovery thread to be shut down. n.b. this will not return unless and until shutdown
+     * is called.
+     *
      * @throws IllegalStateException if the recovery manager has been shutdown.
      */
 
@@ -516,6 +519,10 @@
                 System.out.println("Ready");
             }
 
+            // this is never going to return because it only returns when shutdown is called and
+            // there is nothing which is going to call shutdown. we probably ought to provide a
+            // clean way of terminating this process.
+
             manager.waitForTermination();
 
         }

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/Service.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -33,6 +33,17 @@
 
 import java.io.* ;
 
+/**
+ * a service is used to serve one or more requests from an input stream and post results on an output stream.
+ * when the input stream is closed it is expected to close its output stream.
+ * <p>
+ * note that a single service instance may be requested to process incoming requests from multiple input
+ * streams in parallel.
+ * <p>
+ * note also that the service should be resilient to closure of the input and output streams during request
+ * processing, which can happen in response to asynchronous dispatch of a shutdown request to the object which
+ * invoked the service.
+ */
 public interface Service
 {
    public void doWork ( InputStream in, OutputStream out )

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/arjuna/recovery/TransactionStatusManager.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -88,8 +88,10 @@
 
    /**
     * The work item to be executed.
+    *
+    * this must be private as it should only be called once. otherwise we leak listener threads
     */
-   public void addService( Service service, ServerSocket serverSocket )
+   private void addService( Service service, ServerSocket serverSocket )
    {
       try
       {
@@ -111,11 +113,14 @@
       }
    }
 
-   /**
+   /*
     * Removes the TransactionStatusManager from the object store
     * and closes down the listener thread.
-    */
 
+    * this will never work as a finalizer because the listener thread is always running and keeping this
+    * instance from being garbage collected. we need a proper shutdown method which closes the
+    * listener socket, causing the thread to shut down
+    *
    // TODO consider adding a shutdown operation (signature change)
    public void finalize()
    {
@@ -127,6 +132,7 @@
 	   TransactionStatusManagerItem.removeThis( Utility.getProcessUid() ) ;
       }
    }
+    */
 
    /**
     * Create service and Transaction status manager item.
@@ -187,6 +193,14 @@
       }
    }
 
+    public void shutdown()
+    {
+        if (_listener != null) {
+            _listener.stopListener() ;
+            TransactionStatusManagerItem.removeThis( Utility.getProcessUid() ) ;
+            _listener = null;
+        }
+    }
     /**
      * Lookup the listener port for the transaction manager
      * @param defValue the value to use if no valid port number can be found

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Connection.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -43,12 +43,21 @@
 {
    /**
     * Takes socket and service to execute.
-    *
-    * @message com.arjuna.ats.internal.arjuna.recovery.Connection_2 [com.arjuna.ats.internal.arjuna.recovery.Connection_2] - Setting timeout exception.
     */
 
-   public Connection( Socket server_socket, Service service )
+   public Connection( Socket server_socket, Service service)
    {
+       this(server_socket, service, null);
+   }
+
+    /**
+     * Takes socket and service to execute and a callback to run when processing of the connection has completed
+     *
+     * @message com.arjuna.ats.internal.arjuna.recovery.Connection_2 [com.arjuna.ats.internal.arjuna.recovery.Connection_2] - Setting timeout exception.
+     */
+
+    public Connection( Socket server_socket, Service service, Callback callback )
+   {
       super( "Server.Connection:" + server_socket.getInetAddress().getHostAddress() + ":" + server_socket.getPort() );
               
       _server_socket = server_socket;
@@ -64,6 +73,8 @@
       }
 
       _service = service;
+
+       _callback = callback;
    }
    
    /**
@@ -87,11 +98,29 @@
 	  if (tsLogger.arjLoggerI18N.isWarnEnabled())
 	      tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.Connection_1");
       }
+      finally
+      {
+
+      // run the callback to notify completion of processing for this connection
+
+      if (_callback != null) {
+          _callback.run();
+      }
+      }
    }
 
    // What client (RecoveryManager) talks to.
    private Socket  _server_socket;
    
    // What Service is provided to the client(RecoveryManager).
-   private Service _service;        
+   private Service _service;
+
+   private Callback _callback;
+
+    // abstract class instantiated by clients to allow notification that a connection has been closed
+    
+   public static abstract class Callback
+   {
+       abstract void run();
+   }
 }

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/ExpiredEntryMonitor.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -72,52 +72,24 @@
 
 public class ExpiredEntryMonitor extends Thread
 {
-   /**
+    /**
     *  Start the monitor thread, if the properties make it appropriate
     */
 
-  public static boolean startUp()
+  public static synchronized boolean startUp()
   {
       // ensure singleton
-      if ( _started )
+      if ( _theInstance != null )
       {
 	  return false;
       }
+
+      // process system properties -- n.b. we only do this once!
+
+      if (!initialised) {
+          initialise();
+      }
       
-      /*
-       * Read the system properties to set the configurable options
-       */
-      
-      String scanIntervalString = 
-	  arjPropertyManager.propertyManager.getProperty( RecoveryEnvironment.EXPIRY_SCAN_INTERVAL );
-      
-      if ( scanIntervalString != null )
-	  {
-	      try
-		  {
-		      Integer scanIntervalInteger = new Integer(scanIntervalString);
-		      // convert to seconds
-		      _scanIntervalSeconds = scanIntervalInteger.intValue() * 60 * 60;
-		      
-		      if (tsLogger.arjLoggerI18N.debugAllowed())
-		      {
-			  tsLogger.arjLoggerI18N.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
-							FacilityCode.FAC_CRASH_RECOVERY, 
-							"com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_1", 
-							new Object[]{Integer.toString(_scanIntervalSeconds)});
-		      }
-		  }
-	      catch ( NumberFormatException e )
-	      {
-		  if (tsLogger.arjLoggerI18N.isWarnEnabled())
-		  {
-		      tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_11", 
-						  new Object[]{RecoveryEnvironment.EXPIRY_SCAN_INTERVAL,
-							       scanIntervalString});
-		  }
-	      }
-	  }
-      
       if ( _scanIntervalSeconds == 0 )
       {
 	  // no scanning wanted
@@ -131,16 +103,7 @@
 	  
 	  return false;
       }
-      
-      // is it being used to skip the first time
-      if ( _scanIntervalSeconds < 0 )
-      {
-	  notSkipping = false;
-	  _scanIntervalSeconds = - _scanIntervalSeconds;
-      }
-      
-      loadScanners();
-      
+
       if ( _expiryScanners.size() == 0 )
       {
 	  // nothing to do
@@ -156,23 +119,34 @@
       }
       
       // create, and thus launch the monitor
-      _theInstance = new ExpiredEntryMonitor();
+
+      _theInstance = new ExpiredEntryMonitor(_skipFirst);
+
+      _theInstance.start();
       
-      return _started;
+      return true;
   }
-    
-  public static void shutdown()
+
+    /**
+     * terminate any currently active monitor thread, cancelling any further scans but waiting for the
+     * thread to exit before returning
+     */
+  public synchronized static void shutdown()
   {
-      _started = false;
-      _expiryScanners = new Vector();
-      _scanIntervalSeconds = 12 * 60 * 60;
-      notSkipping = true;
-      
-      _theInstance.interrupt();
+      if (_theInstance != null) {
+          _theInstance.terminate();
+          // now wait for it to finish
+          try {
+              _theInstance.join();
+          } catch (InterruptedException e) {
+              // ignore
+          }
+      }
+
       _theInstance = null;
   }
 
-  private ExpiredEntryMonitor()
+  private ExpiredEntryMonitor(boolean skipFirst)
   {
     if (tsLogger.arjLoggerI18N.isDebugEnabled())
     {
@@ -180,16 +154,14 @@
 				      FacilityCode.FAC_CRASH_RECOVERY, 
 				      "com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_4");
     }
-    
-    _started = true;
+    _skipNext = skipFirst;
+    _stop = false;
 
     this.setDaemon(true);
-
-    start();
   }
     
   /**
-   * Start the background thread to perform the periodic scans
+   * performs periodic scans until a shutdown is notified
    */
   public void run()
   {
@@ -201,7 +173,18 @@
 				      _theTimestamper.format(new Date()) + "----" );
 	}
 	
-	if ( notSkipping )
+	if (_skipNext)
+    {
+        // make sure we skip at most one scan
+
+        _skipNext = false;
+
+        if (tsLogger.arjLoggerI18N.isInfoEnabled())
+        {
+            tsLogger.arjLoggerI18N.info("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_5");
+        }
+    }
+    else
 	{
 	    Enumeration scanners = _expiryScanners.elements();
 	    
@@ -209,6 +192,15 @@
 	    {
 		ExpiryScanner m = (ExpiryScanner)scanners.nextElement();
 
+            // check for a shutdown request before starting a scan
+            synchronized (this) {
+                if (_stop) {
+                    break;
+                }
+            }
+
+            // ok go ahead and scan
+
 		m.scan();
 			
 		if (tsLogger.arjLogger.isDebugEnabled())
@@ -217,37 +209,95 @@
 					      FacilityCode.FAC_CRASH_RECOVERY,"  "); 
 		    // bit of space if detailing
 		}
-	    }
+        }
 	}
-	else
-	{
-	    if (tsLogger.arjLoggerI18N.isInfoEnabled())
-	    {
-		tsLogger.arjLoggerI18N.info("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_5");
-	    }
-	    
-	    notSkipping = true;
-	}
-	
+
 	// wait for a bit to avoid catching (too many) transactions etc. that
 	// are really progressing quite happily
 
 	try
 	{
-	    Thread.sleep( _scanIntervalSeconds * 1000 );
+        // check for shutdown request before we sleep
+        synchronized (this) {
+        if (_stop) {
+            break;
+        }
+	    wait( _scanIntervalSeconds * 1000 );
+        // check if we were woken because of a shutdown
+        if (_stop) {
+            break;
+        }
+        }
 	}
 	catch ( InterruptedException e1 )
 	{
-	    break;
+        // we should only get shut down by a shutdown request so ignore interrupts
 	}
-	
-	if ( !_started )
-            return;
     }
   }
-    
+
+  private synchronized void terminate()
+  {
+      _stop = true;
+      notify();
+  }
+
+    private static void initialise()
+    {
+        /*
+         * Read the system properties to set the configurable options
+         */
+
+        String scanIntervalString =
+        arjPropertyManager.propertyManager.getProperty( RecoveryEnvironment.EXPIRY_SCAN_INTERVAL );
+
+        if ( scanIntervalString != null )
+        {
+            try
+            {
+                Integer scanIntervalInteger = new Integer(scanIntervalString);
+                // convert to seconds
+                _scanIntervalSeconds = scanIntervalInteger.intValue() * 60 * 60;
+
+                if (tsLogger.arjLoggerI18N.debugAllowed())
+                {
+                tsLogger.arjLoggerI18N.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
+                              FacilityCode.FAC_CRASH_RECOVERY,
+                              "com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_1",
+                              new Object[]{Integer.toString(_scanIntervalSeconds)});
+                }
+            }
+            catch ( NumberFormatException e )
+            {
+            if (tsLogger.arjLoggerI18N.isWarnEnabled())
+            {
+                tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.ExpiredEntryMonitor_11",
+                            new Object[]{RecoveryEnvironment.EXPIRY_SCAN_INTERVAL,
+                                     scanIntervalString});
+            }
+            }
+        }
+
+        if (_scanIntervalSeconds != 0)
+        {
+
+            // is it being used to skip the first time
+            if ( _scanIntervalSeconds < 0 )
+            {
+                _skipFirst = true;
+                _scanIntervalSeconds = - _scanIntervalSeconds;
+            }
+
+            loadScanners();
+        }
+
+        initialised = true;
+    }
+
   private static void loadScanners()
   {
+      _expiryScanners = new Vector();
+
     // search our properties
     Properties properties = arjPropertyManager.propertyManager.getProperties();
     
@@ -261,7 +311,7 @@
 	    
 	    if ( propertyName.startsWith(RecoveryEnvironment.SCANNER_PROPERTY_PREFIX) )
 	    {
-		loadScanner( properties.getProperty(propertyName) );
+		loadScanner( properties.getProperty(propertyName));
 	    }
 	}
     }
@@ -346,21 +396,46 @@
 	  }
       }
   }
-    
-    private static boolean _started = false;
-    
-    private static Vector _expiryScanners = new Vector();
-    
+
+    /**
+     * flag which causes the next scan to be skipped if it is true. this is set from _skipFirst when a
+     * monitor is created and reset to false once a scan has been skipped, so at most one scan is skipped.
+     */
+    private boolean _skipNext;
+
+    /**
+     * flag which causes the monitor thread to stop running when it is set to true
+     */
+    private boolean _stop;
+
+    /**
+     * list of scanners to be invoked by the monitor thread in order to check for expired log entries
+     */
+    private static Vector _expiryScanners;
+
+    /**
+     * flag which guards processing of properties ensuring it is only performed once
+     */
+    private static boolean initialised = false;
+
+    /**
+     * the scan interval in seconds, defaulting to 12 hours if the property file does not supply one
+     */
     private static int _scanIntervalSeconds = 12 * 60 * 60;
-    
+
+    /**
+     * a date format used to log the time for a scan
+     */
     private static SimpleDateFormat _theTimestamper = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss");
-    
-    private static boolean notSkipping = true;
-    
-    private static ExpiredEntryMonitor _theInstance = null;
-    
-}
 
+    /**
+     * a flag which, if true, causes the monitor to skip the scan it would otherwise perform when it first starts
+     * or, if false, perform that first scan. it is set to true by supplying a negative scan interval in the property file.
+     */
+    private static boolean _skipFirst = false;
 
-
-
+    /**
+     * the currently active monitor instance or null if no scanner is active
+     */
+    private static ExpiredEntryMonitor _theInstance = null;
+}
\ No newline at end of file

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/Listener.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -33,6 +33,8 @@
 
 import java.io.*;
 import java.net.*;
+import java.util.List;
+import java.util.LinkedList;
 
 import com.arjuna.ats.arjuna.common.*;
 import com.arjuna.ats.arjuna.logging.FacilityCode;
@@ -61,6 +63,8 @@
 
       if (Listener.setTimeout)
 	  _listener_socket.setSoTimeout( _listener_socket_timeout_in_msecs );
+
+       connections = new LinkedList<Socket>();
    }
    
    /**
@@ -80,14 +84,18 @@
 
       if (Listener.setTimeout)
 	  _listener_socket.setSoTimeout( _listener_socket_timeout_in_msecs );
+
+      connections = new LinkedList<Socket>();
    }
-   
-   /**
+
+   /*
     * Close down the socket.
     *
     * @message com.arjuna.ats.internal.arjuna.recovery.Listener_1 [com.arjuna.ats.internal.arjuna.recovery.Listener_1] - failed to close listener socket
-    */   
 
+    * this is pointless because this instance is a thread so never gets garbage collected until it has stopped running.
+    * but that means shutdown will have been called making the call to close in this method redundant.
+
    public void finalize()
    {
       stopListener();
@@ -101,6 +109,7 @@
 	  tsLogger.arjLoggerI18N.warn("com.arjuna.ats.internal.arjuna.recovery.Listener_1");
       }
    }      
+    */
 
    /**
     * Loops waiting for connection requests from client,
@@ -115,9 +124,16 @@
       {
          try
          {
-            Socket conn = _listener_socket.accept();
+            final Socket conn = _listener_socket.accept();
+            addConnection(conn);
+            Connection.Callback callback = new Connection.Callback() {
+                private Socket _conn = conn;
+                public void run() {
+                    removeConnection(_conn);
+                }
+            };
 
-            Connection new_conn = new Connection( conn, _listener_service );
+            Connection new_conn = new Connection( conn, _listener_service, callback );
 
      	    if (tsLogger.arjLogger.debugAllowed())
 	    {
@@ -139,6 +155,7 @@
          }
          catch (final SocketException ex)
          {
+             // we get this if the socket is closed under a call to shutdown
              if (tsLogger.arjLogger.debugAllowed())
              {
                  tsLogger.arjLogger.debug( DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC, FacilityCode.FAC_CRASH_RECOVERY,
@@ -156,23 +173,61 @@
          }
       }
    }
-   
+
+    public synchronized void addConnection(Socket conn)
+    {
+        if (!_stop_listener) {
+            connections.add(conn);
+        }
+    }
+
+    public synchronized void removeConnection(Socket conn)
+    {
+        connections.remove(conn);
+        notifyAll();
+    }
+
    /**
     * Halts running of the listener thread.
     */
 
-   public void stopListener()
+   public synchronized void stopListener()
    {
       _stop_listener = true;
-      this.interrupt();
+
+       try
+       {
+           _listener_socket.close();  // in case we're still in accept
+       }
+       catch (final Exception ex)
+       {
+       }
+      // there is no need for this as the close will interrupt any i/o that is in progress
+      // this.interrupt();
+
+       // ok, closing a connection socket will cause the connection thread to remove it from the list as it
+       // exits so we keep on closing them and waiting until the list is empty
+
+       while(connections.size() > 0) {
+           Socket conn = connections.get(0);
+           try {
+               conn.close();
+           } catch (Exception e) {
+               // ignore
+           }
+           try {
+               wait();
+           } catch (InterruptedException e) {
+               // ignore
+           }
+       }
       
-      try
-      {
-          _listener_socket.close();  // in case we're still in accept
-      }
-      catch (final Exception ex)
-      {
-      }
+       // make sure this listener thread has exited before we return
+
+       try {
+           this.join();
+       } catch (InterruptedException ie) {
+       }
    }
 
    // Socket & port which client(RecoveryManager) connects to.
@@ -188,6 +243,8 @@
    // The work item to execute.
    private Service _listener_service;
 
+    private List<Socket> connections;
+
     private static boolean setTimeout = false;
     
     static

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/PeriodicRecovery.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -81,38 +81,23 @@
 {
    /***** public API *****/
 
-/*
- * TODO uncomment for JDK 1.5.
- *
-   public static enum Status
-   {
-       INACTIVE, SCANNING
-   }
-
-   public static enum Mode
-   {
-       ENABLED, SUSPENDED, TERMINATED
-   }
-*/
     /**
      *  state values indicating whether or not some thread is currently scanning. used to define values of field
      * {@link PeriodicRecovery#_currentStatus}
      */
-    public class Status
-    {
-        /**
-         * state value indicating that no thread is scanning
-         */
-        public static final int INACTIVE = 0;
-        /**
-         * state value indicating that some thread is scanning.
-         * n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
-         */
-        public static final int SCANNING = 1;
+   public static enum Status
+   {
+       /**
+        * state value indicating that no thread is scanning
+        */
+       INACTIVE,
+       /**
+        * state value indicating that some thread is scanning.
+        * n.b. the scanning thread may not be the singleton PeriodicRecovery thread instance
+        */
+       SCANNING
+   }
 
-        private Status() { }
-    }
-
     /**
      * state values indicating operating mode of scanning process for ad hoc threads and controlling behaviour of
      * singleton periodic recovery thread. used to define values of field {@link PeriodicRecovery#_currentMode}
@@ -123,25 +108,23 @@
      * {@link PeriodicRecovery#_currentStatus} may (temporarily) remain in state SCANNING before transitioning
      * to state INACTIVE.
      */
-    public class Mode
-    {
-        /**
-         * state value indicating that new scans may proceed
-         */
-        public static final int ENABLED = 0;
-        /**
-         * state value indicating that new scans may not proceed and the periodic recovery thread should suspend
-         */
-        public static final int SUSPENDED = 1;
-        /**
-         * state value indicating that new scans may not proceed and that the singleton
-         * PeriodicRecovery thread instance should exit if it is still running
-         */
-        public static final int TERMINATED = 2;
+   public static enum Mode
+   {
+       /**
+        * state value indicating that new scans may proceed
+        */
+       ENABLED,
+       /**
+        * state value indicating that new scans may not proceed and the periodic recovery thread should suspend
+        */
+       SUSPENDED,
+       /**
+        * state value indicating that new scans may not proceed and that the singleton
+        * PeriodicRecovery thread instance should exit if it is still running
+        */
+       TERMINATED
+   }
 
-        private Mode() { }
-    }
-
     /**
      *
      *
@@ -150,6 +133,8 @@
      */
     public PeriodicRecovery (boolean threaded, boolean useListener)
     {
+        super("Periodic Recovery");
+        
         initialise();
 
         // Load the recovery modules that actually do the work.
@@ -191,7 +176,7 @@
             start();
         }
 
-        if(useListener)
+        if(useListener && _listener != null)
         {
             if (tsLogger.arjLogger.isDebugEnabled())
             {
@@ -212,6 +197,11 @@
      */
    public void shutdown (boolean async)
    {
+       // stop the listener from adding threads which can exercise the worker
+       if (_listener != null) {
+           _listener.stopListener();
+       }
+
        synchronized (_stateLock) {
            if (getMode() != Mode.TERMINATED) {
                if (tsLogger.arjLogger.isDebugEnabled())
@@ -369,7 +359,7 @@
                } else {
                    // status == INACTIVE so we can go ahead and scan if scanning is enabled
                    switch (getMode()) {
-                       case Mode.ENABLED:
+                       case ENABLED:
                            // ok grab our chance to be the scanning thread
                            if (tsLogger.arjLogger.isDebugEnabled())
                            {
@@ -381,7 +371,7 @@
                            _stateLock.notifyAll();
                            workToDo = true;
                            break;
-                       case Mode.SUSPENDED:
+                       case SUSPENDED:
                            // we need to wait while we are suspended
                            if (tsLogger.arjLogger.isDebugEnabled())
                            {
@@ -392,7 +382,7 @@
                            // we come out of here with the lock and either ENABLED or TERMINATED
                            finished = (getMode() == Mode.TERMINATED);
                            break;
-                       case Mode.TERMINATED:
+                       case TERMINATED:
                            finished = true;
                            break;
                    }
@@ -438,6 +428,12 @@
            }
        } while (!finished);
 
+       // make sure the worker thread is not wedged waiting for a scan to complete
+
+       synchronized(_stateLock) {
+           notifyWorker();
+       }
+
        if (tsLogger.arjLogger.isDebugEnabled())
        {
               tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
@@ -545,7 +541,7 @@
                     tsLogger.arjLogger.debug(DebugLevel.FUNCTIONS, VisibilityLevel.VIS_PUBLIC,
                             FacilityCode.FAC_CRASH_RECOVERY, "PeriodicRecovery: listener worker interrupts background thread");
                 }
-                this.interrupt();
+                _stateLock.notifyAll();
             }
         }
     }
@@ -614,7 +610,7 @@
      * <b>Caveats:</b> must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
      * @return INACTIVE if no scan is in progress or SCANNING if some thread is performing a scan
      */
-    private int getStatus ()
+    private Status getStatus ()
     {
         return _currentStatus;
     }
@@ -625,7 +621,7 @@
      * <b>Caveats:</b> must only be called while synchronized on {@link PeriodicRecovery#_stateLock}
      * @return the current recovery operation mode
      */
-    private int getMode ()
+    private Mode getMode ()
     {
         return _currentMode;
     }
@@ -634,7 +630,7 @@
      * set the current activity status
      * @param status the new status to be used
      */
-    private void setStatus (int status)
+    private void setStatus (Status status)
     {
         _currentStatus = status;
     }
@@ -643,7 +639,7 @@
      * set the current recovery operation mode
      * @param mode the new mode to be used
      */
-    private void setMode (int mode)
+    private void setMode (Mode mode)
     {
         _currentMode = mode;
     }
@@ -829,7 +825,7 @@
             }
             if(_workerService != null)
             {
-                _workerService.signalDone();
+                _workerService.notifyDone();
             }
             _workerScanRequested = false;
         }
@@ -1033,12 +1029,12 @@
     /**
      * activity status indicating whether we IDLING or some thread is SCANNING
      */
-   private static int _currentStatus;
+   private static Status _currentStatus;
 
     /**
      * operating mode indicating whether scanning is ENABLED, SUSPENDED or TERMINATED
      */
-   private static int _currentMode;
+   private static Mode _currentMode;
 
     /**
      *  flag indicating whether the listener has prodded the recovery thread

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/RecoveryManagerImple.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -243,11 +243,11 @@
      */
         public void stop (boolean async)
         {
-                _periodicRecovery.shutdown(async);
+            // must ensure we clean up dependent threads
 
-                // TODO why?
+                ExpiredEntryMonitor.shutdown();
 
-                // ExpiredEntryMonitor.shutdown();
+                _periodicRecovery.shutdown(async);
         }
 
         /**
@@ -272,7 +272,10 @@
         {
                 stop(true);
         }
-        
+
+       /**
+        * wait for the recovery implementation to be shut down.
+        */
         public void waitForTermination ()
         {
             try

Modified: labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java	2009-07-01 15:36:18 UTC (rev 27389)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/classes/com/arjuna/ats/internal/arjuna/recovery/WorkerService.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -50,7 +50,7 @@
 	_periodicRecovery = pr;
     }
     
-    public synchronized void doWork (InputStream is, OutputStream os) throws IOException
+    public void doWork (InputStream is, OutputStream os) throws IOException
     {
 	BufferedReader in  = new BufferedReader(new InputStreamReader(is));
 	PrintWriter out = new PrintWriter(new OutputStreamWriter(os));
@@ -66,12 +66,29 @@
         else
 	    if (request.equals("SCAN") || (request.equals("ASYNC_SCAN")))
 	    {
+            // hmm, we need to synchronize on the periodic recovery object in order to wake it up via notify.
+            // but the periodic recovery object has to synchronize on this object and then call notify
+            // in order to tell it that the last requested scan has completed. i.e. we have a two way
+            // wakeup here. so we have to be careful to avoid a deadlock.
+
+            if (request.equals("SCAN")) {
+                // do this before kicking the periodic recovery thread
+                synchronized (this) {
+                    doWait = true;
+                }
+            }
+            // now we only need to hold one lock
 		_periodicRecovery.wakeUp();
 
 		tsLogger.arjLogger.info("com.arjuna.ats.internal.arjuna.recovery.WorkerService_3");
 
 		if (request.equals("SCAN"))
 		{
+            synchronized (this) {
+                if (doWait) {
+                    // ok, the periodic recovery thread cannot have finished responding to the last scan request
+                    // so it is safe to wait. if we delivered the request while the last scan was still going
+                    // then it will have been ignored but that is ok.
 		    try
 		    {
 			wait();
@@ -80,8 +97,9 @@
 		    {
 			tsLogger.arjLogger.info("com.arjuna.ats.internal.arjuna.recovery.WorkerService_4");
 		    }
+                }
+            }
 		}
-
 		out.println("DONE");
 	    }
 	    else
@@ -103,18 +121,20 @@
 	}
     }
 
-    public synchronized void signalDone ()
+    public synchronized void notifyDone()
     {
 	try
 	{
 	    notifyAll();
+        doWait = false;
 	}
 	catch (Exception ex)
 	{
 	}
     }
-    
+
     private PeriodicRecovery _periodicRecovery = null;
+    private boolean doWait = false;
 
 }
 

Added: labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java
===================================================================
--- labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java	                        (rev 0)
+++ labs/jbosstm/trunk/ArjunaCore/arjuna/tests/classes/com/hp/mwtests/ts/arjuna/recovery/RecoveryManagerStartStopTest.java	2009-07-01 15:37:19 UTC (rev 27390)
@@ -0,0 +1,242 @@
+package com.hp.mwtests.ts.arjuna.recovery;
+
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import com.arjuna.ats.arjuna.recovery.RecoveryManager;
+import com.arjuna.ats.internal.arjuna.recovery.PeriodicRecovery;
+import com.arjuna.ats.internal.arjuna.recovery.Listener;
+
+import java.net.Socket;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * test to ensure that the recovery manager cleans up all its threads when terminated
+ */
+public class RecoveryManagerStartStopTest extends TestCase
+{
+    /**
+     * maximum time in milliseconds to wait for each listener client thread to exit once the
+     * recovery manager has been terminated. bounding the join makes a listener which never
+     * closes its connections fail the test instead of hanging the whole test run.
+     */
+    private static final long CLIENT_JOIN_TIMEOUT_MILLIS = 10000;
+
+    public static Test suite()
+    {
+        return new TestSuite(RecoveryManagerStartStopTest.class);
+    }
+
+    /**
+     * start the recovery manager, attach two idle clients to its listener service, terminate
+     * it and then check that both clients saw their connections closed and that the thread
+     * group's active count has returned to the value it had before the manager was created.
+     */
+    public void testStartStop() throws Exception
+    {
+        // check how many threads there are running
+
+        ThreadGroup thg = Thread.currentThread().getThreadGroup();
+        int activeCount = thg.activeCount();
+
+        dumpThreadGroup(thg, "Before recovery manager create");
+
+        RecoveryManager.delayRecoveryManagerThread() ;
+        RecoveryManager manager = RecoveryManager.manager(RecoveryManager.INDIRECT_MANAGEMENT);
+
+        dumpThreadGroup(thg, "Before recovery manager initialize");
+
+        manager.initialize();
+
+        // give threads a chance to start
+
+        Thread.sleep(2000);
+
+        dumpThreadGroup(thg, "Before recovery manager start periodic recovery thread");
+
+        manager.startRecoveryManagerThread();
+
+        dumpThreadGroup(thg, "Before recovery manager client create");
+
+        // we need to open several connections to the recovery manager listener service and then
+        // ensure they get closed down
+
+        addRecoveryClient();
+        addRecoveryClient();
+
+        Thread.sleep(5000);
+
+        dumpThreadGroup(thg, "Before recovery manager terminate");
+
+        manager.terminate();
+
+        // ensure the client threads get killed
+
+        ensureRecoveryClientsTerminated();
+
+        // ensure there are no extra threads running
+
+        Thread.sleep(2000);
+
+        dumpThreadGroup(thg, "After recovery manager terminate");
+
+        // NOTE(review): ThreadGroup.activeCount() is documented as an estimate, so this
+        // comparison can be sensitive to unrelated threads starting or stopping in the group
+        int newActiveCount = thg.activeCount();
+
+        assertEquals("thread count after terminate differs from count before create", activeCount, newActiveCount);
+    }
+
+    /**
+     * wait (bounded) for every listener client thread to exit and check that each one saw its
+     * connection closed by the listener rather than ending for some other reason.
+     */
+    private void ensureRecoveryClientsTerminated() {
+        // check that any threads added to talk to the recovery listener get their sockets closed
+
+        for (RecoveryManagerStartStopTestThread client : clients) {
+            try {
+                client.join(CLIENT_JOIN_TIMEOUT_MILLIS);
+            } catch (InterruptedException e) {
+                // restore the interrupt status for the test runner before reporting the failure
+                Thread.currentThread().interrupt();
+                fail("interrupted while waiting for " + client.getName() + " to exit");
+            }
+            assertFalse(client.getName() + " still running: listener connection was not closed", client.isAlive());
+            assertFalse(client.getName() + " did not see its listener connection closed", client.failed());
+        }
+    }
+
+    /**
+     * open a connection to the recovery listener service in a new thread; the thread is expected
+     * to end when the listener closes its socket.
+     */
+    private void addRecoveryClient() {
+        RecoveryManagerStartStopTestThread client = new RecoveryManagerStartStopTestThread();
+        clients.add(client);
+        client.start();
+    }
+
+    /**
+     * print the header, the thread group's active count and the name of each thread that
+     * ThreadGroup.enumerate reports.
+     */
+    private void dumpThreadGroup(ThreadGroup thg, String header)
+    {
+        int activeCount = thg.activeCount();
+        Thread[] threads = new Thread[activeCount];
+        int reported = thg.enumerate(threads);
+
+        System.out.println(header);
+        System.out.println("Thread count == " + activeCount);
+        for (int i = 0; i < reported ; i++) {
+            System.out.println("Thread[" + i + "] == " + threads[i].getName());
+        }
+
+        System.out.flush();
+    }
+
+    /** client threads started by addRecoveryClient, joined by ensureRecoveryClientsTerminated */
+    private final List<RecoveryManagerStartStopTestThread> clients = new ArrayList<RecoveryManagerStartStopTestThread>();
+
+    /**
+     * client thread which connects to the recovery listener service, writes nothing and then
+     * blocks reading until the connection is closed. failed() stays true unless the read
+     * ends in one of the expected ways.
+     */
+    private static class RecoveryManagerStartStopTestThread extends Thread
+    {
+        // set by this thread, read by the test thread; volatile so the read is safe even when
+        // the timed join gives up before this thread has exited
+        private volatile boolean failed = true;
+
+        public RecoveryManagerStartStopTestThread()
+        {
+            super("Recovery Listener Client");
+        }
+
+        public boolean failed()
+        {
+            return failed;
+        }
+
+        public void run() {
+            BufferedReader fromServer = null;
+            Socket connectorSocket = null;
+            // get a socket connected to the listener
+            // don't write anything just sit on a read until the socket is closed
+            try {
+                ServerSocket serverSocket = PeriodicRecovery.getServerSocket();
+                String host = InetAddress.getLocalHost().getHostName();
+                int port = serverSocket.getLocalPort();
+
+                System.out.println("client attempting to connect to host " + host + " port " + port);
+                System.out.flush();
+
+                connectorSocket = new Socket(host, port);
+
+                System.out.println("connected!!!");
+                System.out.flush();
+
+                fromServer = new BufferedReader(new InputStreamReader(connectorSocket.getInputStream())) ;
+            } catch (Exception e) {
+                System.out.println("Failed to set up listener input stream!!!");
+                e.printStackTrace();
+                System.out.flush();
+
+                // don't leak the socket if it connected but the stream could not be obtained
+                closeQuietly(connectorSocket);
+                return;
+            }
+
+            try {
+                String result = fromServer.readLine();
+                if (result == null || result.equals("")) {
+                    System.out.println("Recovery Listener Client got empty string from readline() as expected");
+                    System.out.flush();
+                    failed = false;
+                } else {
+                    System.out.println("Recovery Listener Client got unexpected data from readline(): " + result);
+                    System.out.flush();
+                }
+            } catch (IOException e) {
+                // NOTE(review): isClosed() reports only whether this end was closed locally, so an
+                // IOException caused by the listener resetting the connection counts as a failure
+                if (!connectorSocket.isClosed()) {
+                    System.out.println("Recovery Listener Client got IO exception without socket being closed");
+                    System.out.flush();
+                    e.printStackTrace();
+                } else {
+                    System.out.println("Recovery Listener Client got IO exception under readline() as expected");
+                    System.out.flush();
+                    failed = false;
+                }
+            } catch (Exception e) {
+                System.out.println("Recovery Listener Client got non IO exception");
+                e.printStackTrace();
+                System.out.flush();
+            } finally {
+                // always release the client end of the connection so the test leaks no sockets
+                closeQuietly(connectorSocket);
+            }
+        }
+
+        /**
+         * close the socket if it is non-null, ignoring any failure since it is being discarded.
+         */
+        private static void closeQuietly(Socket socket) {
+            if (socket == null) {
+                return;
+            }
+            try {
+                socket.close();
+            } catch (IOException ignored) {
+                // nothing useful can be done if close fails
+            }
+        }
+    }
+}




More information about the jboss-svn-commits mailing list