[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ovidiu Feodorov ovidiu.feodorov at jboss.com
Tue Jan 16 03:15:05 EST 2007


  User: ovidiu  
  Date: 07/01/16 03:15:05

  Modified:    src/main/org/jboss/remoting/transport/socket    Tag:
                        remoting_2_x MicroSocketClientInvoker.java
                        ServerThread.java SocketServerInvoker.java
  Log:
  flushing minor changes (trivial refactoring, logging improvments) before attempting a fix for http://jira.jboss.org/jira/browse/JBREM-666
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.16.2.6  +5 -5      JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroSocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
  retrieving revision 1.16.2.5
  retrieving revision 1.16.2.6
  diff -u -b -r1.16.2.5 -r1.16.2.6
  --- MicroSocketClientInvoker.java	13 Jan 2007 12:42:36 -0000	1.16.2.5
  +++ MicroSocketClientInvoker.java	16 Jan 2007 08:15:05 -0000	1.16.2.6
  @@ -32,7 +32,7 @@
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
  - * @version $Revision: 1.16.2.5 $
  + * @version $Revision: 1.16.2.6 $
    */
   public class MicroSocketClientInvoker extends RemoteClientInvoker
   {
  @@ -190,7 +190,7 @@
               try
               {
                  enableTcpNoDelay = Boolean.valueOf((String) val).booleanValue();
  -               log.debug("Setting SocketClientInvoker::enableTcpNoDelay to: " + enableTcpNoDelay);
  +               log.debug(this + " setting SocketClientInvoker::enableTcpNoDelay to " + enableTcpNoDelay);
               }
               catch (Exception e)
               {
  @@ -204,11 +204,11 @@
               try
               {
                  maxPoolSize = Integer.valueOf((String)val).intValue();
  -               log.debug("Setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
  +               log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
               }
               catch (Exception e)
               {
  -               log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value.");
  +               log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
               }
            }
            // look for client socket class name
  @@ -219,7 +219,7 @@
               if (value.length() > 0)
               {
                  clientSocketClassName = value;
  -               log.debug("Setting ClientSocket class name to: " + clientSocketClassName);
  +               log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
               }
            }
   
  
  
  
  1.29.2.8  +14 -14    JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerThread.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerThread.java,v
  retrieving revision 1.29.2.7
  retrieving revision 1.29.2.8
  diff -u -b -r1.29.2.7 -r1.29.2.8
  --- ServerThread.java	16 Jan 2007 00:29:29 -0000	1.29.2.7
  +++ ServerThread.java	16 Jan 2007 08:15:05 -0000	1.29.2.8
  @@ -61,7 +61,7 @@
    * @author <a href="mailto:tom at jboss.org">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.29.2.7 $
  + * @version $Revision: 1.29.2.8 $
    */
   public class ServerThread extends Thread
   {
  @@ -115,20 +115,15 @@
         setName(getWorkerThreadName(socket));
   
         this.serverSocketClassName = serverSocketClassName;
  -      this.socketWrapper = createServerSocket(socket, timeout, invoker.getLocator().getParameters());
  +      this.socketWrapper = createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
         this.invoker = invoker;
         this.clientpool = clientpool;
         this.threadpool = threadpool;
   
  -      init();
  -   }
  -
  -   private void init()
  -   {
         if (invoker != null)
         {
            Map configMap = invoker.getConfiguration();
  -         String checkValue = (String) configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
  +         String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
            if (checkValue != null && checkValue.length() > 0)
            {
               shouldCheckConnection = Boolean.valueOf(checkValue).booleanValue();
  @@ -188,7 +183,8 @@
         return this.shouldCheckConnection;
      }
   
  -   private SocketWrapper createServerSocket(Socket socket, int timeout, Map metadata) throws Exception
  +   private SocketWrapper createServerSocketWrapper(Socket socket, int timeout, Map metadata)
  +      throws Exception
      {
         if (serverSocketConstructor == null)
         {
  @@ -199,7 +195,8 @@
   
            try
            {
  -            serverSocketConstructor = serverSocketClass.getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
  +            serverSocketConstructor = serverSocketClass.
  +               getConstructor(new Class[]{Socket.class, Map.class, Integer.class});
            }
            catch (NoSuchMethodException e)
            {
  @@ -209,13 +206,16 @@
         }
   
         SocketWrapper serverSocketWrapper = null;
  +
         if (serverSocketConstructor.getParameterTypes().length == 3)
         {
  -         serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
  +         serverSocketWrapper = (SocketWrapper)serverSocketConstructor.
  +            newInstance(new Object[]{socket, metadata, new Integer(timeout)});
         }
         else
         {
  -         serverSocketWrapper = (SocketWrapper) serverSocketConstructor.newInstance(new Object[]{socket});
  +         serverSocketWrapper =
  +            (SocketWrapper)serverSocketConstructor.newInstance(new Object[]{socket});
            serverSocketWrapper.setTimeout(timeout);
         }
   
  @@ -273,11 +273,11 @@
         // rename the worker thread to reflect the new socket it is handling
         setName(getWorkerThreadName(socket));
   
  -      this.socketWrapper = createServerSocket(socket, timeout, metadata);
  +      socketWrapper = createServerSocketWrapper(socket, timeout, metadata);
   
         running = true;
         handlingResponse = true;
  -      this.notify();
  +      notify();
   
         if(trace) { log.trace(this + " has woken up"); }
      }
  
  
  
  1.30.2.7  +110 -84   JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
  retrieving revision 1.30.2.6
  retrieving revision 1.30.2.7
  diff -u -b -r1.30.2.6 -r1.30.2.7
  --- SocketServerInvoker.java	16 Jan 2007 00:29:29 -0000	1.30.2.6
  +++ SocketServerInvoker.java	16 Jan 2007 08:15:05 -0000	1.30.2.7
  @@ -49,13 +49,15 @@
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.30.2.6 $
  + * @version $Revision: 1.30.2.7 $
    * @jmx:mbean
    */
   public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
   {
      private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
   
  +   private static boolean trace = log.isTraceEnabled();
  +
      static int clientCount = 0;
   
      private Properties props = new Properties();
  @@ -93,12 +95,6 @@
      protected int idleTimeout = -1;
      protected IdleTimerTask idleTimerTask = null;
   
  -   /**
  -    * The logging trace level flag
  -    */
  -   protected boolean trace = false;
  -
  -
      public SocketServerInvoker(InvokerLocator locator)
      {
         super(locator);
  @@ -115,7 +111,8 @@
       * then a new ServerSocket is created that accepts the new connections
       * @param serverSocketFactory
    	*/
  -   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory){
  +   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
  +   {
   	   newServerSocketFactory=true;
   	   setServerSocketFactory(serverSocketFactory);
      }
  @@ -135,8 +132,7 @@
         serverSocket.setReuseAddress(reuseAddress);
      }
   
  -   protected void setup()
  -         throws Exception
  +   protected void setup() throws Exception
      {
         props.putAll(getConfiguration());
         PropertyEditors.mapJavaBeanProperties(this, props, false);
  @@ -164,11 +160,10 @@
       */
      public synchronized void start() throws IOException
      {
  -
  -      trace = log.isTraceEnabled();
  -
         if(!running)
         {
  +         log.debug(this + " starting");
  +
            InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
   
            if(maxPoolSize <= 0)
  @@ -183,7 +178,8 @@
            }
            catch(IOException e)
            {
  -            log.error("Error starting ServerSocket.  Bind port: " + getServerBindPort() + ", bind address: " + bindAddress);
  +            log.error("Error starting ServerSocket.  Bind port: " + getServerBindPort() +
  +               ", bind address: " + bindAddress);
               throw e;
            }
   
  @@ -192,21 +188,18 @@
            threadpool = new LinkedList();
   
            acceptThreads = new Thread[numAcceptThreads];
  +
            for(int i = 0; i < numAcceptThreads; i++)
            {
  -            if(trace)
  -            {
  -               log.trace("Creating another AcceptThread");
  -            }
  +            if(trace) { log.trace(this + " creating another AcceptThread"); }
  +
               String name = getThreadName(i);
               acceptThreads[i] = new Thread(this, name);
  -            //acceptThreads[i].start();
  -            if(trace)
  -            {
  -               log.trace("Thread accepted");
  -            }
  +
  +            if(trace) { log.trace(this + " created and registered " + acceptThreads[i]); }
            }
         }
  +
         try
         {
            super.start();
  @@ -243,9 +236,13 @@
            }
         }
   
  +      log.debug(this + " started");
  +
      }
   
  -   protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException
  +   protected ServerSocket createServerSocket(int serverBindPort,
  +                                             int backlog,
  +                                             InetAddress bindAddress) throws IOException
      {
         return getServerSocketFactory().createServerSocket(serverBindPort, backlog, bindAddress);
      }
  @@ -272,7 +269,6 @@
       */
      public synchronized void stop()
      {
  -
         if(running)
         {
            cleanup();
  @@ -299,10 +295,8 @@
            }
         }
   
  -/*
  - * The following code has been changed to avoid a race condition with ServerThread.run() which
  - * can result in leaving ServerThreads alive, which causes a memory leak.
  - */
  +      // The following code has been changed to avoid a race condition with ServerThread.run() which
  +      // can result in leaving ServerThreads alive, which causes a memory leak.
         if (clientpool != null)
         {
            synchronized (clientpool)
  @@ -450,7 +444,6 @@
         }
      }
   
  -
      public int getIdleTimeout()
      {
         return idleTimeout;
  @@ -510,6 +503,12 @@
   
                  if(trace) { log.trace(this + " accepted " + socket); }
   
  +               // the acceptor thread should spend as little time as possbile doing any kind of
  +               // operation, and under no circumstances should perform IO on the new socket, which
  +               // can potentially block and lock up the server. For this reason, the acceptor thread
  +               // should grab a worker thread and delegate all subsequent work to it. This is what
  +               // processInvocation() does.
  +
                  processInvocation(socket);
               }
               catch (SSLException e)
  @@ -532,27 +531,34 @@
         }
      }
   
  +
  +   /**
  +    * The acceptor thread should spend as little time as possbile doing any kind of operation, and
  +    * under no circumstances should perform IO on the new socket, which can potentially block and
  +    * lock up the server. For this reason, the acceptor thread should grab a worker thread and
  +    * delegate all subsequent work to it.
  +    */
      protected void processInvocation(Socket socket) throws Exception
      {
  -      ServerThread thread = null;
  +      ServerThread worker = null;
         boolean newThread = false;
   
  -      while(thread == null)
  +      while(worker == null)
         {
  -         if(trace) { log.trace(this + " trying to get a worker thread for processing"); }
  +         if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
   
            synchronized(threadpool)
            {
               if(threadpool.size() > 0)
               {
  -               thread = (ServerThread)threadpool.removeFirst();
  +               worker = (ServerThread)threadpool.removeFirst();
   
  -               if(trace) { log.trace(this + (thread == null ? " found NO threads in pool" : " got " + thread + " from pool")); }
  +               if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
               }
  -            else if (trace) { { log.trace(this + " has an empty thread pool"); } }
  +            else if (trace) { { log.trace(this + " has an empty threadpool"); } }
            }
   
  -         if(thread == null)
  +         if(worker == null)
            {
               synchronized(clientpool)
               {
  @@ -560,16 +566,18 @@
                  {
                     if(trace) { log.trace(this + " creating new worker thread"); }
   
  -                  thread = new ServerThread(socket, this, clientpool, threadpool,
  +                  worker = new ServerThread(socket, this, clientpool, threadpool,
                                               getTimeout(), serverSocketClass);
   
  -                  if(trace) { log.trace(this + " created " + thread); }
  +                  if(trace) { log.trace(this + " created " + worker); }
   
                     newThread = true;
                  }
   
  -               if(thread == null)
  +               if(worker == null)
                  {
  +                  if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
  +
                     clientpool.evict();
   
                     if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
  @@ -584,18 +592,18 @@
   
         synchronized(clientpool)
         {
  -         clientpool.insert(thread, thread);
  +         clientpool.insert(worker, worker);
         }
   
         if(newThread)
         {
  -         if(trace) {log.trace(this + " starting " + thread); }
  -         thread.start();
  +         if(trace) {log.trace(this + " starting " + worker); }
  +         worker.start();
         }
         else
         {
  -         if(trace) { log.trace(this + " reusing " + thread); }
  -         thread.wakeup(socket, getTimeout(), locator.getParameters());
  +         if(trace) { log.trace(this + " reusing " + worker); }
  +         worker.wakeup(socket, getTimeout(), locator.getParameters());
         }
      }
   
  @@ -634,33 +642,42 @@
       * @author Michael Voss
       *
       */
  -   public class ServerSocketRefresh extends Thread{
  +   public class ServerSocketRefresh extends Thread
  +   {
   	   boolean serverSocketLocked=false;
   
          public ServerSocketRefresh()
          {
             super("ServerSocketRefresh");
          }
  -	   public void run(){
  - 		  while(true){
  +
  +      public void run()
  +      {
  +         while(true)
  +         {
        		  if(isInterrupted())break;
  -     		  if(newServerSocketFactory){
  +            if(newServerSocketFactory)
  +            {
        			  log.debug("got notice about new ServerSocketFactory");
        			  serverSocketLocked=true;
  -				  try {
  +               try
  +               {
   					  log.debug("refreshing server socket");
   					  refreshServerSocket();
  -				  } catch (IOException e) {
  +               } catch (IOException e)
  +               {
   					  log.debug("could not refresh server socket");
   					  log.debug("message is: "+e.getMessage());
   				  }
   				  log.debug("server socket refreshed");
   				  serverSocketLocked=false;
  -
        		  }
  -     		  try {
  +            try
  +            {
        			  Thread.sleep(10000);
  -     		  } catch (InterruptedException e) {
  +            }
  +            catch (InterruptedException e)
  +            {
   				  interrupt();
        		  }
    		  }
  @@ -669,9 +686,18 @@
   	/**
   	 * let the run() method resume when refresh is completed
   	 */
  -	public void release(){
  - 		  while (serverSocketLocked){
  - 			  try{Thread.sleep(1000);}catch(InterruptedException ignored){}
  +      public void release()
  +      {
  +         while (serverSocketLocked)
  +         {
  +            try
  +            {
  +               Thread.sleep(1000);
  +            }
  +            catch(InterruptedException ignored)
  +            {
  +
  +            }
    		  }
    	  }
      }
  
  
  



More information about the jboss-cvs-commits mailing list