[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Mon Nov 12 01:48:39 EST 2007


  User: rsigal  
  Date: 07/11/12 01:48:39

  Modified:    src/main/org/jboss/remoting/transport/socket   Tag:
                        remoting_2_x SocketServerInvoker.java
                        ServerThread.java
  Log:
  JBREM-807, JBREM-821:  Reorganized synchronization during wakeup and shutdown.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.30.2.14 +22 -34    JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
  retrieving revision 1.30.2.13
  retrieving revision 1.30.2.14
  diff -u -b -r1.30.2.13 -r1.30.2.14
  --- SocketServerInvoker.java	30 Aug 2007 17:48:38 -0000	1.30.2.13
  +++ SocketServerInvoker.java	12 Nov 2007 06:48:39 -0000	1.30.2.14
  @@ -49,7 +49,7 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.30.2.13 $
  + * @version $Revision: 1.30.2.14 $
    * @jmx:mbean
    */
   public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
  @@ -297,6 +297,14 @@
      {
         running = false;
   
  +      try
  +      {
  +         serverSocket.close();
  +      }
  +      catch(Exception e)
  +      {
  +      }
  +
         maxPoolSize = 0; // so ServerThreads don't reinsert themselves
         if(acceptThreads != null)
         {
  @@ -333,8 +341,6 @@
   
               if (threadpool != null)
               {
  -               synchronized(threadpool)
  -               {
                     int threadsToShutdown = threadpool.size();
                     for(int i = 0; i < threadsToShutdown; i++)
                     {
  @@ -344,15 +350,8 @@
                  }
               }
            }
  -      }
   
  -      try
  -      {
  -         serverSocket.close();
  -      }
  -      catch(Exception e)
  -      {
  -      }
  +      log.debug(this + " exiting");
      }
   
      /**
  @@ -555,6 +554,7 @@
         finally
         {
            thread.shutdown();
  +         log.debug(this + "shut down");
         }
      }
   
  @@ -570,55 +570,42 @@
         ServerThread worker = null;
         boolean newThread = false;
   
  -      while(worker == null)
  +      synchronized(clientpool)
  +      {
  +         while(worker == null && running)
         {
            if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
   
  -         synchronized(threadpool)
  -         {
               if(threadpool.size() > 0)
               {
                  worker = (ServerThread)threadpool.removeFirst();
  -
                  if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
               }
               else if (trace) { { log.trace(this + " has an empty threadpool"); } }
  -         }
   
            if(worker == null)
            {
  -            synchronized(clientpool)
  -            {
                  if(clientpool.size() < maxPoolSize)
                  {
                     if(trace) { log.trace(this + " creating new worker thread"); }
   
                     worker = new ServerThread(socket, this, clientpool, threadpool,
                                               getTimeout(), serverSocketClass);
  -
                     if(trace) { log.trace(this + " created " + worker); }
  -
                     newThread = true;
                  }
   
                  if(worker == null)
                  {
                     if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
  -
                     clientpool.evict();
  -
                     if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
  -
                     clientpool.wait();
  -
                     if(trace) { log.trace(this + " notified of clientpool thread availability"); }
                  }
               }
            }
  -      }
   
  -      synchronized(clientpool)
  -      {
            clientpool.insert(worker, worker);
         }
   
  @@ -709,6 +696,7 @@
                  }
               }
            }
  +         log.debug("ServerSocketRefresh shutting down");
         }
   
         /**
  
  
  
  1.29.2.25 +70 -43    JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerThread.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
  retrieving revision 1.29.2.24
  retrieving revision 1.29.2.25
  diff -u -b -r1.29.2.24 -r1.29.2.25
  --- ServerThread.java	30 Aug 2007 17:35:14 -0000	1.29.2.24
  +++ ServerThread.java	12 Nov 2007 06:48:39 -0000	1.29.2.25
  @@ -68,7 +68,7 @@
    * @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.29.2.24 $
  + * @version $Revision: 1.29.2.25 $
    */
   public class ServerThread extends Thread
   {
  @@ -175,35 +175,41 @@
               //
               // If both occur around the same time, a problem arises.  If a ServerThread starts to
               // shut down because the client shut down, it will test shutdown, and if it gets to the
  -            // test before SocketServerInvoker.cleanup() calls ServerThread.stop() to set shutdown
  +            // test before SocketServerInvoker.cleanup() calls ServerThread.shutdown() to set shutdown
               // to true, it will return itself to threadpool.  If it moves from clientpool to
               // threadpool at just the right time, SocketServerInvoker could miss it in both places
  -            // and never call stop(), leaving it alive, resulting in a memory leak.  The solution is
  +            // and never call shutdown(), leaving it alive, resulting in a memory leak.  The solution is
               // to synchronize parts of ServerThread.run() and SocketServerInvoker.cleanup() so that
               // they interact atomically.
   
  -            synchronized (this)
  -            {
                  synchronized (clientpool)
                  {
  -                  synchronized (threadpool)
  -                  {
  +               if(trace) { log.trace(this + " removing itself from clientpool"); }
  +               clientpool.remove(this);
  +               clientpool.notifyAll();
  +               
                        if (shutdown)
                        {
  +                  if (trace) log.trace(this + " exiting");
                           invoker = null;
                           return; // exit thread
                        }
                        else
                        {
  -                        if(trace) { log.trace(this + " removing itself from clientpool and going to threadpool"); }
  -                        clientpool.remove(this);
  +                  if(trace) { log.trace(this + " returning itself to threadpool"); }
                           threadpool.add(this);
                           Thread.interrupted(); // clear any interruption so that we can be pooled.
  -                        clientpool.notify();
  -                     }
                     }
                  }
   
  +            synchronized (this)
  +            {
  +               // If running == true, then SocketServerInvoker has already removed this
  +               // ServerThread from threadpool and called wakeup(), in which case run()
  +               // should continue immediately.
  +               if (running)
  +                  continue;
  +               
                  while (true)
                  {
                     try
  @@ -214,15 +220,17 @@
   
                        if(trace) { log.trace(this + " woke up after wait"); }
                        
  -                     break;
  -                  }
  -                  catch (InterruptedException e)
  -                  {
                        if (shutdown)
                        {
                           invoker = null;
  +                        if (trace) log.trace(this + " exiting");
                           return; // exit thread
                        }
  +
  +                     break;
  +                  }
  +                  catch (InterruptedException e)
  +                  {
                     }
                  }
               }
  @@ -259,7 +267,7 @@
         return lastRequestHandledTimestamp;
      }
   
  -   public void shutdown()
  +   public synchronized void shutdown()
      {
         shutdown = true;
         running = false;
  @@ -270,17 +278,25 @@
         // NOTE ALSO!: Shutdown should never be synchronized. We don't want to hold up accept()
         // thread! (via LRUpool)
   
  -      if (!handlingResponse)
  +      synchronized (this)
         {
            try
            {
  -            this.interrupt();
  -            Thread.interrupted(); // clear
  +            if (socketWrapper != null)
  +            {
  +               log.debug(this + " closing socketWrapper: " + socketWrapper);
  +               socketWrapper.close();
            }
  -         catch (Exception ignored)
  +         }
  +         catch (Exception ex)
            {
  +            log.debug("failed to close socket wrapper", ex);
            }
  +         socketWrapper = null;
         }
  +      
  +      if (trace) log.trace(this + " shutting down");
  +      notifyAll();
      }
   
      /**
  @@ -478,6 +494,8 @@
   //         log.debug("failed to close in/out", ex);
   //      }
   
  +      synchronized (this)
  +      {
         try
         {
            if (socketWrapper != null)
  @@ -492,6 +510,7 @@
         }
         socketWrapper = null;
      }
  +   }
   
      protected void processInvocation(SocketWrapper socketWrapper) throws Exception
      {
  @@ -532,6 +551,14 @@
            }
         }
   
  +      completeProcessInvocation(inputStream, performVersioning, version);
  +   }
  +   
  +   protected synchronized void completeProcessInvocation(InputStream inputStream,
  +                                                         boolean performVersioning,
  +                                                         int version)
  +   throws Exception
  +   {
         Object obj = versionedRead(inputStream, invoker, getClass().getClassLoader(), version);
   
         // setting timestamp since about to start processing
  
  
  



More information about the jboss-cvs-commits mailing list