[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Sat Feb 3 00:20:19 EST 2007


  User: rsigal  
  Date: 07/02/03 00:20:19

  Modified:    src/main/org/jboss/remoting/transport/socket 
                        SocketServerInvoker.java
  Log:
  Ovidiu's logging changes and reorganization: sync with remoting_2_x.
  
  Revision  Changes    Path
  1.34      +156 -131  JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketServerInvoker.java,v
  retrieving revision 1.33
  retrieving revision 1.34
  diff -u -b -r1.33 -r1.34
  --- SocketServerInvoker.java	27 Dec 2006 05:56:48 -0000	1.33
  +++ SocketServerInvoker.java	3 Feb 2007 05:20:19 -0000	1.34
  @@ -24,11 +24,13 @@
   
   import org.jboss.remoting.InvokerLocator;
   import org.jboss.remoting.ServerInvoker;
  -import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
   import org.jboss.remoting.util.TimerUtil;
  +import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
   import org.jboss.util.propertyeditor.PropertyEditors;
  +import org.jboss.logging.Logger;
   
   import javax.net.ServerSocketFactory;
  +import javax.net.ssl.SSLException;
   import java.io.IOException;
   import java.net.InetAddress;
   import java.net.ServerSocket;
  @@ -45,13 +47,17 @@
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:tom.elrod at jboss.com">Tom Elrod</a>
  - * @version $Revision: 1.33 $
  + * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  + *
  + * @version $Revision: 1.34 $
    * @jmx:mbean
    */
   public class SocketServerInvoker extends ServerInvoker implements Runnable, SocketServerInvokerMBean
   {
  -   private InetAddress addr;
  -   private int port;
  +   private static final Logger log = Logger.getLogger(SocketServerInvoker.class);
  +
  +   private static boolean trace = log.isTraceEnabled();
  +
      static int clientCount = 0;
   
      private Properties props = new Properties();
  @@ -89,12 +95,6 @@
      protected int idleTimeout = -1;
      protected IdleTimerTask idleTimerTask = null;
   
  -   /**
  -    * The logging trace level flag
  -    */
  -   protected boolean trace = false;
  -
  -
      public SocketServerInvoker(InvokerLocator locator)
      {
         super(locator);
  @@ -111,7 +111,8 @@
       * then a new ServerSocket is created that accepts the new connections
       * @param serverSocketFactory
    	*/
  -   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory){
  +   public void setNewServerSocketFactory(ServerSocketFactory serverSocketFactory)
  +   {
   	   newServerSocketFactory=true;
   	   setServerSocketFactory(serverSocketFactory);
      }
  @@ -121,17 +122,17 @@
       * creating a new ServerSocket from new ServerSocketFactory
       * @throws IOException
       */
  -   protected void refreshServerSocket() throws IOException{
  +   protected void refreshServerSocket() throws IOException
  +   {
   	   newServerSocketFactory=false;
   	   serverSocket.close();
  -	   serverSocket=null;
  +	   serverSocket = null;
   	   InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
   	   serverSocket = createServerSocket(getServerBindPort(), backlog, bindAddress);
         serverSocket.setReuseAddress(reuseAddress);
      }
   
  -   protected void setup()
  -         throws Exception
  +   protected void setup() throws Exception
      {
         props.putAll(getConfiguration());
         PropertyEditors.mapJavaBeanProperties(this, props, false);
  @@ -159,11 +160,10 @@
       */
      public synchronized void start() throws IOException
      {
  -
  -      trace = log.isTraceEnabled();
  -
         if(!running)
         {
  +         log.debug(this + " starting");
  +
            InetAddress bindAddress = InetAddress.getByName(getServerBindAddress());
   
            if(maxPoolSize <= 0)
  @@ -178,7 +178,8 @@
            }
            catch(IOException e)
            {
  -            log.error("Error starting ServerSocket.  Bind port: " + getServerBindPort() + ", bind address: " + bindAddress);
  +            log.error("Error starting ServerSocket.  Bind port: " + getServerBindPort() +
  +               ", bind address: " + bindAddress);
               throw e;
            }
   
  @@ -187,21 +188,18 @@
            threadpool = new LinkedList();
   
            acceptThreads = new Thread[numAcceptThreads];
  +
            for(int i = 0; i < numAcceptThreads; i++)
            {
  -            if(trace)
  -            {
  -               log.trace("Creating another AcceptThread");
  -            }
  +            if(trace) { log.trace(this + " creating another AcceptThread"); }
  +
               String name = getThreadName(i);
               acceptThreads[i] = new Thread(this, name);
  -            //acceptThreads[i].start();
  -            if(trace)
  -            {
  -               log.trace("Thread accepted");
  -            }
  +
  +            if(trace) { log.trace(this + " created and registered " + acceptThreads[i]); }
            }
         }
  +
         try
         {
            super.start();
  @@ -238,16 +236,20 @@
            }
         }
   
  +      log.debug(this + " started");
  +
      }
   
  -   protected ServerSocket createServerSocket(int serverBindPort, int backlog, InetAddress bindAddress) throws IOException
  +   protected ServerSocket createServerSocket(int serverBindPort,
  +                                             int backlog,
  +                                             InetAddress bindAddress) throws IOException
      {
         return getServerSocketFactory().createServerSocket(serverBindPort, backlog, bindAddress);
      }
   
      protected String getThreadName(int i)
      {
  -      return "SocketServerInvoker#" + i + "-" + getServerBindPort();
  +      return "AcceptorThread#" + i + ":" + getServerBindPort();
      }
   
      public void destroy()
  @@ -267,7 +269,6 @@
       */
      public synchronized void stop()
      {
  -
         if(running)
         {
            cleanup();
  @@ -294,10 +295,8 @@
            }
         }
   
  -/*
  - * The following code has been changed to avoid a race condition with ServerThread.run() which
  - * can result in leaving ServerThreads alive, which causes a memory leak.
  - */
  +      // The following code has been changed to avoid a race condition with ServerThread.run() which
  +      // can result in leaving ServerThreads alive, which causes a memory leak.
         if (clientpool != null)
         {
            synchronized (clientpool)
  @@ -342,7 +341,6 @@
      /**
       * Indicates if SO_REUSEADDR is enabled on server sockets
       * Default is true.
  -    * @return
       */
      public boolean getReuseAddress()
      {
  @@ -446,7 +444,6 @@
         }
      }
   
  -
      public int getIdleTimeout()
      {
         return idleTimeout;
  @@ -487,12 +484,9 @@
   
      public void run()
      {
  -      if(trace)
  -      {
  -         log.trace("Started execution of method run");
  -      }
  +      if(trace) { log.trace(this + " started execution of method run()"); }
   
  -      ServerSocketRefresh thread=new ServerSocketRefresh();
  +      ServerSocketRefresh thread = new ServerSocketRefresh();
         thread.start();
   
         try
  @@ -501,20 +495,23 @@
            {
               try
               {
  -               if(trace)
  -               {
  -                  log.trace("Socket is going to be accepted");
  -               }
  -
                  thread.release(); //goes on if serversocket refresh is completed
  +
  +               if(trace) { log.trace(this + " is going to wait on serverSocket.accept()"); }
  +
                  Socket socket = serverSocket.accept();
  -               if(trace)
  -               {
  -                  log.trace("Accepted: " + socket);
  -               }
  +
  +               if(trace) { log.trace(this + " accepted " + socket); }
  +
  +               // the acceptor thread should spend as little time as possbile doing any kind of
  +               // operation, and under no circumstances should perform IO on the new socket, which
  +               // can potentially block and lock up the server. For this reason, the acceptor thread
  +               // should grab a worker thread and delegate all subsequent work to it. This is what
  +               // processInvocation() does.
  +
                  processInvocation(socket);
               }
  -            catch (javax.net.ssl.SSLException e)
  +            catch (SSLException e)
               {
                  log.error("SSLServerSocket error", e);
                  return;
  @@ -523,7 +520,7 @@
               {
                  if(running)
                  {
  -                  log.error("Failed to accept socket connection", ex);
  +                  log.error(this + " failed to handle socket", ex);
                  }
               }
            }
  @@ -534,74 +531,79 @@
         }
      }
   
  -   protected void processInvocation(Socket socket)
  -         throws Exception
  +
  +   /**
  +    * The acceptor thread should spend as little time as possbile doing any kind of operation, and
  +    * under no circumstances should perform IO on the new socket, which can potentially block and
  +    * lock up the server. For this reason, the acceptor thread should grab a worker thread and
  +    * delegate all subsequent work to it.
  +    */
  +   protected void processInvocation(Socket socket) throws Exception
      {
  -      ServerThread thread = null;
  +      ServerThread worker = null;
         boolean newThread = false;
   
  -      while(thread == null)
  +      while(worker == null)
         {
  -         if(log.isTraceEnabled())
  -         {
  -            log.trace("try to get a thread for processing");
  -         }
  +         if(trace) { log.trace(this + " trying to get a worker thread from threadpool for processing"); }
  +
            synchronized(threadpool)
            {
               if(threadpool.size() > 0)
               {
  -               thread = (ServerThread) threadpool.removeFirst();
  -               if(log.isTraceEnabled())
  -               {
  -                  log.trace("Got thread for processing - " + thread);
  -               }
  +               worker = (ServerThread)threadpool.removeFirst();
  +
  +               if(trace) { log.trace(this + (worker == null ? " found NO threads in threadpool" : " got " + worker + " from threadpool")); }
               }
  +            else if (trace) { { log.trace(this + " has an empty threadpool"); } }
            }
  -         if(thread == null)
  +
  +         if(worker == null)
            {
               synchronized(clientpool)
               {
                  if(clientpool.size() < maxPoolSize)
                  {
  -                  thread = new ServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
  +                  if(trace) { log.trace(this + " creating new worker thread"); }
  +
  +                  worker = new ServerThread(socket, this, clientpool, threadpool,
  +                                            getTimeout(), serverSocketClass);
  +
  +                  if(trace) { log.trace(this + " created " + worker); }
  +
                     newThread = true;
                  }
  -               if(thread == null)
  +
  +               if(worker == null)
                  {
  +                  if(trace) {log.trace(this + " trying to evict a thread from clientpool"); }
  +
                     clientpool.evict();
  -                  if(trace)
  -                  {
  -                     log.trace("Waiting for a thread...");
  -                  }
  +
  +                  if(trace) {log.trace(this + " waiting for a thread from clientpool"); }
  +
                     clientpool.wait();
  -                  if(trace)
  -                  {
  -                     log.trace("Notified of available thread");
  -                  }
  +
  +                  if(trace) { log.trace(this + " notified of clientpool thread availability"); }
                  }
               }
            }
         }
  +
         synchronized(clientpool)
         {
  -         clientpool.insert(thread, thread);
  +         clientpool.insert(worker, worker);
         }
   
         if(newThread)
         {
  -         if(trace)
  -         {
  -            log.trace("Created a new thread, t=" + thread);
  -         }
  -         thread.start();
  +         if(trace) {log.trace(this + " starting " + worker); }
  +         worker.start();
         }
         else
         {
  -         if(trace)
  -         {
  -            log.trace("Reusing thread t=" + thread);
  -         }
  -         thread.wakeup(socket, getTimeout(), locator.getParameters());
  +         if(trace) { log.trace(this + " reusing " + worker); }
  +         worker.wakeup(socket, getTimeout(), this);
         }
      }
   
  @@ -609,20 +611,25 @@
       * returns true if the transport is bi-directional in nature, for example,
       * SOAP in unidirectional and SOCKETs are bi-directional (unless behind a firewall
       * for example).
  -    *
  -    * @return
       */
      public boolean isTransportBiDirectional()
      {
         return true;
      }
   
  +   public String toString()
  +   {
  +      return "SocketServerInvoker[" +
  +         (serverSocket == null ?
  +            "UNINITIALIZED" :
  +            serverSocket.getInetAddress().getHostAddress() + ":" + serverSocket.getLocalPort()) +
  +         "]";
  +   }
  +
      /**
       * Each implementation of the remote client invoker should have
       * a default data type that is uses in the case it is not specified
       * in the invoker locator uri.
  -    *
  -    * @return
       */
      protected String getDefaultDataType()
      {
  @@ -635,33 +642,42 @@
       * @author Michael Voss
       *
       */
  -   public class ServerSocketRefresh extends Thread{
  +   public class ServerSocketRefresh extends Thread
  +   {
   	   boolean serverSocketLocked=false;
   
          public ServerSocketRefresh()
          {
             super("ServerSocketRefresh");
          }
  -	   public void run(){
  - 		  while(true){
  +
  +      public void run()
  +      {
  +         while(true)
  +         {
        		  if(isInterrupted())break;
  -     		  if(newServerSocketFactory){
  +            if(newServerSocketFactory)
  +            {
        			  log.debug("got notice about new ServerSocketFactory");
        			  serverSocketLocked=true;
  -				  try {
  +               try
  +               {
   					  log.debug("refreshing server socket");
   					  refreshServerSocket();
  -				  } catch (IOException e) {
  +               } catch (IOException e)
  +               {
   					  log.debug("could not refresh server socket");
   					  log.debug("message is: "+e.getMessage());
   				  }
   				  log.debug("server socket refreshed");
   				  serverSocketLocked=false;
  -
        		  }
  -     		  try {
  +            try
  +            {
        			  Thread.sleep(10000);
  -     		  } catch (InterruptedException e) {
  +            }
  +            catch (InterruptedException e)
  +            {
   				  interrupt();
        		  }
    		  }
  @@ -670,9 +686,18 @@
   	/**
   	 * let the run() method resume when refresh is completed
   	 */
  -	public void release(){
  - 		  while (serverSocketLocked){
  - 			  try{Thread.sleep(1000);}catch(InterruptedException ignored){}
  +      public void release()
  +      {
  +         while (serverSocketLocked)
  +         {
  +            try
  +            {
  +               Thread.sleep(1000);
  +            }
  +            catch(InterruptedException ignored)
  +            {
  +
  +            }
    		  }
    	  }
      }
  
  
  



More information about the jboss-cvs-commits mailing list