[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Mon Jan 22 20:02:29 EST 2007


  User: rsigal  
  Date: 07/01/22 20:02:29

  Modified:    src/main/org/jboss/remoting/transport/socket 
                        MicroSocketClientInvoker.java
  Log:
  JBREM-684:  In transport(), oneway invocations now return the socket to the pool.
  JBREM-662:  Added Ovidiu's bailout flag from the remoting_2_x branch.
  Also brought in Ovidiu's logging changes from the remoting_2_x branch.
  
  Revision  Changes    Path
  1.20      +74 -46    JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroSocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -b -r1.19 -r1.20
  --- MicroSocketClientInvoker.java	12 Jan 2007 18:56:22 -0000	1.19
  +++ MicroSocketClientInvoker.java	23 Jan 2007 01:02:29 -0000	1.20
  @@ -6,10 +6,10 @@
   import org.jboss.remoting.InvokerLocator;
   import org.jboss.remoting.RemoteClientInvoker;
   import org.jboss.remoting.Version;
  +import org.jboss.remoting.serialization.ClassLoaderUtility;
   import org.jboss.remoting.marshal.Marshaller;
   import org.jboss.remoting.marshal.UnMarshaller;
   import org.jboss.remoting.marshal.serializable.SerializableMarshaller;
  -import org.jboss.remoting.serialization.ClassLoaderUtility;
   import org.jboss.util.propertyeditor.PropertyEditors;
   
   import java.io.IOException;
  @@ -32,12 +32,12 @@
    *
    * @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
  - * @version $Revision: 1.19 $
  + * @version $Revision: 1.20 $
    */
   public class MicroSocketClientInvoker extends RemoteClientInvoker
   {
      private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
  -   private static final boolean isTraceEnabled = log.isTraceEnabled();
  +   private static final boolean trace = log.isTraceEnabled();
   
      protected InetAddress addr;
      protected int port;
  @@ -134,14 +134,17 @@
   
      private boolean reuseAddress = true;
   
  +   // Flag set to true by a disconnect request. If an attempt to create a connection is
  +   // looping when a disconnect request arrives, this flag is used to pass that information
  +   // into the connect loop so that it can bail out.
  +   private volatile boolean bailOut;
  +
      public MicroSocketClientInvoker(InvokerLocator locator)
  -         throws IOException
      {
         this(locator, null);
      }
   
      public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
  -         throws IOException
      {
         super(locator, configuration);
         try
  @@ -201,13 +204,12 @@
            {
               try
               {
  -               int nVal = Integer.valueOf((String) val).intValue();
  -               maxPoolSize = nVal;
  -               log.debug("Setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
  +               maxPoolSize = Integer.valueOf((String)val).intValue();
  +               log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
               }
               catch (Exception e)
               {
  -               log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value.");
  +               log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
               }
            }
            // look for client socket class name
  @@ -218,7 +220,7 @@
               if (value.length() > 0)
               {
                  clientSocketClassName = value;
  -               log.debug("Setting ClientSocket class name to: " + clientSocketClassName);
  +               log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
               }
            }
   
  @@ -237,7 +239,6 @@
      /**
       * Indicates if will check socket connection when returning from
       * pool by sending byte to the server.  Default value will be false.
  -    * @return
       */
      public boolean checkingConnection()
      {
  @@ -247,7 +248,6 @@
      /**
       * Returns if newly created sockets will have SO_REUSEADDR enabled.
       * Default is for this to be true.
  -    * @return
       */
      public boolean getReuseAddress()
      {
  @@ -264,6 +264,13 @@
         reuseAddress = reuse;
      }
   
  +   public synchronized void disconnect()
  +   {
  +      if (trace) { log.trace(this + " disconnecting ..."); }
  +      bailOut = true;
  +      super.disconnect();
  +   }
  +
      protected void finalize() throws Throwable
      {
         disconnect();
  @@ -285,8 +292,6 @@
       * Each implementation of the remote client invoker should have
       * a default data type that is uses in the case it is not specified
       * in the invoker locator uri.
  -    *
  -    * @return
       */
      protected String getDefaultDataType()
      {
  @@ -323,15 +328,17 @@
            {
               throw new CannotConnectException("Can not get connection to server.  Problem establishing socket connection for locator - " + locator, e);
            }
  +
            long end = System.currentTimeMillis() - start;
            getSocketTime += end;
  +
            try
            {
  -
               int version = Version.getDefaultVersion();
               boolean performVersioning = Version.performVersioning();
   
               OutputStream outputStream = socketWrapper.getOutputStream();
  +
               if (performVersioning)
               {
                  writeVersion(outputStream, version);
  @@ -345,37 +352,40 @@
               start = System.currentTimeMillis();
   
               // check to see if is one way invocation and return if is
  +            boolean oneway = false;
               if(metadata != null)
               {
                  Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
                  if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
                  {
  -                  if(isTraceEnabled)
  -                  {
  -                     log.trace("Oneway invocation, so not waiting for response.  Returning null.");
  -                  }
  -                  return null;
  +                  if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
  +//                  return null;
  +                  oneway = true;
                  }
               }
   
  +            if (!oneway)
  +            {
               InputStream inputStream = socketWrapper.getInputStream();
               if (performVersioning)
               {
                  version = readVersion(inputStream);
                  if (version == -1)
                  {
  -                  throw new SocketException();
  +                     throw new SocketException("end of file");
                  }
               }
   
               response = versionedRead(inputStream, unmarshaller, version);
  +            }
   
               end = System.currentTimeMillis() - start;
               readTime += end;
            }
            catch (SocketException sex)
            {
  -            log.debug(sex);
  +            log.debug(this + " got SocketException " + sex);
  +
               try
               {
                  socketWrapper.close();
  @@ -383,6 +393,7 @@
               }
               catch (Exception ex)
               {
  +               if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
               }
   
               /**
  @@ -400,6 +411,8 @@
            }
            catch (Exception ex)
            {
  +            log.debug(this + " got exception " + ex);
  +
               try
               {
                  socketWrapper.close();
  @@ -434,7 +447,7 @@
            }
            else
            {
  -            if (isTraceEnabled)
  +            if (trace)
               {
                  log.trace("Pool was already full, will close the connection");
               }
  @@ -448,12 +461,7 @@
            }
         }
   
  -      // Return response
  -      if (isTraceEnabled)
  -      {
  -         log.trace("Response: " + response);
  -      }
  -
  +      if (trace) { log.trace(this + " received " + response);  }
         return response;
   
      }
  @@ -480,7 +488,7 @@
      protected Object handleException(Exception ex, SocketWrapper socketWrapper)
            throws ClassNotFoundException, MarshalException
      {
  -      log.error("Got marshalling exception, exiting", ex);
  +      log.error(this + " got marshalling exception, exiting ...", ex);
         if (ex instanceof ClassNotFoundException)
         {
            //TODO: -TME Add better exception handling for class not found exception
  @@ -488,7 +496,7 @@
            throw (ClassNotFoundException) ex;
         }
   
  -      throw new MarshalException("Failed to communicate.  Problem during marshalling/unmarshalling", ex);
  +      throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling.", ex);
      }
   
      private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version) throws IOException, ClassNotFoundException
  @@ -499,6 +507,7 @@
            case Version.VERSION_1:
            case Version.VERSION_2:
            {
  +            if (trace) { log.trace(this + " reading response from input stream"); }
               return unmarshaller.read(inputStream, null);
            }
            default:
  @@ -516,7 +525,10 @@
            case Version.VERSION_1:
            case Version.VERSION_2:
            {
  +            if (trace) { log.trace(this + " writing invocation on output stream"); }
               marshaller.write(invocation, outputStream);
  +            if (trace) { log.trace(this + " done writing invocation on output stream"); }
  +
               return;
            }
            default:
  @@ -529,16 +541,19 @@
      //TODO: -TME Exact same method in ServerThread
      private int readVersion(InputStream inputStream) throws IOException
      {
  -      return inputStream.read();
  +      if (trace) { log.trace(this + " reading version from input stream"); }
  +      int version = inputStream.read();
  +      if (trace) { log.trace(this + " read version " + version + " from input stream"); }
  +      return version;
      }
   
      //TODO: -TME Exact same method in ServerThread
      private void writeVersion(OutputStream outputStream, int version) throws IOException
      {
  +      if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
         outputStream.write(version);
      }
   
  -
      /**
       * Close all sockets in a specific pool.
       */
  @@ -585,6 +600,9 @@
            while (it.hasNext())
            {
               ServerAddress sa = (ServerAddress) it.next();
  +
  +            if (trace) { log.trace("clearing pool for " + sa); }
  +
               clearPool(sa);
            }
         }
  @@ -671,6 +689,13 @@
         //
         for (int i = 0; i < numberOfRetries; i++)
         {
  +         if (bailOut)
  +         {
  +            log.debug(this + " has been concurrently disconnected, " +
  +               "bailing out from trying to create a new connection");
  +            break;
  +         }
  +         
            synchronized (pool)
            {
               // if connection within pool, use it
  @@ -679,6 +704,7 @@
                  pooled = getPooledConnection();
               }
            }
  +
            synchronized(usedPoolLock)
            {
               if (pooled != null)
  @@ -692,18 +718,21 @@
                  // not in use, then need create a new connection which
                  // will be latered returned to the pool (thus filling out
                  // the pool, since starts out empty).
  +
                  Socket socket = null;
  +               long timestamp = System.currentTimeMillis();
                  try
                  {
  -                  if (isTraceEnabled)
  +                  if (trace)
                     {
  -                     log.trace("Creating socket number " + (counter++));
  +                     log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1));
                     }
                     socket = createSocket(address.address, address.port);
                  }
                  catch (Exception ex)
                  {
  -                  log.debug(ex);
  +                  log.debug(this + " got Exception " + ex + ", creation attempt took " + (System.currentTimeMillis() - timestamp) + " ms");
  +
                     if (i + 1 < numberOfRetries)
                     {
                        Thread.sleep(1);
  @@ -781,7 +810,7 @@
            }
            catch (Exception ex)
            {
  -            if (isTraceEnabled)
  +            if (trace)
               {
                  log.trace("Couldn't reuse connection from pool");
               }
  @@ -806,5 +835,4 @@
         return address.address;
      }
   
  -
   }
  
  
  



More information about the jboss-cvs-commits mailing list