[jboss-cvs] JBossAS SVN: r58749 - branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 29 13:35:32 EST 2006


Author: tom.elrod at jboss.com
Date: 2006-11-29 13:35:31 -0500 (Wed, 29 Nov 2006)
New Revision: 58749

Modified:
   branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
Log:
ASPATCH-121 - patch for JBAS-3216 to JBoss_4_0_4_GA_CP branch.

Modified: branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
===================================================================
--- branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java	2006-11-29 18:22:27 UTC (rev 58748)
+++ branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java	2006-11-29 18:35:31 UTC (rev 58749)
@@ -32,7 +32,9 @@
 import java.io.EOFException;
 import java.io.OptionalDataException;
 import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
 import java.net.Socket;
+import java.net.SocketException;
 import java.rmi.MarshalledObject;
 import java.rmi.NoSuchObjectException;
 import java.rmi.ServerException;
@@ -79,7 +81,7 @@
     * Factory for transaction propagation contexts.
     *
     * @todo marcf remove all transaction spill from here
-    * 
+    *
     * When set to a non-null value, it is used to get transaction
     * propagation contexts for remote method invocations.
     * If <code>null</code>, transactions are not propagated on
@@ -88,7 +90,7 @@
    protected static TransactionPropagationContextFactory tpcFactory = null;
 
    //  @todo: MOVE TO TRANSACTION
-   // 
+   //
    // TPC factory
    public static void setTPCFactory(TransactionPropagationContextFactory tpcf) {
       tpcFactory = tpcf;
@@ -104,6 +106,10 @@
    public static long usedPooled = 0;
    /** The number of connections in use */
    private static int inUseCount = 0;
+   /** The number of socket connections made */
+   private static long socketConnectCount = 0;
+   /** The number of socket close calls made */
+   private static long socketCloseCount = 0;
 
    /**
     * Set number of retries in getSocket method
@@ -215,6 +221,17 @@
          }
       }
 
+      public String toString()
+      {
+         StringBuffer tmp = new StringBuffer("ClientSocket@");
+         tmp.append(System.identityHashCode(this));
+         tmp.append('[');
+         tmp.append("socket=");
+         tmp.append(socket.toString());
+         tmp.append(']');
+         return tmp.toString();
+      }
+
       /**
        * @todo should this be handled with weak references as this should
        * work better with gc
@@ -225,10 +242,19 @@
          {
             if( trace )
                log.trace("Closing socket in finalize: "+socket);
-            try { socket.close(); } catch (Exception ignored) {}
+            try
+            {
+               socketCloseCount --;
+               socket.close();
          }
+            catch (Exception ignored) {}
+            finally
+            {
+               socket = null;
       }
    }
+      }
+   }
 
    /**
     * Clear all class level stats
@@ -240,7 +266,7 @@
       writeTime = 0;
       serializeTime = 0;
       deserializeTime = 0;
-      usedPooled = 0;      
+      usedPooled = 0;
    }
 
    /**
@@ -258,6 +284,14 @@
    {
       return usedPooled;
    }
+   public static long getSocketConnectCount()
+   {
+      return socketConnectCount;
+   }
+   public static long getSocketCloseCount()
+   {
+      return socketCloseCount;
+   }
 
    /**
     * @return the total number of pooled connections across all ServerAddresses
@@ -324,20 +358,27 @@
             int size = thepool.size();
             for (int i = 0; i < size; i++)
             {
-               ClientSocket socket = (ClientSocket)thepool.removeFirst();
+               ClientSocket cs = null;
                try
                {
+                  ClientSocket socket = (ClientSocket)thepool.removeFirst();
+                  cs = socket;
                   if( trace )
-                     log.trace("Closing, ClientSocket: "+socket.socket);
+                     log.trace("Closing, ClientSocket: "+socket);
+                  socketCloseCount --;
                   socket.socket.close();
-                  socket.socket = null;
                }
                catch (Exception ignored)
                {
                }
+               finally
+               {
+                  if( cs != null )
+                     cs.socket = null;
             }
          }
       }
+      }
       catch (Exception ex)
       {
          // ignored
@@ -375,6 +416,7 @@
    protected ClientSocket getConnection() throws Exception
    {
       Socket socket = null;
+      ClientSocket cs = null;
 
       //
       // Need to retry a few times
@@ -387,7 +429,7 @@
       // This problem always happens with RMI and seems to
       // have nothing to do with backlog or number of threads
       // waiting in accept() on the server.
-      // 
+      //
       for (int i = 0; i < retryCount; i++)
       {
          ClientSocket pooled = getPooledConnection();
@@ -404,18 +446,36 @@
             {
                log.trace("Connecting to addr: "+address.address
                   +", port: "+address.port
-                  +",clientSocketFactory: "+address.clientSocketFactory);
+                  +",clientSocketFactory: "+address.clientSocketFactory
+                  +",enableTcpNoDelay: "+address.enableTcpNoDelay
+                  +",timeout: "+address.timeout);
             }
             if( address.clientSocketFactory != null )
                socket = address.clientSocketFactory.createSocket(address.address, address.port);
             else
                socket = new Socket(address.address, address.port);
+            socketConnectCount ++;
             if( trace )
-               log.trace("Connected: "+socket);
+               log.trace("Connected, socket="+socket);
+
+            socket.setTcpNoDelay(address.enableTcpNoDelay);
+            cs = new ClientSocket(socket, address.timeout);
+            inUseCount ++;
+            if( trace )
+            {
+               log.trace("New ClientSocket: "+cs
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
             break;
          }
-         catch (ConnectException ex)
+         catch (Exception ex)
          {
+            if( ex instanceof InterruptedIOException || ex instanceof SocketException )
+            {
             if( trace )
                log.trace("Connect failed", ex);
             if (i + 1 < retryCount)
@@ -423,16 +483,14 @@
                Thread.sleep(1);
                continue;
             }
+            }
             throw ex;
          }
       }
       // Should not happen
-      if( socket == null )
+      if( cs == null )
          throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
-
-      inUseCount ++;
-      socket.setTcpNoDelay(address.enableTcpNoDelay);
-      return new ClientSocket(socket, address.timeout);
+      return cs;
    }
 
    protected synchronized ClientSocket getPooledConnection()
@@ -440,31 +498,52 @@
       ClientSocket socket = null;
       while (pool.size() > 0)
       {
-         socket = (ClientSocket)pool.removeFirst();
          try
          {
+            synchronized( pool )
+            {
+               socket = (ClientSocket)pool.removeFirst();
+            }
             // Test to see if socket is alive by send ACK message
+            if( trace )
+               log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
             final byte ACK = 1;
             socket.out.writeByte(ACK);
             socket.out.flush();
             socket.in.readByte();
             if( trace )
-               log.trace("Using pooled socket: "+socket);
+            {
+               log.trace("Using pooled ClientSocket: "+socket
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
             return socket;
          }
          catch (Exception ex)
          {
             if( trace )
-               log.trace("Failed to validate pooled socket: "+socket);
+               log.trace("Failed to validate pooled socket: "+socket, ex);
             try
             {
+               if( socket != null )
+               {
+                  socketCloseCount --;
                socket.socket.close();
             }
+            }
             catch (Exception ignored)
             {
             }
+            finally
+            {
+               if( socket != null )
+                  socket.socket = null;
          }
       }
+      }
       return null;
    }
 
@@ -477,12 +556,15 @@
    protected synchronized boolean returnConnection(ClientSocket socket)
    {
       boolean pooled = false;
+      synchronized( pool )
+      {
       if (pool.size() < maxPoolSize)
       {
          pool.add(socket);
          inUseCount --;
          pooled = true;
       }
+      }
       return pooled;
    }
 
@@ -498,7 +580,7 @@
     * ???
     *
     * @todo MOVE TO TRANSACTION
-    *  
+    *
     * @return the transaction propagation context of the transaction
     *         associated with the current thread.
     *         Returns <code>null</code> if the transaction manager was never
@@ -512,8 +594,8 @@
 
 
    /**
-    * The invocation on the delegate, calls the right invoker.  Remote if we are remote, 
-    * local if we are local. 
+    * The invocation on the delegate, calls the right invoker.  Remote if we are remote,
+    * local if we are local.
     */
    public Object invoke(Invocation invocation)
       throws Exception
@@ -561,14 +643,20 @@
       }
       catch (Exception ex)
       {
+         if( trace )
+            log.trace("Failure during invoke", ex);
          try
          {
+            socketCloseCount --;
             socket.socket.close();
          }
          catch (Exception ignored) {}
-         //System.out.println("got read exception, exiting");
-         throw new ConnectException("Failed to communicate", ex);
+         finally
+         {
+            socket.socket = null;
       }
+         throw new java.rmi.ConnectException("Failure during invoke", ex);
+      }
 
       // Put socket back in pool for reuse
       if( returnConnection(socket) == false )
@@ -578,10 +666,15 @@
             log.trace("Closing unpooled socket: "+socket);
          try
          {
+            socketCloseCount --;
             socket.socket.close();
          }
          catch (Exception ignored) {}
+         finally
+         {
+            socket.socket = null;
       }
+      }
 
       // Return response
 
@@ -618,7 +711,7 @@
    /**
     * Write out the serializable data
     * @serialData address ServerAddress
-    * @serialData maxPoolSize int 
+    * @serialData maxPoolSize int
     * @serialData WIRE_VERSION int version
     * @serialData retryCount int
     * @param out




More information about the jboss-cvs-commits mailing list