[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ovidiu Feodorov ovidiu.feodorov at jboss.com
Mon Jan 29 02:10:00 EST 2007


  User: ovidiu  
  Date: 07/01/29 02:10:00

  Modified:    src/main/org/jboss/remoting/transport/socket     Tag:
                        remoting_2_x MicroSocketClientInvoker.java
                        ServerSocketWrapper.java SocketClientInvoker.java
                        SocketWrapper.java
  Log:
  minor refactoring and logging improvments while trying to fix http://jira.jboss.org/jira/browse/JBREM-691
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.16.2.15 +396 -373  JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroSocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
  retrieving revision 1.16.2.14
  retrieving revision 1.16.2.15
  diff -u -b -r1.16.2.14 -r1.16.2.15
  --- MicroSocketClientInvoker.java	29 Jan 2007 05:27:53 -0000	1.16.2.14
  +++ MicroSocketClientInvoker.java	29 Jan 2007 07:10:00 -0000	1.16.2.15
  @@ -28,40 +28,40 @@
   import java.util.Properties;
   
   /**
  - * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which
  - * must be a SocketServerInvoker.
  + * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which must be
  + * a SocketServerInvoker.
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
  - * @version $Revision: 1.16.2.14 $
  + * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  + *
  + * @version $Revision: 1.16.2.15 $
    */
   public class MicroSocketClientInvoker extends RemoteClientInvoker
   {
  -   private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
  -   private static final boolean trace = log.isTraceEnabled();
  +   // Constants ------------------------------------------------------------------------------------
   
  -   protected InetAddress addr;
  -   protected int port;
  +   private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
   
      /**
  -    * can be either true or false and will indicate if client socket should have TCP_NODELAY turned on or off.
  -    * TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
  +    * Can be either true or false and will indicate if client socket should have TCP_NODELAY turned
  +    * on or off. TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
       * It should only be set for applications that send frequent small bursts of information without
  -    * getting an immediate response; where timely delivery of data is required
  -    * (the canonical example is mouse movements). The default is false.
  +    * getting an immediate response; where timely delivery of data is required (the canonical
  +    * example is mouse movements). The default is false.
       */
      public static final String TCP_NODELAY_FLAG = "enableTcpNoDelay";
   
      /**
  -    * the client side maximum number of threads. The default is MAX_POOL_SIZE.
  +    * The client side maximum number of threads. The default is MAX_POOL_SIZE.
       */
      public static final String MAX_POOL_SIZE_FLAG = "clientMaxPoolSize";
   
      /**
  -    * Specifies the fully qualified class name for the custom SocketWrapper implementation
  -    * to use on the client. Note, will need to make sure this is marked as a client
  -    * parameter (using the 'isParam' attribute). Making this change will not
  -    * affect the marshaller/unmarshaller that is used, which may also be a requirement.
  +    * Specifies the fully qualified class name for the custom SocketWrapper implementation to use on
  +    * the client. Note, will need to make sure this is marked as a client parameter (using the
  +    * 'isParam' attribute). Making this change will not affect the marshaller/unmarshaller that is
  +    * used, which may also be a requirement.
       */
      public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
   
  @@ -71,11 +71,34 @@
      public static final boolean TCP_NODELAY_DEFAULT = false;
   
      /**
  -    * Indicates if will check the socket connection when
  -    * getting from pool by sending byte over the connection
  -    * to validate is still good.
  +    * Default maximum number of retries to get a valid socket from the* socket pool. This also
  +    * translates to number of seconds will wait for connection to be returned to connection pool
  +    * before erroring. Default is 30.
       */
  -   protected boolean shouldCheckConnection = false;
  +   public static final int MAX_RETRIES = 30;
  +
  +   /**
  +    * Default maximum number of times a invocation will be made when it gets a SocketException.
  +    * Default is 3.
  +    */
  +   public static final int MAX_CALL_RETRIES = 3;
  +
  +   /**
  +    * Default maximum number of socket connections allowed at any point in time. Default is 50.
  +    */
  +   public static final int MAX_POOL_SIZE = 50;
  +
  +
  +   // Static ---------------------------------------------------------------------------------------
  +
  +   private static boolean trace = log.isTraceEnabled();
  +
  +   /**
  +    * Used for debugging (tracing) connections leaks
  +    */
  +   static int counter = 0;
  +
  +   protected static final Map connectionPools = new HashMap();
   
      // Performance measurements
      public static long getSocketTime = 0;
  @@ -85,60 +108,102 @@
      public static long deserializeTime = 0;
   
      /**
  -    * If the TcpNoDelay option should be used on the socket.
  +    * Close all sockets in a specific pool.
       */
  -   protected boolean enableTcpNoDelay = TCP_NODELAY_DEFAULT;
  -
  -   protected String clientSocketClassName = ClientSocketWrapper.class.getName();
  -   private Constructor clientSocketConstructor = null;
  -   protected Class clientSocketClass = null;
  +   public static void clearPool(ServerAddress sa)
  +   {
  +      try
  +      {
  +         LinkedList thepool = (LinkedList)connectionPools.get(sa);
  +         if (thepool == null)
  +         {
  +            return;
  +         }
  +         synchronized (thepool)
  +         {
  +            int size = thepool.size();
  +            for (int i = 0; i < size; i++)
  +            {
  +               SocketWrapper socketWrapper = (SocketWrapper)thepool.removeFirst();
  +               try
  +               {
  +                  socketWrapper.close();
  +                  socketWrapper = null;
  +               }
  +               catch (Exception ignored)
  +               {
  +               }
  +            }
  +         }
  +      }
  +      catch (Exception ex)
  +      {
  +         log.debug("Failure", ex);
  +      }
  +   }
   
      /**
  -    * Default max number of retries to get a valid socket from the
  -    * socket pool.  This also translates to number of seconds will wait
  -    * for connection to be returned to connection pool before erroring. Default is 30.
  +    * Close all sockets in all pools.
       */
  -   public static final int MAX_RETRIES = 30;
  -   public long usedPooled = 0;
  -   public Object usedPoolLock = new Object();
  +   public static void clearPools()
  +   {
  +      synchronized (connectionPools)
  +      {
  +         for(Iterator i = connectionPools.keySet().iterator(); i.hasNext();)
  +         {
  +            ServerAddress sa = (ServerAddress) i.next();
   
  -   protected int numberOfRetries = MAX_RETRIES;
  +            if (trace) { log.trace("clearing pool for " + sa); }
  +            clearPool(sa);
  +         }
  +      }
  +   }
   
  -   /**
  -    * Default max number of times a invocation will be made
  -    * when it gets a SocketException.  Default is 3.
  -    */
  -   public static final int MAX_CALL_RETRIES = 3;
  +   // Attributes -----------------------------------------------------------------------------------
   
  -   protected int numberOfCallRetries = MAX_CALL_RETRIES;
  +   private Constructor clientSocketConstructor;
  +   private boolean reuseAddress;
  +
  +   protected InetAddress addr;
  +   protected int port;
  +
  +   // flag being set on true by a disconnect request. If trying to create a connection goes on in a
  +   // loop and a disconnect request arrives, this flag will be used to sent this information into
  +   // the connect loop
  +   private volatile boolean bailOut;
   
      /**
  -    * Pool for this invoker.  This is shared between all
  -    * instances of proxies attached to a specific invoker
  +    * Indicates if will check the socket connection when getting from pool by sending byte over the
  +    * connection to validate is still good.
       */
  -   protected LinkedList pool = null;
  +   protected boolean shouldCheckConnection;
   
      /**
  -    * connection information
  +    * If the TcpNoDelay option should be used on the socket.
       */
  -   protected ServerAddress address;
  +   protected boolean enableTcpNoDelay;
   
  -   protected static final Map connectionPools = new HashMap();
  +   protected String clientSocketClassName;
  +   protected Class clientSocketClass;
  +   protected int numberOfRetries;
  +   protected int numberOfCallRetries;
  +   protected int maxPoolSize;
   
      /**
  -    * Default max number of socket connections allowed at any
  -    * point in time.  Default is 50.
  +    * Pool for this invoker. This is shared between all instances of proxies attached to a specific
  +    * invoker.
       */
  -   public static final int MAX_POOL_SIZE = 50;
  +   protected LinkedList pool;
   
  -   protected int maxPoolSize = MAX_POOL_SIZE;
  +   /**
  +    * connection information
  +    */
  +   protected ServerAddress address;
   
  -   private boolean reuseAddress = true;
  +   public long usedPooled;
  +   public Object usedPoolLock;
   
  -   // flag being set on true by a disconnect request. If trying to create a connection goes on in a
  -   // loop and a disconnect request arrives, this flag will be used to sent this information into
  -   // the connect loop
  -   private volatile boolean bailOut;
  +   // Constructors ---------------------------------------------------------------------------------
   
      public MicroSocketClientInvoker(InvokerLocator locator)
      {
  @@ -148,22 +213,145 @@
      public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
      {
         super(locator, configuration);
  +
  +      clientSocketConstructor = null;
  +      reuseAddress = true;
  +      shouldCheckConnection = false;
  +      enableTcpNoDelay = TCP_NODELAY_DEFAULT;
  +      clientSocketClassName = ClientSocketWrapper.class.getName();
  +      clientSocketClass = null;
  +      numberOfRetries = MAX_RETRIES;
  +      numberOfCallRetries = MAX_CALL_RETRIES;
  +      pool = null;
  +      maxPoolSize = MAX_POOL_SIZE;
  +      usedPooled = 0;
  +      usedPoolLock = new Object();
  +
         try
         {
            setup();
         }
         catch (Exception ex)
         {
  -         log.error("Error setting up socket client invoker.", ex);
  +         log.error("Error setting up " + this, ex);
            throw new RuntimeException(ex.getMessage());
         }
  +
  +      log.debug(this + " constructed");
      }
   
  -   protected void setup()
  -         throws Exception
  +   // Public ---------------------------------------------------------------------------------------
  +
  +   /**
  +    * Indicates if will check socket connection when returning from pool by sending byte to the
  +    * server. Default value will be false.
  +    */
  +   public boolean checkingConnection()
  +   {
  +      return shouldCheckConnection;
  +   }
  +
  +   /**
  +    * Returns if newly created sockets will have SO_REUSEADDR enabled. Default is for this to be
  +    * true.
  +    */
  +   public boolean getReuseAddress()
  +   {
  +      return reuseAddress;
  +   }
  +
  +   /**
  +    * Sets if newly created socket should have SO_REUSEADDR enable. Default is true.
  +    */
  +   public void setReuseAddress(boolean reuse)
      {
  -      this.addr = InetAddress.getByName(locator.getHost());
  -      this.port = locator.getPort();
  +      reuseAddress = reuse;
  +   }
  +
  +   public synchronized void disconnect()
  +   {
  +      log.debug(this + " disconnecting ...");
  +      bailOut = true;
  +      super.disconnect();
  +   }
  +
  +   public void flushConnectionPool()
  +   {
  +      synchronized (pool)
  +      {
  +         while (pool != null && pool.size() > 0)
  +         {
  +            SocketWrapper socketWrapper = (SocketWrapper)pool.removeFirst();
  +            try
  +            {
  +               socketWrapper.close();
  +            }
  +            catch (IOException e)
  +            {
  +               log.debug("Failed to close socket wrapper", e);
  +            }
  +         }
  +      }
  +   }
  +
  +   /**
  +    * Sets the number of times an invocation will retry based on getting SocketException.
  +    */
  +   public void setNumberOfCallRetries(int numberOfCallRetries)
  +   {
  +      if (numberOfCallRetries < 1)
  +      {
  +         this.numberOfCallRetries = MAX_CALL_RETRIES;
  +      }
  +      else
  +      {
  +         this.numberOfCallRetries = numberOfCallRetries;
  +      }
  +   }
  +
  +   public int getNumberOfCallRetries()
  +   {
  +      return numberOfCallRetries;
  +   }
  +
  +   /**
  +    * Sets the number of retries to get a socket connection.
  +    *
  +    * @param numberOfRetries Must be a number greater than 0.
  +    */
  +   public void setNumberOfRetries(int numberOfRetries)
  +   {
  +      if (numberOfRetries < 1)
  +      {
  +         this.numberOfRetries = MAX_RETRIES;
  +      }
  +      else
  +      {
  +         this.numberOfRetries = numberOfRetries;
  +      }
  +   }
  +
  +   public int getNumberOfRetries()
  +   {
  +      return numberOfRetries;
  +   }
  +
  +   /**
  +    * The name of of the server.
  +    */
  +   public String getServerHostName() throws Exception
  +   {
  +      return address.address;
  +   }
  +
  +   // Package protected ----------------------------------------------------------------------------
  +
  +   // Protected ------------------------------------------------------------------------------------
  +
  +   protected void setup() throws Exception
  +   {
  +      addr = InetAddress.getByName(locator.getHost());
  +      port = locator.getPort();
   
         Properties props = new Properties();
         props.putAll(configuration);
  @@ -171,33 +359,34 @@
   
         configureParameters();
   
  -      address = createServerAddress();
  -   }
  -
  -   protected ServerAddress createServerAddress()
  -   {
  -      return new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
  +      address = new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
      }
   
      protected void configureParameters()
      {
         Map params = configuration;
  -      if (params != null)
  +
  +      if (params == null)
         {
  +         return;
  +      }
  +
            // look for enableTcpNoDelay param
            Object val = params.get(TCP_NODELAY_FLAG);
            if (val != null)
            {
               try
               {
  -               enableTcpNoDelay = Boolean.valueOf((String) val).booleanValue();
  -               log.debug(this + " setting SocketClientInvoker::enableTcpNoDelay to " + enableTcpNoDelay);
  +            enableTcpNoDelay = Boolean.valueOf((String)val).booleanValue();
  +            log.debug(this + " setting enableTcpNoDelay to " + enableTcpNoDelay);
               }
               catch (Exception e)
               {
  -               log.warn("Could not convert " + TCP_NODELAY_FLAG + " value of " + val + " to a boolean value.");
  +            log.warn(this + " could not convert " + TCP_NODELAY_FLAG + " value of " +
  +                     val + " to a boolean value.");
               }
            }
  +
            // look for maxPoolSize param
            val = params.get(MAX_POOL_SIZE_FLAG);
            if (val != null)
  @@ -205,22 +394,24 @@
               try
               {
                  maxPoolSize = Integer.valueOf((String)val).intValue();
  -               log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
  +            log.debug(this + " setting maxPoolSize to " + maxPoolSize);
               }
               catch (Exception e)
               {
  -               log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
  +            log.warn(this + " could not convert " + MAX_POOL_SIZE_FLAG + " value of " +
  +                     val + " to a int value");
               }
            }
  +
            // look for client socket class name
            val = params.get(CLIENT_SOCKET_CLASS_FLAG);
            if (val != null)
            {
  -            String value = (String) val;
  +         String value = (String)val;
               if (value.length() > 0)
               {
                  clientSocketClassName = value;
  -               log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
  +            log.debug(this + " setting client socket wrapper class name to " + clientSocketClassName);
               }
            }
   
  @@ -231,54 +422,18 @@
               if (value.length() > 0)
               {
                  shouldCheckConnection = Boolean.valueOf(value).booleanValue();
  -            }
  +            log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
            }
         }
      }
   
  -   /**
  -    * Indicates if will check socket connection when returning from
  -    * pool by sending byte to the server.  Default value will be false.
  -    */
  -   public boolean checkingConnection()
  -   {
  -      return shouldCheckConnection;
  -   }
  -
  -   /**
  -    * Returns if newly created sockets will have SO_REUSEADDR enabled.
  -    * Default is for this to be true.
  -    */
  -   public boolean getReuseAddress()
  -   {
  -      return reuseAddress;
  -   }
  -
  -   /**
  -    * Sets if newly created socket should have SO_REUSEADDR enable.
  -    * Default is true.
  -    * @param reuse
  -    */
  -   public void setReuseAddress(boolean reuse)
  -   {
  -      reuseAddress = reuse;
  -   }
  -
  -   public synchronized void disconnect()
  -   {
  -      if (trace) { log.trace(this + " disconnecting ..."); }
  -      bailOut = true;
  -      super.disconnect();
  -   }
  -
      protected void finalize() throws Throwable
      {
         disconnect();
         super.finalize();
      }
   
  -   protected synchronized void handleConnect()
  -         throws ConnectionFailedException
  +   protected synchronized void handleConnect() throws ConnectionFailedException
      {
         initPool();
      }
  @@ -289,24 +444,21 @@
      }
   
      /**
  -    * Each implementation of the remote client invoker should have
  -    * a default data type that is uses in the case it is not specified
  -    * in the invoker locator uri.
  +    * Each implementation of the remote client invoker should have a default data type that is used
  +    * in the case it is not specified in the invoker locator URI.
       */
      protected String getDefaultDataType()
      {
         return SerializableMarshaller.DATATYPE;
      }
   
  -   protected Object transport(String sessionId, Object invocation, Map metadata,
  +   protected Object transport(String sessionID, Object invocation, Map metadata,
                                 Marshaller marshaller, UnMarshaller unmarshaller)
            throws IOException, ConnectionFailedException, ClassNotFoundException
      {
  -
  -      Object response = null;
         long start = System.currentTimeMillis();
         SocketWrapper socketWrapper = null;
  -
  +      Object response = null;
         boolean oneway = false;
         int tempTimeout = -1;
         int savedTimeout = -1;
  @@ -317,7 +469,6 @@
            Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
            if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
            {
  -            if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
               oneway = true;
            }
            
  @@ -333,7 +484,7 @@
                  }
                  catch (Exception e)
                  {
  -                  log.warn("Could not convert " + ServerInvoker.TIMEOUT + " value of " + 
  +                  log.warn(this + " could not convert " + ServerInvoker.TIMEOUT + " value of " +
                               tempTimeoutString + " to an integer value.");
                  }
               }
  @@ -488,227 +639,46 @@
         return response;
      }
   
  -   public void flushConnectionPool()
  -   {
  -      synchronized (pool)
  -      {
  -         while (pool != null && pool.size() > 0)
  -         {
  -            SocketWrapper socketWrapper = (SocketWrapper) pool.removeFirst();
  -            try
  -            {
  -               socketWrapper.close();
  -            }
  -            catch (IOException e)
  -            {
  -
  -            }
  -         }
  -      }
  -   }
  -
      protected Object handleException(Exception ex, SocketWrapper socketWrapper)
            throws ClassNotFoundException, MarshalException
      {
         log.error(this + " got marshalling exception, exiting ...", ex);
  +
         if (ex instanceof ClassNotFoundException)
         {
            //TODO: -TME Add better exception handling for class not found exception
            log.error("Error loading classes from remote call result.", ex);
  -         throw (ClassNotFoundException) ex;
  -      }
  -
  -      throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling.", ex);
  -   }
  -
  -   private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version) throws IOException, ClassNotFoundException
  -   {
  -      //TODO: -TME - is switch required?
  -      switch (version)
  -      {
  -         case Version.VERSION_1:
  -         case Version.VERSION_2:
  -         {
  -            if (trace) { log.trace(this + " reading response from unmarshaller"); }
  -            return unmarshaller.read(inputStream, null);
  -         }
  -         default:
  -         {
  -            throw new IOException("Can not read data for version " + version + ".  Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
  -         }
  -      }
  -   }
  -
  -   private void versionedWrite(OutputStream outputStream, Marshaller marshaller, Object invocation, int version) throws IOException
  -   {
  -      //TODO: -TME Should I worry about checking the version here?  Only one way to do it at this point
  -      switch (version)
  -      {
  -         case Version.VERSION_1:
  -         case Version.VERSION_2:
  -         {
  -            if (trace) { log.trace(this + " writing invocation to marshaller"); }
  -            marshaller.write(invocation, outputStream);
  -            if (trace) { log.trace(this + " done writing invocation to marshaller"); }
  -
  -            return;
  -         }
  -         default:
  -         {
  -            throw new IOException("Can not write data for version " + version + ".  Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
  -         }
  -      }
  +         throw (ClassNotFoundException)ex;
      }
   
  -   //TODO: -TME Exact same method in ServerThread
  -   private int readVersion(InputStream inputStream) throws IOException
  -   {
  -      if (trace) { log.trace(this + " reading version from input stream"); }
  -      int version = inputStream.read();
  -      if (trace) { log.trace(this + " read version " + version + " from input stream"); }
  -      return version;
  -   }
  -
  -   //TODO: -TME Exact same method in ServerThread
  -   private void writeVersion(OutputStream outputStream, int version) throws IOException
  -   {
  -      if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
  -      outputStream.write(version);
  -   }
  -
  -   /**
  -    * Close all sockets in a specific pool.
  -    */
  -   public static void clearPool(ServerAddress sa)
  -   {
  -      try
  -      {
  -         LinkedList thepool = (LinkedList) connectionPools.get(sa);
  -         if (thepool == null)
  -         {
  -            return;
  -         }
  -         synchronized (thepool)
  -         {
  -            int size = thepool.size();
  -            for (int i = 0; i < size; i++)
  -            {
  -               SocketWrapper socketWrapper = (SocketWrapper) thepool.removeFirst();
  -               try
  -               {
  -                  socketWrapper.close();
  -                  socketWrapper = null;
  -               }
  -               catch (Exception ignored)
  -               {
  -               }
  -            }
  -         }
  -      }
  -      catch (Exception ex)
  -      {
  -         // ignored
  -      }
  -   }
  -
  -   /**
  -    * Close all sockets in all pools
  -    */
  -   public static void clearPools()
  -   {
  -      synchronized (connectionPools)
  -      {
  -         Iterator it = connectionPools.keySet().iterator();
  -         while (it.hasNext())
  -         {
  -            ServerAddress sa = (ServerAddress) it.next();
  -
  -            if (trace) { log.trace("clearing pool for " + sa); }
  -
  -            clearPool(sa);
  -         }
  -      }
  +      throw new MarshalException(
  +         "Failed to communicate. Problem during marshalling/unmarshalling.", ex);
      }
   
      protected void initPool()
      {
         synchronized (connectionPools)
         {
  -         pool = (LinkedList) connectionPools.get(address);
  +         pool = (LinkedList)connectionPools.get(address);
            if (pool == null)
            {
               pool = new LinkedList();
               connectionPools.put(address, pool);
  +            log.debug(this + " added new pool as " + address);
            }
         }
      }
   
  -   /**
  -    * Sets the number of times an invocation will retry based on getting
  -    * SocketException.
  -    *
  -    * @param numberOfCallRetries
  -    */
  -   public void setNumberOfCallRetries(int numberOfCallRetries)
  -   {
  -      if (numberOfCallRetries < 1)
  -      {
  -         this.numberOfCallRetries = MAX_CALL_RETRIES;
  -      }
  -      else
  -      {
  -         this.numberOfCallRetries = numberOfCallRetries;
  -      }
  -   }
  -
  -   public int getNumberOfCallRetries()
  -   {
  -      return numberOfCallRetries;
  -   }
  -
  -   /**
  -    * Sets the number of retries to get a socket connection.
  -    *
  -    * @param numberOfRetries Must be a number greater than 0
  -    */
  -   public void setNumberOfRetries(int numberOfRetries)
  -   {
  -      if (numberOfRetries < 1)
  -      {
  -         this.numberOfRetries = MAX_RETRIES;
  -      }
  -      else
  -      {
  -         this.numberOfRetries = numberOfRetries;
  -      }
  -   }
  -
  -   public int getNumberOfRetries()
  -   {
  -      return numberOfRetries;
  -   }
  -
  -   /**
  -    * used for debugging (tracing) connections leaks
  -    */
  -   static int counter = 0;
  -
  -   protected SocketWrapper getConnection(Marshaller marshaller, UnMarshaller unmarshaller) throws Exception
  +   protected SocketWrapper getConnection(Marshaller marshaller, UnMarshaller unmarshaller)
  +      throws Exception
      {
         SocketWrapper pooled = null;
   
  -      //
  -      // Need to retry a few times
  -      // on socket connection because, at least on Windoze,
  -      // if too many concurrent threads try to connect
  -      // at same time, you get ConnectionRefused
  -      //
  -      // Retrying seems to be the most performant.
  -      //
  -      // This problem always happens with RMI and seems to
  -      // have nothing to do with backlog or number of threads
  -      // waiting in accept() on the server.
  -      //
  +      // Need to retry a few times on socket connection because, at least on Windoze, if too many
  +      // concurrent threads try to connect at same time, you get ConnectionRefused. Retrying seems
  +      // to be the most performant. This problem always happens with RMI and seems to have nothing
  +      // to do with backlog or number of threads waiting in accept() on the server.
  +
         for (int i = 0; i < numberOfRetries; i++)
         {
            if (bailOut)
  @@ -736,24 +706,21 @@
               }
               else if (usedPooled < maxPoolSize)
               {
  -               // if no connection in pool and all pooled connections
  -               // not in use, then need create a new connection which
  -               // will be latered returned to the pool (thus filling out
  -               // the pool, since starts out empty).
  +               // If no connection in pool and all pooled connections not in use, then need create
  +               // a new connection which will be latered returned to the pool (thus filling out the
  +               // pool, since starts out empty).
   
                  Socket socket = null;
                  long timestamp = System.currentTimeMillis();
                  try
                  {
  -                  if (trace)
  -                  {
  -                     log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1));
  -                  }
  +                  if (trace) { log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1)); }
                     socket = createSocket(address.address, address.port);
                  }
                  catch (Exception ex)
                  {
  -                  log.debug(this + " got Exception " + ex + ", creation attempt took " + (System.currentTimeMillis() - timestamp) + " ms");
  +                  log.debug(this + " got Exception " + ex + ", creation attempt took " +
  +                     (System.currentTimeMillis() - timestamp) + " ms");
   
                     if (i + 1 < numberOfRetries)
                     {
  @@ -782,14 +749,17 @@
                  break;
               }
            }
  -         // waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting pooled connection)
  +
  +         // Waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting
  +         // pooled connection)
            Thread.sleep(1000);
         }
   
         if (pooled == null)
         {
  -         throw new SocketException("Can not obtain client socket connection from pool.  Have waited " +
  -                                   numberOfRetries + " seconds for available connection (" + usedPooled + " in use)");
  +         throw new SocketException("Can not obtain client socket connection from pool. " +
  +            "Have waited " + numberOfRetries + " seconds for available connection (" + usedPooled +
  +            " in use)");
         }
         else
         {
  @@ -797,7 +767,8 @@
         }
      }
   
  -   protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata) throws Exception
  +   protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata)
  +      throws Exception
      {
         if (clientSocketConstructor == null)
         {
  @@ -812,12 +783,12 @@
         }
   
         SocketWrapper clientSocketWrapper = null;
  -      clientSocketWrapper = (SocketWrapper) clientSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
  +      clientSocketWrapper = (SocketWrapper)clientSocketConstructor.
  +         newInstance(new Object[]{socket, metadata, new Integer(timeout)});
   
         return clientSocketWrapper;
      }
   
  -
      protected Socket createSocket(String address, int port) throws IOException
      {
         return new Socket(address, port);
  @@ -828,7 +799,7 @@
         SocketWrapper socketWrapper = null;
         while (pool.size() > 0)
         {
  -         socketWrapper = (SocketWrapper) pool.removeFirst();
  +         socketWrapper = (SocketWrapper)pool.removeFirst();
            try
            {
               if (socketWrapper != null)
  @@ -846,29 +817,81 @@
            }
            catch (Exception ex)
            {
  -            if (trace)
  -            {
  -               log.trace("Couldn't reuse connection from pool");
  -            }
  +            if (trace) { log.trace(this + " couldn't reuse connection from pool"); }
               try
               {
                  socketWrapper.close();
               }
  -            catch (Exception ignored)
  +            catch (Exception e)
               {
  +               log.debug("Failed to close socket wrapper", e);
               }
            }
         }
         return null;
      }
   
  +   // Private --------------------------------------------------------------------------------------
   
  -   /**
  -    * The name of of the server.
  -    */
  -   public String getServerHostName() throws Exception
  +   private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version)
  +      throws IOException, ClassNotFoundException
      {
  -      return address.address;
  +      //TODO: -TME - is switch required?
  +      switch (version)
  +      {
  +         case Version.VERSION_1:
  +         case Version.VERSION_2:
  +         {
  +            if (trace) { log.trace(this + " reading response from unmarshaller"); }
  +            return unmarshaller.read(inputStream, null);
  +         }
  +         default:
  +         {
  +            throw new IOException("Can not read data for version " + version + ". " +
  +               "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
      }
  +      }
  +   }
  +
  +   private void versionedWrite(OutputStream outputStream, Marshaller marshaller,
  +                               Object invocation, int version) throws IOException
  +   {
  +      //TODO: -TME Should I worry about checking the version here?  Only one way to do it at this point
  +      switch (version)
  +      {
  +         case Version.VERSION_1:
  +         case Version.VERSION_2:
  +         {
  +            if (trace) { log.trace(this + " writing invocation to marshaller"); }
  +            marshaller.write(invocation, outputStream);
  +            if (trace) { log.trace(this + " done writing invocation to marshaller"); }
  +
  +            return;
  +         }
  +         default:
  +         {
  +            throw new IOException("Can not write data for version " + version + ".  " +
  +               "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
  +         }
  +      }
  +   }
  +
  +   //TODO: -TME Exact same method in ServerThread
  +   private int readVersion(InputStream inputStream) throws IOException
  +   {
  +      if (trace) { log.trace(this + " reading version from input stream"); }
  +      int version = inputStream.read();
  +      if (trace) { log.trace(this + " read version " + version + " from input stream"); }
  +      return version;
  +   }
  +
  +   //TODO: -TME Exact same method in ServerThread
  +   private void writeVersion(OutputStream outputStream, int version) throws IOException
  +   {
  +      if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
  +      outputStream.write(version);
  +   }
  +
  +   // Inner classes --------------------------------------------------------------------------------
   
   }
  
  
  
  1.14.6.5  +0 -2      JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerSocketWrapper.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: ServerSocketWrapper.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerSocketWrapper.java,v
  retrieving revision 1.14.6.4
  retrieving revision 1.14.6.5
  diff -u -b -r1.14.6.4 -r1.14.6.5
  --- ServerSocketWrapper.java	29 Jan 2007 05:21:31 -0000	1.14.6.4
  +++ ServerSocketWrapper.java	29 Jan 2007 07:10:00 -0000	1.14.6.5
  @@ -36,8 +36,6 @@
   {
      final static private Logger log = Logger.getLogger(ServerSocketWrapper.class);
   
  -   private static boolean trace = log.isTraceEnabled();
  -
      public ServerSocketWrapper(Socket socket) throws Exception
      {
         super(socket);
  
  
  
  1.38.2.4  +4 -3      JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java,v
  retrieving revision 1.38.2.3
  retrieving revision 1.38.2.4
  diff -u -b -r1.38.2.3 -r1.38.2.4
  --- SocketClientInvoker.java	23 Jan 2007 09:39:03 -0000	1.38.2.3
  +++ SocketClientInvoker.java	29 Jan 2007 07:10:00 -0000	1.38.2.4
  @@ -40,7 +40,7 @@
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
  - * @version $Revision: 1.38.2.3 $
  + * @version $Revision: 1.38.2.4 $
    */
   public class SocketClientInvoker extends MicroSocketClientInvoker
   {
  @@ -91,11 +91,12 @@
               try
               {
                  timeout = Integer.valueOf((String) val).intValue();;
  -               log.debug("Setting SocketClientInvoker::timeout to: " + timeout);
  +               log.debug(this + " setting timeout to " + timeout);
               }
               catch (Exception e)
               {
  -               log.warn("Could not convert " + SO_TIMEOUT_FLAG + " value of " + val + " to a int value.");
  +               log.warn(this + " could not convert " + SO_TIMEOUT_FLAG + " value of " +
  +                        val + " to a int value.");
               }
            }
         }
  
  
  
  1.8.10.3  +30 -12    JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketWrapper.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: SocketWrapper.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketWrapper.java,v
  retrieving revision 1.8.10.2
  retrieving revision 1.8.10.3
  diff -u -b -r1.8.10.2 -r1.8.10.3
  --- SocketWrapper.java	29 Jan 2007 05:28:29 -0000	1.8.10.2
  +++ SocketWrapper.java	29 Jan 2007 07:10:00 -0000	1.8.10.3
  @@ -35,28 +35,34 @@
    */
   public abstract class SocketWrapper
   {
  +   // Constants ------------------------------------------------------------------------------------
  +
  +   private static final Logger log = Logger.getLogger(SocketWrapper.class);
  +
      public static final String MARSHALLER = "marshaller";
      public static final String UNMARSHALLER = "unmarshaller";
      
  -   private static final Logger log = Logger.getLogger(SocketWrapper.class);
  +   // Static ---------------------------------------------------------------------------------------
   
      private static boolean trace = log.isTraceEnabled();
   
  +   // Attributes -----------------------------------------------------------------------------------
  +
      private Socket socket;
      private int timeout;
   
  +   // Constructors ---------------------------------------------------------------------------------
  +
      protected SocketWrapper(Socket socket)
      {
         if (trace) { log.trace("creating SocketWrapper for " + socket); }
  -
         this.socket = socket;
      }
   
      protected SocketWrapper(Socket socket, Integer timeoutInt) throws SocketException
      {
  -      if (trace) { log.trace("creating SocketWrapper for " + socket + ", using timeout " + timeoutInt); }
  +      this(socket);
   
  -      this.socket = socket;
         if(timeoutInt != null)
         {
            this.timeout = timeoutInt.intValue();
  @@ -64,14 +70,11 @@
         }
      }
   
  -   public abstract OutputStream getOutputStream() throws IOException;
  -
  -   public abstract InputStream getInputStream() throws IOException;
  -
  -   public abstract void checkConnection() throws IOException;
  +   // Public ---------------------------------------------------------------------------------------
   
      public void setTimeout(int timeout) throws SocketException
      {
  +      if (trace) { log.trace(this + " setting timeout to " + timeout); }
         this.timeout = timeout;
         if(socket != null)
         {
  @@ -84,15 +87,25 @@
         return timeout;
      }
   
  -
      public void close() throws IOException
      {
         if(socket != null)
         {
  +         log.debug(this + " closing");
            socket.close();
         }
      }
   
  +   public abstract OutputStream getOutputStream() throws IOException;
  +
  +   public abstract InputStream getInputStream() throws IOException;
  +
  +   public abstract void checkConnection() throws IOException;
  +
  +   // Package protected ----------------------------------------------------------------------------
  +
  +   // Protected ------------------------------------------------------------------------------------
  +
      protected Socket getSocket()
      {
         return socket;
  @@ -106,10 +119,15 @@
            {
               socket.close();
            }
  -         catch(Exception ignored)
  +         catch(Exception e)
            {
  +            log.debug(this + " failed to close socket", e);
            }
         }
      }
   
  +   // Private --------------------------------------------------------------------------------------
  +
  +   // Inner classes --------------------------------------------------------------------------------
  +
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list