[Jboss-cvs] JBossAS SVN: r56498 - in branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled: interfaces server

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Sep 1 02:59:01 EDT 2006


Author: scott.stark at jboss.org
Date: 2006-09-01 02:58:56 -0400 (Fri, 01 Sep 2006)
New Revision: 56498

Added:
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java
Modified:
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java
   branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java
Log:
Merge the JBAS-3300 pooled invoker patch

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,185 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.ref.WeakReference;
+
+import org.jboss.util.collection.WeakValueHashMap;
+
+import org.jboss.logging.Logger;
+import org.jboss.invocation.MarshalledValueInputStream;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+
+/**
+ * An ObjectInputStream subclass used by the MarshalledValue class to
+ * ensure the classes and proxies are loaded using the thread context
+ * class loader.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class OptimizedObjectInputStream
+        extends ObjectInputStream
+{
+   private static Logger log = Logger.getLogger(OptimizedObjectInputStream.class);
+   /** A class wide cache mapping class names to WeakReferences of the Class loaded by forName */
+   private static ConcurrentReaderHashMap classCache;
+   private static ConcurrentReaderHashMap objectStreamClassCache;
+   private static Method lookupStreamClass = null;
+
+   static
+   {
+      useClassCache(true);
+      try
+      {
+         lookupStreamClass = ObjectStreamClass.class.getDeclaredMethod("lookup", new Class[]{Class.class, boolean.class});
+         lookupStreamClass.setAccessible(true);
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+      }
+   }
+
+   /** Enable local caching of resolved proxy classes. This can only be used
+    * if there is a single ULR and no redeployment of the proxy classes.
+    *
+    * @param flag true to enable caching, false to disable it
+    */
+   public static void useClassCache(boolean flag)
+   {
+      if (flag == true)
+      {
+         classCache = new ConcurrentReaderHashMap();
+         objectStreamClassCache = new ConcurrentReaderHashMap();
+      }
+      else
+      {
+         classCache = null;
+         objectStreamClassCache = null;
+      }
+   }
+
+   /** Clear the class and descriptor caches; a no-op if caching is disabled. */
+   public static void flushClassCache()
+   {
+      ConcurrentReaderHashMap classes = classCache;
+      ConcurrentReaderHashMap descriptors = objectStreamClassCache;
+      if (classes != null) classes.clear();
+      if (descriptors != null) descriptors.clear();
+   }
+
+   /** Load className through the thread context class loader, consulting
+    * the class cache first when caching is enabled.
+    * @throws ClassNotFoundException if the class cannot be found either way
+    */
+   private static Class forName(String className) throws ClassNotFoundException
+   {
+      // Work on a snapshot since useClassCache can null the field at any time
+      ConcurrentReaderHashMap cache = classCache;
+      Class clazz = null;
+      if (cache != null)
+      {
+         WeakReference ref = (WeakReference) cache.get(className);
+         if (ref != null) clazz = (Class) ref.get();
+         if (clazz != null) return clazz;
+         if (ref != null) cache.remove(className);
+      }
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+      try
+      {
+         clazz = loader.loadClass(className);
+      }
+      catch (ClassNotFoundException e)
+      {
+         /* Use the Class.forName call which will resolve array classes. We
+         do not use this by default as this can result in caching of stale
+         values across redeployments. This applies with or without the cache.
+         */
+         clazz = Class.forName(className, false, loader);
+      }
+      if (cache != null) cache.put(className, new WeakReference(clazz));
+      return clazz;
+   }
+
+   /**
+    * Creates a new OptimizedObjectInputStream that reads from the given stream
+    */
+   public OptimizedObjectInputStream(InputStream is) throws IOException
+   {
+      super(is);
+   }
+
+   /** Reflectively call lookup(clazz, true) on ObjectStreamClass; null on failure */
+   protected static ObjectStreamClass lookup(Class clazz)
+   {
+      try
+      {
+         return (ObjectStreamClass) lookupStreamClass.invoke(null, new Object[]{clazz, Boolean.TRUE});
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+         return null;
+      }
+   }
+
+   protected ObjectStreamClass readClassDescriptor()
+           throws IOException, ClassNotFoundException
+   {
+      String className = readUTF();
+      ObjectStreamClass osc = null;
+      if (objectStreamClassCache != null)
+      {
+         osc = (ObjectStreamClass) objectStreamClassCache.get(className);
+      }
+      if (osc == null)
+      {
+         Class clazz = forName(className);
+         osc = ObjectStreamClass.lookup(clazz);
+         if (osc == null) osc = lookup(clazz);
+         if (osc == null) throw new IOException("Unable to readClassDescriptor for class " + className);
+         if (objectStreamClassCache != null) objectStreamClassCache.put(className, osc);
+      }
+      return osc;
+   }
+
+   /**
+    * Use the thread context class loader to resolve the class
+    *
+    * @param v the descriptor whose name identifies the class to load
+    * @throws ClassNotFoundException if the named class cannot be loaded
+    */
+   protected Class resolveClass(ObjectStreamClass v)
+           throws IOException, ClassNotFoundException
+   {
+      return forName(v.getName());
+   }
+
+   /** Resolve a proxy class by loading each named interface through forName
+    * and defining the proxy in the thread context class loader.
+    */
+   protected Class resolveProxyClass(String[] interfaces)
+           throws IOException, ClassNotFoundException
+   {
+      ClassLoader tcl = Thread.currentThread().getContextClassLoader();
+      Class[] resolved = new Class[interfaces.length];
+      for (int n = 0; n < resolved.length; n++)
+      {
+         resolved[n] = forName(interfaces[n]);
+      }
+
+      return Proxy.getProxyClass(tcl, resolved);
+   }
+}

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.rmi.Remote;
+import java.rmi.server.RemoteObject;
+import java.rmi.server.RemoteStub;
+
+/**
+ * An ObjectOutputStream subclass used by the MarshalledValue class. It
+ * writes only the class name as the class descriptor and replaces Remote
+ * objects with their stubs. Neither class nor proxy annotations are
+ * written.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class OptimizedObjectOutputStream
+   extends ObjectOutputStream
+{
+   /** Creates a new instance of OptimizedObjectOutputStream with replaceObject enabled.
+    If there is a security manager installed, this method requires a
+    SerializablePermission("enableSubstitution") permission to ensure it's
+    ok to enable the stream to do replacement of objects in the stream.
+    */
+   public OptimizedObjectOutputStream(OutputStream os) throws IOException
+   {
+      super(os);
+      enableReplaceObject(true);
+   }
+
+   /**
+    * Writes just the class name, as a UTF string, to this output stream.
+    * @param classdesc class description object whose name is written
+    * @throws IOException if writeUTF fails
+    */
+   protected void writeClassDescriptor(ObjectStreamClass classdesc)
+      throws IOException
+   {
+      writeUTF(classdesc.getName());
+   }
+
+   /** Replace Remote objects that are not RemoteStubs with their stub.
+    Any other object, or one whose stub lookup fails, is returned unchanged.
+   */
+   protected Object replaceObject(Object obj) throws IOException
+   {
+      boolean needsStub = (obj instanceof Remote) && !(obj instanceof RemoteStub);
+      if( needsStub == false )
+         return obj;
+
+      try
+      {
+         return RemoteObject.toStub((Remote) obj);
+      }
+      catch(IOException ignore)
+      {
+         // Let the Serialization layer try with the original obj
+         return obj;
+      }
+   }
+}

Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,10 +1,24 @@
 /*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
 package org.jboss.invocation.pooled.interfaces;
 
 import java.io.IOException;
@@ -15,44 +29,59 @@
 import java.io.BufferedInputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.IOException;
+import java.io.EOFException;
+import java.io.OptionalDataException;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
 import java.net.Socket;
+import java.net.SocketException;
 import java.rmi.MarshalledObject;
 import java.rmi.NoSuchObjectException;
 import java.rmi.ServerException;
 import java.rmi.ConnectException;
-import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
 import java.util.LinkedList;
 
 import javax.transaction.TransactionRolledbackException;
 import javax.transaction.SystemException;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.net.ssl.SSLException;
 
 import org.jboss.invocation.Invocation;
 import org.jboss.invocation.Invoker;
-import org.jboss.invocation.MarshalledInvocation;
 import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
 
 
+
 /**
  * Client socket connections are pooled to avoid the overhead of
  * making a connection.  RMI seems to do a new connection with each
  * request.
  *
  * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
  * @version $Revision$
  */
 public class PooledInvokerProxy
    implements Invoker, Externalizable
 {
    // Attributes ----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PooledInvokerProxy.class);
    /** The serialVersionUID @since 1.1.4.3 */
    private static final long serialVersionUID = -1456509931095566410L;
+   /** The current wire format we write */
+   private static final int WIRE_VERSION = 1;
 
    /**
     * Factory for transaction propagation contexts.
     *
-    * @todo: marcf remove all transaction spill from here
+    * @todo marcf remove all transaction spill from here
     * 
     * When set to a non-null value, it is used to get transaction
     * propagation contexts for remote method invocations.
@@ -60,7 +89,7 @@
     * remote method invocations.
     */
    protected static TransactionPropagationContextFactory tpcFactory = null;
-   
+
    //  @todo: MOVE TO TRANSACTION
    // 
    // TPC factory
@@ -68,77 +97,249 @@
       tpcFactory = tpcf;
    }
 
-   // Performance measurements
+   // Simple performance measurements, not thread safe
    public static long getSocketTime = 0;
    public static long readTime = 0;
    public static long writeTime = 0;
    public static long serializeTime = 0;
    public static long deserializeTime = 0;
+   /** The number of times a connection has been obtained from a pool */
    public static long usedPooled = 0;
+   /** The number of connections in use */
+   private static int inUseCount = 0;
+   /** The number of socket connections made */
+   private static long socketConnectCount = 0;
+   /** The number of socket close calls made */
+   private static long socketCloseCount = 0;
 
-
    /**
     * Set number of retries in getSocket method
     */
    public static int MAX_RETRIES = 10;
 
+   /** A class wide pool Map<ServerAddress, LinkedList<ClientSocket>> */
+   protected static final Map connectionPools = new ConcurrentReaderHashMap();
 
-   protected static HashMap connectionPools = new HashMap();
-
    /**
     * connection information
     */
    protected ServerAddress address;
-   
+
    /**
     * Pool for this invoker.  This is shared between all
     * instances of proxies attached to a specific invoker
+    * This should not be serializable, but is for backward compatibility.
     */
    protected LinkedList pool = null;
+   /** */
    protected int maxPoolSize;
+   /** The number of times to retry after seeing a ConnectionException */
+   protected int retryCount = 1;
+   /** The logging trace flag */
+   private transient boolean trace;
 
+   /**
+    * An encapsulation of a client connection
+    */
    protected static class ClientSocket
+      implements HandshakeCompletedListener
    {
       public ObjectOutputStream out;
       public ObjectInputStream in;
       public Socket socket;
       public int timeout;
+      public String sessionID;
+      private boolean handshakeComplete = false;
+      private boolean trace;
+
       public ClientSocket(Socket socket, int timeout) throws Exception
       {
          this.socket = socket;
+         trace = log.isTraceEnabled();
+         boolean needHandshake = false;
+
+         if( socket instanceof SSLSocket )
+         {
+            SSLSocket ssl = (SSLSocket) socket;
+            ssl.addHandshakeCompletedListener(this);
+            if( trace )
+               log.trace("Starting SSL handshake");
+            needHandshake = true;
+            handshakeComplete = false;
+            ssl.startHandshake();
+         }
          socket.setSoTimeout(timeout);
          this.timeout = timeout;
-         out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+         out = new OptimizedObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
          out.flush();
-         in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+         in = new OptimizedObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+         if( needHandshake )
+         {
+            // Loop waiting for the handshake to complete
+            socket.setSoTimeout(1000);
+            for(int n = 0; handshakeComplete == false && n < 60; n ++)
+            {
+               try
+               {
+                  int b = in.read();
+               }
+               catch(SSLException e)
+               {
+                  if( trace )
+                     log.trace("Error while waiting for handshake to complete", e);
+                  throw e;
+               }
+               catch(IOException e)
+               {
+                  if( trace )
+                     log.trace("Handshaked read()", e);
+               }
+            }
+            if( handshakeComplete == false )
+               throw new SSLException("Handshaked failed to complete in 60 seconds");
+            // Restore the original timeout
+            socket.setSoTimeout(timeout);
+         }
+
       }
 
+      public void handshakeCompleted(HandshakeCompletedEvent event)
+      {
+         handshakeComplete = true;
+         byte[] id = event.getSession().getId();
+         try
+         {
+            sessionID = new String(id, "UTF-8");
+         }
+         catch (UnsupportedEncodingException e)
+         {
+            log.warn("Failed to create session id using UTF-8, using default", e);
+            sessionID = new String(id);
+         }
+         if( trace )
+         {
+            log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID);
+         }
+      }
+
+      public String toString()
+      {
+         StringBuffer tmp = new StringBuffer("ClientSocket@");
+         tmp.append(System.identityHashCode(this));
+         tmp.append('[');
+         tmp.append("socket=");
+         tmp.append(socket.toString());
+         tmp.append(']');
+         return tmp.toString();
+      }
+
+      /**
+       * @todo should this be handled with weak references as this should
+       * work better with gc
+       */
       protected void finalize()
       {
          if (socket != null)
          {
-            try { socket.close(); } catch (Exception ignored) {}
+            if( trace )
+               log.trace("Closing socket in finalize: "+socket);
+            try
+            {
+               socketCloseCount --;
+               socket.close();
+            }
+            catch (Exception ignored) {}
+            finally
+            {
+               socket = null;
+            }
          }
       }
    }
 
    /**
+    * Clear all class level stats
+    */
+   public static void clearStats()
+   {
+      getSocketTime = 0;
+      readTime = 0;
+      writeTime = 0;
+      serializeTime = 0;
+      deserializeTime = 0;
+      usedPooled = 0;      
+   }
+
+   /**
+    * @return the active number of client connections
+    */
+   public static long getInUseCount()
+   {
+      return inUseCount;
+   }
+
+   /**
+    * @return the number of times a connection was returned from a pool
+    */
+   public static long getUsedPooled()
+   {
+      return usedPooled;
+   }
+   public static long getSocketConnectCount()
+   {
+      return socketConnectCount;
+   }
+   public static long getSocketCloseCount()
+   {
+      return socketCloseCount;
+   }
+
+   /**
+    * @return the total number of pooled connections across all ServerAddresses
+    */
+   public static int getTotalPoolCount()
+   {
+      int count = 0;
+      Iterator iter = connectionPools.values().iterator();
+      while( iter.hasNext() )
+      {
+         List pool = (List) iter.next();
+         if( pool != null )
+            count += pool.size();
+      }
+      return count;
+   }
+
+   /**
+    * @return the proxy local pool count
+    */
+   public long getPoolCount()
+   {
+      return pool.size();
+   }
+
+   /**
     * Exposed for externalization.
     */
    public PooledInvokerProxy()
    {
       super();
+      trace = log.isTraceEnabled();
    }
 
-   
    /**
     * Create a new Proxy.
     *
     */
    public PooledInvokerProxy(ServerAddress sa, int maxPoolSize)
    {
+      this(sa, maxPoolSize, MAX_RETRIES);
+   }
+   public PooledInvokerProxy(ServerAddress sa, int maxPoolSize, int retryCount)
+   {
       this.address = sa;
       this.maxPoolSize = maxPoolSize;
+      this.retryCount = retryCount;
    }
 
    /**
@@ -146,6 +347,9 @@
     */
    public static void clearPool(ServerAddress sa)
    {
+      boolean trace = log.isTraceEnabled();
+      if( trace )
+         log.trace("clearPool, sa: "+sa);
       try
       {
          LinkedList thepool = (LinkedList)connectionPools.get(sa);
@@ -155,15 +359,24 @@
             int size = thepool.size();
             for (int i = 0; i < size; i++)
             {
-               ClientSocket socket = (ClientSocket)thepool.removeFirst();
+               ClientSocket cs = null;
                try
                {
+                  ClientSocket socket = (ClientSocket)thepool.removeFirst();
+                  cs = socket;
+                  if( trace )
+                     log.trace("Closing, ClientSocket: "+socket);
+                  socketCloseCount --;
                   socket.socket.close();
-                  socket.socket = null;
                }
                catch (Exception ignored)
                {
                }
+               finally
+               {
+                  if( cs != null )
+                     cs.socket = null;
+               }
             }
          }
       }
@@ -203,10 +416,9 @@
 
    protected ClientSocket getConnection() throws Exception
    {
-      Exception failed = null;
       Socket socket = null;
+      ClientSocket cs = null;
 
-
       //
       // Need to retry a few times
       // on socket connection because, at least on Windoze,
@@ -219,79 +431,156 @@
       // have nothing to do with backlog or number of threads
       // waiting in accept() on the server.
       // 
-      for (int i = 0; i < MAX_RETRIES; i++)
+      for (int i = 0; i < retryCount; i++)
       {
-         synchronized(pool)
+         ClientSocket pooled = getPooledConnection();
+         if (pooled != null)
          {
-            if (pool.size() > 0)
-            {
-               ClientSocket pooled = getPooledConnection();
-               if (pooled != null)
-               {
-                  usedPooled++;
-                  return pooled;
-               }
-            }
+            usedPooled++;
+            inUseCount ++;
+            return pooled;
          }
-         
+
          try
          {
-            socket = new Socket(address.address, address.port);
+            if( trace)
+            {
+               log.trace("Connecting to addr: "+address.address
+                  +", port: "+address.port
+                  +",clientSocketFactory: "+address.clientSocketFactory
+                  +",enableTcpNoDelay: "+address.enableTcpNoDelay
+                  +",timeout: "+address.timeout);
+            }
+            if( address.clientSocketFactory != null )
+               socket = address.clientSocketFactory.createSocket(address.address, address.port);
+            else
+               socket = new Socket(address.address, address.port);
+            socketConnectCount ++;
+            if( trace )
+               log.trace("Connected, socket="+socket);
+
+            socket.setTcpNoDelay(address.enableTcpNoDelay);
+            cs = new ClientSocket(socket, address.timeout);
+            inUseCount ++;
+            if( trace )
+            {
+               log.trace("New ClientSocket: "+cs
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
             break;
          }
          catch (Exception ex)
          {
-            if (i + 1 < MAX_RETRIES)
+            if( ex instanceof InterruptedIOException || ex instanceof SocketException )
             {
-               Thread.sleep(1);
-               continue;
+               if( trace )
+                  log.trace("Connect failed", ex);
+               if (i + 1 < retryCount)
+               {
+                  Thread.sleep(1);
+                  continue;
+               }
             }
             throw ex;
          }
       }
-      socket.setTcpNoDelay(address.enableTcpNoDelay);
-      return new ClientSocket(socket, address.timeout); 
+      // Should not happen
+      if( cs == null )
+         throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
+      return cs;
    }
- 
-   protected ClientSocket getPooledConnection()
+
+   protected synchronized ClientSocket getPooledConnection()
    {
       ClientSocket socket = null;
       while (pool.size() > 0)
       {
-         socket = (ClientSocket)pool.removeFirst();
          try
          {
+            synchronized( pool )
+            {
+               socket = (ClientSocket)pool.removeFirst();
+            }
             // Test to see if socket is alive by send ACK message
+            if( trace )
+               log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
             final byte ACK = 1;
             socket.out.writeByte(ACK);
             socket.out.flush();
             socket.in.readByte();
+            if( trace )
+            {
+               log.trace("Using pooled ClientSocket: "+socket
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
             return socket;
          }
          catch (Exception ex)
          {
+            if( trace )
+               log.trace("Failed to validate pooled socket: "+socket, ex);
             try
             {
-               socket.socket.close();
+               if( socket != null )
+               {
+                  socketCloseCount --;
+                  socket.socket.close();
+               }
             }
-            catch (Exception ignored) {}
+            catch (Exception ignored)
+            {
+            }
+            finally
+            {
+               if( socket != null )
+                  socket.socket = null;
+            }
          }
       }
       return null;
    }
 
    /**
+    * Return a socket to the pool
+    * @param socket
+    * @return true if socket was added to the pool, false if the pool
+    *    was full
+    */
+   protected synchronized boolean returnConnection(ClientSocket socket)
+   {
+      boolean pooled = false;
+      synchronized( pool )
+      {
+         if (pool.size() < maxPoolSize)
+         {
+            pool.add(socket);
+            inUseCount --;
+            pooled = true;
+         }
+      }
+      return pooled;
+   }
+
+   /**
     * The name of of the server.
     */
    public String getServerHostName() throws Exception
    {
       return address.address;
    }
-   
+
    /**
     * ???
     *
-    * @todo: MOVE TO TRANSACTION
+    * @todo MOVE TO TRANSACTION
     *  
     * @return the transaction propagation context of the transaction
     *         associated with the current thread.
@@ -312,21 +601,31 @@
    public Object invoke(Invocation invocation)
       throws Exception
    {
+      boolean trace = log.isTraceEnabled();
       // We are going to go through a Remote invocation, switch to a Marshalled Invocation
-      MarshalledInvocation mi = new MarshalledInvocation(invocation);
-         
+      PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
+
       // Set the transaction propagation context
       //  @todo: MOVE TO TRANSACTION
       mi.setTransactionPropagationContext(getTransactionPropagationContext());
 
-
       Object response = null;
       long start = System.currentTimeMillis();
       ClientSocket socket = getConnection();
       long end = System.currentTimeMillis() - start;
       getSocketTime += end;
+      // Add the socket session if it exists
+      if( socket.sessionID != null )
+      {
+         mi.setValue("SESSION_ID", socket.sessionID);
+         if( trace )
+            log.trace("Added SESSION_ID to invocation");
+      }
+
       try
       {
+         if( trace )
+            log.trace("Sending invocation to: "+mi.getObjectName());
          socket.out.writeObject(mi);
          socket.out.reset();
          socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
@@ -345,38 +644,43 @@
       }
       catch (Exception ex)
       {
+         if( trace )
+            log.trace("Failure during invoke", ex);
          try
          {
+            socketCloseCount --;
             socket.socket.close();
          }
          catch (Exception ignored) {}
-         //System.out.println("got read exception, exiting");
-         throw new ConnectException("Failed to communicate", ex);
+         finally
+         {
+            socket.socket = null;
+         }
+         throw new java.rmi.ConnectException("Failure during invoke", ex);
       }
-      
-      //System.out.println("put back in pool");
+
       // Put socket back in pool for reuse
-      synchronized (pool)
+      if( returnConnection(socket) == false )
       {
-         if (pool.size() < maxPoolSize)
+         // Failed, close the socket
+         if( trace )
+            log.trace("Closing unpooled socket: "+socket);
+         try
          {
-            pool.add(socket);
+            socketCloseCount --;
+            socket.socket.close();
          }
-         else
+         catch (Exception ignored) {}
+         finally
          {
-            try
-            {
-               socket.socket.close();
-            }
-            catch (Exception ignored) {}
+            socket.socket = null;
          }
       }
-      
+
       // Return response
-      //System.out.println("return response");
 
       try
-      { 
+      {
          if (response instanceof Exception)
          {
             throw ((Exception)response);
@@ -385,7 +689,7 @@
          {
             return ((MarshalledObject)response).get();
          }
-         return response; 
+         return response;
       }
       catch (ServerException ex)
       {
@@ -402,29 +706,70 @@
             throw (TransactionRolledbackException) ex.detail;
          }
          throw ex;
-      }  
+      }
    }
-   
+
    /**
-    * Externalize this instance and handle obtaining the remoteInvoker stub
+    * Write out the serializable data. The first two items form the legacy
+    * wire format; the version and the data that follows it were appended
+    * after them so that readExternal can tell the two formats apart.
+    * @serialData address ServerAddress
+    * @serialData maxPoolSize int
+    * @serialData WIRE_VERSION int version
+    * @serialData retryCount int
+    * @param out the stream to write this proxy to
+    * @throws IOException if writing to the stream fails
     */
    public void writeExternal(final ObjectOutput out)
       throws IOException
-   { 
+   {
+      // The legacy wire format is address, maxPoolSize
       out.writeObject(address);
       out.writeInt(maxPoolSize);
+      // Write out the current version format and its data
+      out.writeInt(WIRE_VERSION);
+      out.writeInt(retryCount);
    }
- 
-   /**
-    * Un-externalize this instance.
-    *
-    */
+
    public void readExternal(final ObjectInput in)
       throws IOException, ClassNotFoundException
    {
+      // Restore the proxy written by writeExternal: the legacy fields first,
+      // then the optional version marker and the data for that version.
+      trace = log.isTraceEnabled();
       address = (ServerAddress)in.readObject();
       maxPoolSize = in.readInt();
+      int version = 0;
+      try
+      {
+         version = in.readInt();
+      }
+      catch(EOFException e)
+      {
+         // No version written and there is no more data
+      }
+      catch(OptionalDataException e)
+      {
+         // No version written and there is data from other objects
+      }
+
+      switch( version )
+      {
+         case 0:
+            // This has no retryCount, default it to the hard-coded value
+            retryCount = MAX_RETRIES;
+            break;
+         case 1:
+            readVersion1(in);
+            break;
+         default:
+            /* Assume a newer version that only appends defaultable values
+            after the version 1 data. The version 1 data is still present and
+            must be read, otherwise retryCount silently keeps its field
+            initializer value instead of what the sender wrote.
+            */
+            readVersion1(in);
+            break;
+      }
       initPool();
    }
+
+   /**
+    * Read the data that follows the version marker in wire format version 1,
+    * which is the retryCount written by writeExternal.
+    *
+    * @param in the stream positioned just after the version int
+    * @throws IOException if the int cannot be read
+    */
+   private void readVersion1(final ObjectInput in)
+      throws IOException
+   {
+      retryCount = in.readInt();
+   }
 }
-

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,779 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.Externalizable;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
+import java.io.ObjectOutput;
+import java.io.ObjectInput;
+import java.io.EOFException;
+import java.io.OptionalDataException;
+import java.util.Map;
+import java.util.Iterator;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.ConnectException;
+import java.rmi.MarshalledObject;
+import java.rmi.ServerException;
+import java.rmi.NoSuchObjectException;
+
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionRolledbackException;
+
+import org.jboss.invocation.Invoker;
+import org.jboss.invocation.Invocation;
+import org.jboss.logging.Logger;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
+
+/**
+ * Client socket connections are pooled to avoid the overhead of
+ * making a connection.
+ *
+ * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class PooledInvokerProxy2
+   implements Invoker, Externalizable
+{
+   // Attributes ----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PooledInvokerProxy2.class);
+   /** The serialVersionUID @since 1.1.4.3 */
+   private static final long serialVersionUID = -1456509931095566410L;
+   /** The current wire format we write */
+   private static final int WIRE_VERSION = 1;
+   /** This controls the max number of concurrent connections in use */
+   private static FIFOSemaphore maxActiveConnections;
+
+   /**
+    * Factory for transaction propagation contexts.
+    *
+    * @todo marcf remove all transaction spill from here
+    * 
+    * When set to a non-null value, it is used to get transaction
+    * propagation contexts for remote method invocations.
+    * If <code>null</code>, transactions are not propagated on
+    * remote method invocations.
+    */
+   protected static TransactionPropagationContextFactory tpcFactory = null;
+
+   /**
+    * Install the factory used to obtain transaction propagation contexts.
+    * The value is stored in the class-wide tpcFactory field, so it applies
+    * to every proxy instance.
+    *
+    * @todo MOVE TO TRANSACTION
+    * @param tpcf the TPC factory; <code>null</code> means no context is
+    *    returned by getTransactionPropagationContext()
+    */
+   public static void setTPCFactory(TransactionPropagationContextFactory tpcf) {
+      PooledInvokerProxy2.tpcFactory = tpcf;
+   }
+
+   // Simple performance measurements, not thread safe
+   public static long getSocketTime = 0;
+   public static long readTime = 0;
+   public static long writeTime = 0;
+   public static long serializeTime = 0;
+   public static long deserializeTime = 0;
+   /** The number of times a connection has been obtained from a pool */
+   public static long usedPooled = 0;
+   /** The number of connections in use */
+   private static int inUseCount = 0;
+   /** The number of socket connections made */
+   private static long socketConnectCount = 0;
+   /** The number of socket close calls made */
+   private static long socketCloseCount = 0;
+
+   /**
+    * Set number of retries in getSocket method
+    */
+   public static int MAX_RETRIES = 10;
+
+   /** A class wide pool Map<ServerAddress, BoundedLinkedQueue<ClientSocket>> */
+   protected static final Map connectionPools = new ConcurrentReaderHashMap();
+
+   /**
+    * connection information
+    */
+   protected ServerAddress address;
+
+   /**
+    * Pool for this invoker.  This is shared between all
+    * instances of proxies attached to a specific invoker
+    * This should not be serializable, but is for backward compatibility.
+    */
+   protected BoundedLinkedQueue pool = null;
+   /** Used to initialize the capacity of the connection pool */
+   protected int maxPoolSize;
+   /** The number of times to retry after seeing a ConnectionException */
+   protected int retryCount = 1;
+   /** The logging trace flag */
+   private transient boolean trace;
+
+   /**
+    * An encapsulation of a client connection: the raw socket, the object
+    * streams layered over it and, for SSL sockets, the session id captured
+    * when the handshake completes.
+    */
+   protected static class ClientSocket
+      implements HandshakeCompletedListener
+   {
+      public ObjectOutputStream out;
+      public ObjectInputStream in;
+      public Socket socket;
+      public int timeout;
+      public String sessionID;
+      /**
+       * Set by handshakeCompleted() and polled by the constructor. Volatile
+       * because the listener callback may run on a different thread than the
+       * one constructing this object.
+       */
+      private volatile boolean handshakeComplete = false;
+      private boolean trace;
+
+      /**
+       * Wrap a connected socket in optimized object streams. For an
+       * SSLSocket the handshake is started and the constructor waits for the
+       * completion callback before returning.
+       *
+       * @param socket the connected socket
+       * @param timeout the SO_TIMEOUT in milliseconds to apply to the socket
+       * @throws Exception on any stream setup failure, or an SSLException if
+       *    the handshake fails or is not reported complete in time
+       */
+      public ClientSocket(Socket socket, int timeout) throws Exception
+      {
+         this.socket = socket;
+         trace = log.isTraceEnabled();
+         boolean needHandshake = false;
+
+         if( socket instanceof SSLSocket )
+         {
+            SSLSocket ssl = (SSLSocket) socket;
+            ssl.addHandshakeCompletedListener(this);
+            if( trace )
+               log.trace("Starting SSL handshake");
+            needHandshake = true;
+            handshakeComplete = false;
+            ssl.startHandshake();
+         }
+         socket.setSoTimeout(timeout);
+         this.timeout = timeout;
+         out = new OptimizedObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+         out.flush();
+         in = new OptimizedObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+         if( needHandshake )
+         {
+            // Loop waiting for the handshake to complete. Each read blocks
+            // for at most one second, so 60 passes bound the wait.
+            socket.setSoTimeout(1000);
+            for(int n = 0; handshakeComplete == false && n < 60; n ++)
+            {
+               try
+               {
+                  // The value read is not used; the call only serves as a
+                  // bounded wait on the socket
+                  in.read();
+               }
+               catch(SSLException e)
+               {
+                  if( trace )
+                     log.trace("Error while waiting for handshake to complete", e);
+                  throw e;
+               }
+               catch(IOException e)
+               {
+                  // Includes the read timeout; keep polling the flag
+                  if( trace )
+                     log.trace("Handshaked read()", e);
+               }
+            }
+            if( handshakeComplete == false )
+               throw new SSLException("Handshaked failed to complete in 60 seconds");
+            // Restore the original timeout
+            socket.setSoTimeout(timeout);
+         }
+
+      }
+
+      /**
+       * Record the SSL session id and flag the handshake as complete.
+       *
+       * @param event the handshake completion event
+       */
+      public void handshakeCompleted(HandshakeCompletedEvent event)
+      {
+         byte[] id = event.getSession().getId();
+         try
+         {
+            sessionID = new String(id, "UTF-8");
+         }
+         catch (UnsupportedEncodingException e)
+         {
+            log.warn("Failed to create session id using UTF-8, using default", e);
+            sessionID = new String(id);
+         }
+         // Publish the flag only after sessionID is assigned so that a thread
+         // that observes the flag also observes the session id
+         handshakeComplete = true;
+         if( trace )
+         {
+            log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID);
+         }
+      }
+
+      /**
+       * @return a description of this connection; safe to call after the
+       *    socket reference has been cleared
+       */
+      public String toString()
+      {
+         StringBuffer tmp = new StringBuffer("ClientSocket@");
+         tmp.append(System.identityHashCode(this));
+         tmp.append('[');
+         tmp.append("socket=");
+         // append(Object) prints "null" instead of throwing once the socket
+         // has been closed and cleared
+         tmp.append(socket);
+         tmp.append(']');
+         return tmp.toString();
+      }
+
+      /**
+       * Close the socket if it was never closed explicitly.
+       *
+       * @todo should this be handled with weak references as this should
+       * work better with gc
+       */
+      protected void finalize()
+      {
+         if (socket != null)
+         {
+            if( trace )
+               log.trace("Closing socket in finalize: "+socket);
+            try
+            {
+               // socketCloseCount is the number of close calls made
+               socketCloseCount ++;
+               socket.close();
+            }
+            catch (Exception ignored) {}
+            finally
+            {
+               socket = null;
+            }
+         }
+      }
+   }
+
+   /**
+    * Reset the class level timing totals and the pooled-use counter to zero.
+    * The in-use, connect and close counters are left untouched.
+    */
+   public static void clearStats()
+   {
+      getSocketTime = readTime = writeTime = 0;
+      serializeTime = deserializeTime = 0;
+      usedPooled = 0;
+   }
+
+   /**
+    * @return the active number of client connections, i.e. the class wide
+    *    inUseCount counter
+    */
+   public static long getInUseCount()
+   {
+      return inUseCount;
+   }
+
+   /**
+    * @return the number of times a connection was returned from a pool
+    */
+   public static long getUsedPooled()
+   {
+      return usedPooled;
+   }
+   /**
+    * @return the number of socket connections made
+    */
+   public static long getSocketConnectCount()
+   {
+      return socketConnectCount;
+   }
+   /**
+    * @return the class wide socketCloseCount counter
+    */
+   public static long getSocketCloseCount()
+   {
+      return socketCloseCount;
+   }
+
+   /**
+    * Sum the sizes of every pool registered in connectionPools.
+    *
+    * @return the total number of pooled connections across all ServerAddresses
+    */
+   public static int getTotalPoolCount()
+   {
+      int total = 0;
+      for (Iterator pools = connectionPools.values().iterator(); pools.hasNext(); )
+      {
+         BoundedLinkedQueue queue = (BoundedLinkedQueue) pools.next();
+         if( queue == null )
+            continue;
+         total += queue.size();
+      }
+      return total;
+   }
+
+   /**
+    * @return the proxy local pool count, or 0 when the pool has not been
+    *    initialized yet (the pool is only set up by initPool())
+    */
+   public long getPoolCount()
+   {
+      // A proxy built through a constructor has no pool until initPool() runs
+      if( pool == null )
+         return 0;
+      return pool.size();
+   }
+
+   /**
+    * No-arg constructor required for externalization; only captures the
+    * current trace logging state.
+    */
+   public PooledInvokerProxy2()
+   {
+      this.trace = log.isTraceEnabled();
+   }
+
+   /**
+    * Create a new Proxy using the default MAX_RETRIES retry count.
+    *
+    * @param sa the server connection information
+    * @param maxPoolSize the capacity used when the pool is initialized
+    */
+   public PooledInvokerProxy2(ServerAddress sa, int maxPoolSize)
+   {
+      this(sa, maxPoolSize, MAX_RETRIES);
+   }
+   /**
+    * Create a new Proxy.
+    *
+    * @param sa the server connection information
+    * @param maxPoolSize the capacity used when the pool is initialized
+    * @param retryCount the number of connection attempts made by getConnection()
+    */
+   public PooledInvokerProxy2(ServerAddress sa, int maxPoolSize, int retryCount)
+   {
+      this.address = sa;
+      this.maxPoolSize = maxPoolSize;
+      this.retryCount = retryCount;
+      // Match the no-arg constructor and readExternal, which both set the flag
+      this.trace = log.isTraceEnabled();
+   }
+
+   /**
+    * Close all sockets in a specific pool. Failures to take or close an
+    * individual connection are ignored so that the remaining connections are
+    * still processed.
+    *
+    * @param sa the address whose pool should be emptied
+    */
+   public static void clearPool(ServerAddress sa)
+   {
+      boolean trace = log.isTraceEnabled();
+      if( trace )
+         log.trace("clearPool, sa: "+sa);
+      try
+      {
+         BoundedLinkedQueue thepool = (BoundedLinkedQueue) connectionPools.get(sa);
+         if (thepool == null) return;
+         // Only drain what was pooled on entry
+         int size = thepool.size();
+         for (int i = 0; i < size; i++)
+         {
+            ClientSocket cs = null;
+            try
+            {
+               cs = (ClientSocket)thepool.take();
+               if( trace )
+                  log.trace("Closing, ClientSocket: "+cs);
+               // socketCloseCount is the number of close calls made
+               socketCloseCount ++;
+               cs.socket.close();
+            }
+            catch (Exception ignored)
+            {
+            }
+            finally
+            {
+               if( cs != null )
+                  cs.socket = null;
+            }
+         }
+      }
+      catch (Exception ex)
+      {
+         // ignored
+      }
+   }
+   /**
+    * Close all sockets in all pools by calling clearPool for every
+    * registered ServerAddress while holding the connectionPools lock.
+    */
+   public static void clearPools()
+   {
+      synchronized (connectionPools)
+      {
+         for (Iterator addresses = connectionPools.keySet().iterator(); addresses.hasNext(); )
+         {
+            clearPool((ServerAddress) addresses.next());
+         }
+      }
+   }
+
+   /**
+    * Bind this proxy to the class wide pool for its address, creating and
+    * registering the pool on first use. The shared semaphore limiting active
+    * connections is created once, sized by the maxPoolSize of whichever
+    * proxy gets here first.
+    */
+   protected void initPool()
+   {
+      synchronized (connectionPools)
+      {
+         pool = (BoundedLinkedQueue) connectionPools.get(address);
+         final boolean firstProxyForAddress = (pool == null);
+         if (firstProxyForAddress)
+         {
+            pool = new BoundedLinkedQueue(maxPoolSize);
+            connectionPools.put(address, pool);
+         }
+         if( maxActiveConnections == null )
+         {
+            maxActiveConnections = new FIFOSemaphore(maxPoolSize);
+         }
+      }
+   }
+
+   /**
+    * Obtain a connection to the server. A validated pooled connection is
+    * preferred; otherwise a new socket is opened, with up to retryCount
+    * attempts when the failure is an InterruptedIOException or a
+    * SocketException. Every connection returned is counted in inUseCount.
+    *
+    * @return the connection to use for one invocation
+    * @throws Exception the connect failure once the attempts are exhausted
+    *    or the failure is of another kind; a ConnectException when no
+    *    attempt produced a connection
+    */
+   protected ClientSocket getConnection() throws Exception
+   {
+      ClientSocket cs = null;
+
+      //
+      // Need to retry a few times
+      // on socket connection because, at least on Windoze,
+      // if too many concurrent threads try to connect
+      // at same time, you get ConnectionRefused
+      //
+      // Retrying seems to be the most performant.
+      //
+      // This problem always happens with RMI and seems to
+      // have nothing to do with backlog or number of threads
+      // waiting in accept() on the server.
+      // 
+      for (int i = 0; i < retryCount; i++)
+      {
+         ClientSocket pooled = getPooledConnection();
+         if (pooled != null)
+         {
+            usedPooled++;
+            inUseCount ++;
+            return pooled;
+         }
+
+         // Scoped to one attempt so a failed attempt can tell whether it
+         // opened a socket that now has to be closed
+         Socket socket = null;
+         try
+         {
+            if( trace)
+            {
+               log.trace("Connecting to addr: "+address.address
+                  +", port: "+address.port
+                  +",clientSocketFactory: "+address.clientSocketFactory
+                  +",enableTcpNoDelay: "+address.enableTcpNoDelay
+                  +",timeout: "+address.timeout);
+            }
+            if( address.clientSocketFactory != null )
+               socket = address.clientSocketFactory.createSocket(address.address, address.port);
+            else
+               socket = new Socket(address.address, address.port);
+            socketConnectCount ++;
+            if( trace )
+               log.trace("Connected, socket="+socket);
+
+            socket.setTcpNoDelay(address.enableTcpNoDelay);
+            cs = new ClientSocket(socket, address.timeout);
+            inUseCount ++;
+            if( trace )
+            {
+               log.trace("New ClientSocket: "+cs
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
+            break;
+         }
+         catch (Exception ex)
+         {
+            // The socket was opened but configuring or wrapping it failed.
+            // Close it now instead of leaving the descriptor open until
+            // garbage collection.
+            if( socket != null )
+            {
+               try
+               {
+                  socketCloseCount ++;
+                  socket.close();
+               }
+               catch (Exception ignored) {}
+            }
+            if( ex instanceof InterruptedIOException || ex instanceof SocketException )
+            {
+               if( trace )
+                  log.trace("Connect failed", ex);
+               if (i + 1 < retryCount)
+               {
+                  Thread.sleep(1);
+                  continue;
+               }
+            }
+            throw ex;
+         }
+      }
+      // Should not happen
+      if( cs == null )
+         throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
+      return cs;
+   }
+
+   /**
+    * Take connections from the pool until one answers the ACK probe.
+    * Connections that fail the probe are closed and discarded.
+    *
+    * @return a validated pooled connection, or null if the pool holds none
+    */
+   protected synchronized ClientSocket getPooledConnection()
+   {
+      while (pool.size() > 0)
+      {
+         // Declared per iteration so that a failure in take() is never
+         // attributed to the connection examined in the previous pass
+         ClientSocket socket = null;
+         try
+         {
+            socket = (ClientSocket)pool.take();
+            // Test to see if socket is alive by send ACK message
+            if( trace )
+               log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
+            final byte ACK = 1;
+            socket.out.writeByte(ACK);
+            socket.out.flush();
+            socket.in.readByte();
+            if( trace )
+            {
+               log.trace("Using pooled ClientSocket: "+socket
+                  +", usedPooled="+ usedPooled
+                  +", inUseCount="+ inUseCount
+                  +", socketConnectCount="+ socketConnectCount
+                  +", socketCloseCount="+ socketCloseCount
+               );
+            }
+            return socket;
+         }
+         catch (Exception ex)
+         {
+            if( trace )
+               log.trace("Failed to validate pooled socket: "+socket, ex);
+            // socket is still null when take() itself failed
+            if( socket != null )
+            {
+               try
+               {
+                  // socketCloseCount is the number of close calls made
+                  socketCloseCount ++;
+                  socket.socket.close();
+               }
+               catch (Exception ignored)
+               {
+               }
+               finally
+               {
+                  socket.socket = null;
+               }
+            }
+         }
+      }
+      return null;
+   }
+
+   /**
+    * Return a socket to the pool, waiting up to 10 seconds for space. The
+    * connection is removed from the in-use count whether or not it was
+    * pooled.
+    *
+    * @param socket the connection to give back
+    * @return true if socket was added to the pool, false if the pool
+    *    was full or the wait was interrupted; the caller then owns the
+    *    socket and must close it
+    */
+   protected synchronized boolean returnConnection(ClientSocket socket)
+   {
+      boolean pooled = false;
+      try
+      {
+         // offer() reports a timeout through its return value, not through
+         // an exception, so the result has to be used
+         pooled = pool.offer(socket, 10*1000);
+         if( pooled == false && trace )
+            log.trace("Failed to put socket back into pool, the pool is full");
+      }
+      catch (InterruptedException e)
+      {
+         if( trace )
+            log.trace("Failed to put socket back into pool due to InterruptedException");
+         // Keep the interrupt visible to the calling code
+         Thread.currentThread().interrupt();
+      }
+      inUseCount --;
+      return pooled;
+   }
+
+   /**
+    * The name of the server, taken from the address field of this proxy's
+    * ServerAddress.
+    *
+    * @return the server host name
+    */
+   public String getServerHostName() throws Exception
+   {
+      return address.address;
+   }
+
+   /**
+    * Ask the installed TPC factory for the current propagation context.
+    *
+    * @todo MOVE TO TRANSACTION
+    *
+    * @return the transaction propagation context of the transaction
+    *         associated with the current thread.
+    *         Returns <code>null</code> if no TPC factory was set; otherwise
+    *         whatever the factory returns.
+    */
+   public Object getTransactionPropagationContext()
+      throws SystemException
+   {
+      if( tpcFactory == null )
+      {
+         return null;
+      }
+      return tpcFactory.getTransactionPropagationContext();
+   }
+
+
+   /**
+    * Send the invocation to the server over a pooled connection and return
+    * the unmarshalled response. At most as many invocations as the shared
+    * semaphore has permits use a connection at the same time.
+    *
+    * @param invocation the invocation to marshal and send
+    * @return the response value, unwrapped from its MarshalledObject if needed
+    * @throws Exception the exception returned by the server, or a
+    *    java.rmi.ConnectException if the exchange with the server failed
+    */
+   public Object invoke(Invocation invocation)
+      throws Exception
+   {
+      boolean trace = log.isTraceEnabled();
+      // We are going to go through a Remote invocation, switch to a Marshalled Invocation
+      PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
+
+      // Set the transaction propagation context
+      //  @todo: MOVE TO TRANSACTION
+      mi.setTransactionPropagationContext(getTransactionPropagationContext());
+
+      Object response = null;
+      long start = System.currentTimeMillis();
+      maxActiveConnections.acquire();
+      try
+      {
+         ClientSocket socket = getConnection();
+         long end = System.currentTimeMillis() - start;
+         getSocketTime += end;
+         // Add the socket session if it exists
+         if( socket.sessionID != null )
+         {
+            mi.setValue("SESSION_ID", socket.sessionID);
+            if( trace )
+               log.trace("Added SESSION_ID to invocation");
+         }
+
+         try
+         {
+            if( trace )
+               log.trace("Sending invocation to: "+mi.getObjectName());
+            // Restart the clock so that writeTime does not also contain the
+            // time already accounted for in getSocketTime
+            start = System.currentTimeMillis();
+            socket.out.writeObject(mi);
+            socket.out.reset();
+            socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
+            socket.out.flush();
+            socket.out.reset();
+            end = System.currentTimeMillis() - start;
+            writeTime += end;
+            start = System.currentTimeMillis();
+            response = socket.in.readObject();
+            // to make sure stream gets reset
+            // Stupid ObjectInputStream holds object graph
+            // can only be set by the client/server sending a TC_RESET
+            socket.in.readObject();
+            end = System.currentTimeMillis() - start;
+            readTime += end;
+         }
+         catch (Exception ex)
+         {
+            if( trace )
+               log.trace("Failure during invoke", ex);
+            // getConnection() counted this connection as in use and
+            // returnConnection() is never reached on this path
+            inUseCount --;
+            closeClientSocket(socket);
+            throw new java.rmi.ConnectException("Failure during invoke", ex);
+         }
+
+         // Put socket back in pool for reuse
+         if( returnConnection(socket) == false )
+         {
+            // Failed, close the socket
+            if( trace )
+               log.trace("Closing unpooled socket: "+socket);
+            closeClientSocket(socket);
+         }
+      }
+      finally
+      {
+         maxActiveConnections.release();
+      }
+
+      // Return response
+
+      try
+      {
+         if (response instanceof Exception)
+         {
+            throw ((Exception)response);
+         }
+         if (response instanceof MarshalledObject)
+         {
+            return ((MarshalledObject)response).get();
+         }
+         return response;
+      }
+      catch (ServerException ex)
+      {
+         // Suns RMI implementation wraps NoSuchObjectException in
+         // a ServerException. We cannot have that if we want
+         // to comply with the spec, so we unwrap here.
+         if (ex.detail instanceof NoSuchObjectException)
+         {
+            throw (NoSuchObjectException) ex.detail;
+         }
+         //likewise
+         if (ex.detail instanceof TransactionRolledbackException)
+         {
+            throw (TransactionRolledbackException) ex.detail;
+         }
+         throw ex;
+      }
+   }
+
+   /**
+    * Close the socket of a connection that is being discarded, ignoring any
+    * close failure, and record the close call.
+    *
+    * @param cs the connection whose socket should be closed
+    */
+   private static void closeClientSocket(ClientSocket cs)
+   {
+      try
+      {
+         // socketCloseCount is the number of close calls made
+         socketCloseCount ++;
+         cs.socket.close();
+      }
+      catch (Exception ignored) {}
+      finally
+      {
+         cs.socket = null;
+      }
+   }
+
+   /**
+    * Write out the serializable data. The first two items form the legacy
+    * wire format; the version and the data that follows it were appended
+    * after them so that readExternal can tell the two formats apart.
+    * @serialData address ServerAddress
+    * @serialData maxPoolSize int
+    * @serialData WIRE_VERSION int version
+    * @serialData retryCount int
+    * @param out the stream to write this proxy to
+    * @throws java.io.IOException if writing to the stream fails
+    */
+   public void writeExternal(final ObjectOutput out)
+      throws IOException
+   {
+      // The legacy wire format is address, maxPoolSize
+      out.writeObject(address);
+      out.writeInt(maxPoolSize);
+      // Write out the current version format and its data
+      out.writeInt(WIRE_VERSION);
+      out.writeInt(retryCount);
+   }
+
+   /**
+    * Restore the proxy written by writeExternal and attach it to the class
+    * wide pool for its address. A stream in the legacy format carries no
+    * version marker; it is treated as version 0 and retryCount defaults to
+    * MAX_RETRIES.
+    *
+    * @param in the stream to read this proxy from
+    * @throws IOException if reading from the stream fails
+    * @throws ClassNotFoundException if the ServerAddress class cannot be resolved
+    */
+   public void readExternal(final ObjectInput in)
+      throws IOException, ClassNotFoundException
+   {
+      trace = log.isTraceEnabled();
+      address = (ServerAddress)in.readObject();
+      maxPoolSize = in.readInt();
+      int version = 0;
+      try
+      {
+         version = in.readInt();
+      }
+      catch(EOFException e)
+      {
+         // No version written and there is no more data
+      }
+      catch(OptionalDataException e)
+      {
+         // No version written and there is data from other objects
+      }
+
+      switch( version )
+      {
+         case 0:
+            // This has no retryCount, default it to the hard-coded value
+            retryCount = MAX_RETRIES;
+            break;
+         case 1:
+            readVersion1(in);
+            break;
+         default:
+            /* Assume a newer version that only appends defaultable values
+            after the version 1 data. The version 1 data is still present and
+            must be read, otherwise retryCount silently keeps its field
+            initializer value instead of what the sender wrote.
+            */
+            readVersion1(in);
+            break;
+      }
+      initPool();
+   }
+
+   /**
+    * Read the data that follows the version marker in wire format version 1,
+    * which is the retryCount written by writeExternal.
+    *
+    * @param in the stream positioned just after the version int
+    * @throws IOException if the int cannot be read
+    */
+   private void readVersion1(final ObjectInput in)
+      throws IOException
+   {
+      retryCount = in.readInt();
+   }
+}

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.invocation.pooled.interfaces;
+
+import org.jboss.invocation.Invocation;
+import org.jboss.invocation.InvocationType;
+import org.jboss.invocation.MarshalledInvocation;
+import org.jboss.invocation.MarshalledValue;
+
+import javax.transaction.Transaction;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.Principal;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * The MarshalledInvocation is an invocation that travels. As such it
+ * serializes its payload because of lack of ClassLoader visibility, and it
+ * contains marshalled data representing the byte[] of the Invocation object
+ * it extends. Besides handling the specifics of "marshalling" the payload
+ * (which could be done at the Invocation level), the MarshalledInvocation can
+ * hold optimizations and code needed for distribution: for example the
+ * TransactionPropagationContext, which is a serialization of the TX for
+ * distribution purposes, as well as the "hash" of the methods that we send,
+ * as opposed to sending Method objects. Serialization "optimizations" should
+ * be coded here in the externalization implementation of the class.
+ * 
+ * @author <a href="mailto:marc at jboss.org">Marc Fleury</a>
+ * @version $Revision$
+ */
+public class PooledMarshalledInvocation
+   extends MarshalledInvocation
+   implements java.io.Externalizable
+{
+   // Constants -----------------------------------------------------
+
+   /**
+    * Serial Version Identifier.
+    */
+   static final long serialVersionUID = -728630295444149842L;
+
+   private transient Transaction tx;
+   private transient Object credential;
+   private transient Principal principal;
+   private transient Object enterpriseContext;
+   private transient Object id;
+   private transient PooledMarshalledValue pooledMarshalledArgs;
+
+
+   // Constructors --------------------------------------------------
+   /**
+    * No-arg constructor required for externalization.
+    */
+   public PooledMarshalledInvocation()
+   {
+      // For externalization to work
+   }
+
+   /**
+    * Build a marshalled invocation from an existing invocation. The payload
+    * maps are taken over by reference, not copied.
+    *
+    * @param invocation the invocation to send
+    */
+   public PooledMarshalledInvocation(Invocation invocation)
+   {
+      this.payload = invocation.payload;
+      this.as_is_payload = invocation.as_is_payload;
+      this.method = invocation.getMethod();
+      this.objectName = invocation.getObjectName();
+      this.args = invocation.getArguments();
+      this.invocationType = invocation.getType();
+   }
+
+
+   /**
+    * Build a marshalled invocation from its parts; all arguments are passed
+    * straight through to the superclass constructor.
+    */
+   public PooledMarshalledInvocation(Object id,
+      Method m,
+      Object[] args,
+      Transaction tx,
+      Principal identity,
+      Object credential)
+   {
+      super(id, m, args, tx, identity, credential);
+   }
+   // Public --------------------------------------------------------
+
+
+
+   /** @return the enterprise context held in the local transient field */
+   public Object getEnterpriseContext()
+   {
+      return this.enterpriseContext;
+   }
+
+   /** @param enterpriseContext the value to hold in the local transient field */
+   public void setEnterpriseContext(Object enterpriseContext)
+   {
+      this.enterpriseContext = enterpriseContext;
+   }
+
+   /** @return the locally cached id, fetched from the superclass while unset */
+   public Object getId()
+   {
+      if (this.id == null)
+      {
+         this.id = super.getId();
+      }
+      return this.id;
+   }
+
+   /** Store the id in the superclass and in the local cache. */
+   public void setId(Object id)
+   {
+      super.setId(id);
+      this.id = id;
+   }
+
+   /** Store the transaction in the superclass and in the local cache. */
+   public void setTransaction(Transaction tx)
+   {
+      super.setTransaction(tx);
+      this.tx = tx;
+   }
+
+   /** @return the locally cached transaction, fetched from the superclass while unset */
+   public Transaction getTransaction()
+   {
+      if (this.tx == null)
+      {
+         this.tx = super.getTransaction();
+      }
+      return this.tx;
+   }
+
+   /** @return the locally cached credential, fetched from the superclass while unset */
+   public Object getCredential()
+   {
+      if (this.credential == null)
+      {
+         this.credential = super.getCredential();
+      }
+      return this.credential;
+   }
+
+   /** Store the credential in the superclass and in the local cache. */
+   public void setCredential(Object credential)
+   {
+      super.setCredential(credential);
+      this.credential = credential;
+   }
+
+   /** @return the locally cached principal, fetched from the superclass while unset */
+   public Principal getPrincipal()
+   {
+      if (this.principal == null)
+      {
+         this.principal = super.getPrincipal();
+      }
+      return this.principal;
+   }
+
+   /** Store the principal in the superclass and in the local cache. */
+   public void setPrincipal(Principal principal)
+   {
+      super.setPrincipal(principal);
+      this.principal = principal;
+   }
+
+
+   /**
+    * Return the invocation arguments, unmarshalling them on first access
+    * from the value read by readExternal.
+    *
+    * @return the arguments, or null if none are available or they could not
+    *    be unmarshalled
+    */
+   public Object[] getArguments()
+   {
+      // Without marshalled arguments there is nothing to unmarshal; the guard
+      // avoids provoking a NullPointerException only to print and discard it
+      if (this.args == null && pooledMarshalledArgs != null)
+      {
+         try
+         {
+            this.args = (Object[]) pooledMarshalledArgs.get();
+         }
+         catch (Exception e)
+         {
+            // Best effort: the failure is reported and null is returned
+            e.printStackTrace();
+         }
+      }
+      return args;
+   }
+
+   // Externalizable implementation ---------------------------------
+   /**
+    * Write the invocation in the order readExternal expects: invocation
+    * type, transaction propagation context, method hash, object name as an
+    * int, the marshalled arguments, then the two payload maps, each as an
+    * entry count followed by key/value pairs.
+    *
+    * @param out the stream to write to
+    * @throws IOException if writing to the stream fails
+    */
+   public void writeExternal(java.io.ObjectOutput out)
+      throws IOException
+   {
+      out.writeObject(invocationType);
+      // FIXME marcf: the "specific" treatment of Transactions should be abstracted.
+      // Write the TPC, not the local transaction
+      out.writeObject(tpc);
+
+      // The method travels as its hash rather than as a Method object
+      out.writeLong(calculateHash(this.method));
+
+      Integer nameAsInteger = (Integer) this.objectName;
+      out.writeInt(nameAsInteger.intValue());
+      out.writeObject(new PooledMarshalledValue(this.args));
+
+      // The payload map is written entry by entry instead of through HashMap
+      // serialization to avoid marshalling data that is not needed. Its
+      // values may be tied to classloaders that exist inside the server but
+      // not in the generic JMX land, so each one travels wrapped in a
+      // MarshalledValue, see the Invocation getter logic.
+      if (payload != null)
+      {
+         out.writeInt(payload.size());
+         for (Iterator it = payload.keySet().iterator(); it.hasNext(); )
+         {
+            Object key = it.next();
+            // This code could be if (object.getClass().getName().startsWith("java")) then don't serialize.
+            // Bench the above for speed.
+            out.writeObject(key);
+            out.writeObject(new MarshalledValue(payload.get(key)));
+         }
+      }
+      else
+      {
+         out.writeInt(0);
+      }
+
+      // This map is "safe" as is, so its values are written unwrapped
+      if (as_is_payload != null)
+      {
+         out.writeInt(as_is_payload.size());
+         for (Iterator it = as_is_payload.keySet().iterator(); it.hasNext(); )
+         {
+            Object key = it.next();
+            out.writeObject(key);
+            out.writeObject(as_is_payload.get(key));
+         }
+      }
+      else
+      {
+         out.writeInt(0);
+      }
+   }
+
+   /**
+    * Read the fields in the order writeExternal wrote them. The arguments
+    * are kept in marshalled form, and a payload map is only created when its
+    * entry count is positive.
+    *
+    * @param in the stream to read from
+    * @throws IOException if reading from the stream fails
+    * @throws ClassNotFoundException if a class in the stream cannot be resolved
+    */
+   public void readExternal(java.io.ObjectInput in)
+      throws IOException, ClassNotFoundException
+   {
+      invocationType = (InvocationType) in.readObject();
+      tpc = in.readObject();
+      this.methodHash = in.readLong();
+
+      this.objectName = new Integer(in.readInt());
+
+      pooledMarshalledArgs = (PooledMarshalledValue) in.readObject();
+
+      int payloadEntries = in.readInt();
+      if (payloadEntries > 0)
+      {
+         payload = new HashMap();
+         for (int remaining = payloadEntries; remaining > 0; remaining--)
+         {
+            Object entryKey = in.readObject();
+            payload.put(entryKey, in.readObject());
+         }
+      }
+
+      int asIsEntries = in.readInt();
+      if (asIsEntries > 0)
+      {
+         as_is_payload = new HashMap();
+         for (int remaining = asIsEntries; remaining > 0; remaining--)
+         {
+            Object entryKey = in.readObject();
+            as_is_payload.put(entryKey, in.readObject());
+         }
+      }
+   }
+}

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * A simple replacement for the RMI MarshalledObject that uses the thread
+ * context class loader for resolving classes and proxies. This currently does
+ * not support class annotations and dynamic class loading.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class PooledMarshalledValue
+   implements java.io.Externalizable
+{
+   /** Serial Version Identifier. */
+   private static final long serialVersionUID = -1265347772036749711L;
+
+   /**
+    * The serialized form of the value. If <code>serializedForm</code> is
+    * <code>null</code> then the object marshalled was a <code>null</code>
+    * reference.
+    */
+   private byte[] serializedForm;
+   
+   /**
+    * The RMI MarshalledObject hash of the serializedForm array
+    */
+   private int hashCode;
+
+   /**
+    * Exposed for externalization.
+    */
+   public PooledMarshalledValue()
+   {
+      super();
+   }
+
+   public PooledMarshalledValue(Object obj) throws IOException
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream mvos = new OptimizedObjectOutputStream(baos);
+      mvos.writeObject(obj);
+      mvos.flush();
+      serializedForm = baos.toByteArray();
+      mvos.close();
+      // Use the java.rmi.MarshalledObject hash code calculation
+      int hash = 0;
+      for (int i = 0; i < serializedForm.length; i++)
+      {
+         hash = 31 * hash + serializedForm[i];
+      }
+      
+      hashCode = hash;
+   }
+
+   public Object get() throws IOException, ClassNotFoundException
+   {
+      if (serializedForm == null)
+         return null;
+
+      ByteArrayInputStream bais = new ByteArrayInputStream(serializedForm);
+      ObjectInputStream mvis = new OptimizedObjectInputStream(bais);
+      Object retValue =  mvis.readObject();
+      mvis.close();
+      return retValue;
+   }
+
+   public byte[] toByteArray()
+   {
+      return serializedForm;
+   }
+
+   public int size()
+   {
+      int size = serializedForm != null ? serializedForm.length : 0;
+      return size;
+   }
+
+   /**
+    * Return a hash code for the serialized form of the value.
+    *
+    * @return the serialized form value hash.
+    */
+   public int hashCode()
+   {
+      return hashCode;
+   }
+
+   public boolean equals(Object obj)
+   {
+      if( this == obj )
+         return true;
+
+      boolean equals = false;
+      if( obj instanceof PooledMarshalledValue )
+      {
+         PooledMarshalledValue mv = (PooledMarshalledValue) obj;
+         if( serializedForm == mv.serializedForm )
+         {
+            equals = true;
+         }
+         else
+         {
+            equals = Arrays.equals(serializedForm, mv.serializedForm);
+         }
+      }
+      return equals;
+   }
+   
+   /**
+    * The object implements the readExternal method to restore its
+    * contents by calling the methods of DataInput for primitive
+    * types and readObject for objects, strings and arrays.  The
+    * readExternal method must read the values in the same sequence
+    * and with the same types as were written by writeExternal.
+    *
+    * @param in the stream to read data from in order to restore the object
+    * 
+    * @throws IOException              if I/O errors occur
+    * @throws ClassNotFoundException   If the class for an object being
+    *                                  restored cannot be found.
+    */
+   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+   {
+      int length = in.readInt();
+      serializedForm = null;
+      if( length > 0 )
+      {
+         serializedForm = new byte[length];
+         in.readFully(serializedForm);
+      }
+      hashCode = in.readInt();
+   }
+
+   /**
+    * The object implements the writeExternal method to save its contents
+    * by calling the methods of DataOutput for its primitive values or
+    * calling the writeObject method of ObjectOutput for objects, strings,
+    * and arrays.
+    *
+    * @serialData Overriding methods should use this tag to describe
+    *            the data layout of this Externalizable object.
+    *            List the sequence of element types and, if possible,
+    *            relate the element to a public/protected field and/or
+    *            method of this Externalizable class.
+    *
+    * @param out    the stream to write the object to
+    * 
+    * @throws IOException   Includes any I/O exceptions that may occur
+    */
+   public void writeExternal(ObjectOutput out) throws IOException
+   {
+      int length = serializedForm != null ? serializedForm.length : 0;
+      out.writeInt(length);
+      if( length > 0 )
+      {
+         out.write(serializedForm);
+      }
+      out.writeInt(hashCode);
+   }
+}

Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,14 +1,29 @@
-/***************************************
- *                                     *
- *  JBoss: The OpenSource J2EE WebOS   *
- *                                     *
- *  Distributable under LGPL license.  *
- *  See terms of license at gnu.org.   *
- *                                     *
- ***************************************/
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
 package org.jboss.invocation.pooled.interfaces;
 
 import java.io.Serializable;
+import java.io.IOException;
+import javax.net.SocketFactory;
 
 /**
  * This class encapsulates all the required information for a client to 
@@ -17,7 +32,8 @@
  * It also attempts to provide a fast hash() function since this object
  * is used as a key in a hashmap mainted by the ConnectionManager. 
  *
- * @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a>
+ * @author Bill Burke
+ * @author Scott.Stark at jboss.org
  * @version $Revision$
  */
 public class ServerAddress implements Serializable
@@ -26,24 +42,32 @@
    private static final long serialVersionUID = -7206359745950445445L;
 
    /**
-    * Address of host ot connect to
+    * Address of host to connect to
+    * @serial
     */
    public String address;
 
    /**
     * Port the service is listening on
+    * @serial
     */
    public int port;
 
    /**
     * If the TcpNoDelay option should be used on the socket.
+    * @serial
     */
    public boolean enableTcpNoDelay = false;
 
    /**
     * Timeout of setSoTimeout
+    * @serial
     */
    public int timeout = 60000;
+   /** An optional socket factory for connecting to the server
+    * @serial
+    */
+   public SocketFactory clientSocketFactory;
 
    /**
     * This object is used as a key in a hashmap,
@@ -51,13 +75,26 @@
     */
    private transient int hashCode;
 
-   public ServerAddress(String address, int port, boolean enableTcpNoDelay, int timeout)
+   /**
+    * The server address/port representation.
+    * 
+    * @param address - hostname/ip of the server
+    * @param port - the invoker port
+    * @param enableTcpNoDelay - the Socket.setTcpNoDelay flag
+    * @param timeout - the Socket.setSoTimeout value
+    * @param clientSocketFactory - optional SocketFactory
+    */ 
+   public ServerAddress(String address, int port, boolean enableTcpNoDelay,
+      int timeout, SocketFactory clientSocketFactory)
    {
       this.address = address;
       this.port = port;
       this.enableTcpNoDelay = enableTcpNoDelay;
       this.hashCode = address.hashCode() + port;
+      if( enableTcpNoDelay )
+         this.hashCode ++;
       this.timeout = timeout;
+      this.clientSocketFactory = clientSocketFactory;
    }
 
    public String toString()
@@ -69,15 +106,14 @@
    {
       try
       {
+         // Compare this to obj
          ServerAddress o = (ServerAddress) obj;
-         if (o.hashCode != hashCode)
+         if (port != o.port)
             return false;
-         if (port != port)
+         if (address.equals(o.address) == false)
             return false;
-         if (!o.address.equals(address))
+         if (enableTcpNoDelay != o.enableTcpNoDelay)
             return false;
-         if (o.enableTcpNoDelay != enableTcpNoDelay)
-            return false;
          return true;
       }
       catch (Throwable e)
@@ -91,4 +127,20 @@
       return hashCode;
    }
 
+   /**
+    * Create the transient hashCode 
+    * @param in
+    * @throws IOException
+    * @throws ClassNotFoundException
+    */
+   private void readObject(java.io.ObjectInputStream in)
+     throws IOException, ClassNotFoundException
+   {
+      // Trigger default serialization
+      in.defaultReadObject();
+      // Build the hashCode
+      this.hashCode = address.hashCode() + port;
+      if( enableTcpNoDelay )
+         this.hashCode ++;
+   }
 }

Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,11 +1,9 @@
-/***************************************
- *                                     *
- *  JBoss: The OpenSource J2EE WebOS   *
- *                                     *
- *  Distributable under LGPL license.  *
- *  See terms of license at gnu.org.   *
- *                                     *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
 
 package org.jboss.invocation.pooled.server;
 
@@ -17,7 +15,7 @@
  * it makes sure to call shutdown on the pooled ServerThread
  *
  * @author    <a href="mailto:bill at jboss.org">Bill Burke</a>
- *
+ * @version $Revision$
  */
 public class LRUPool extends LRUCachePolicy
 {

Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,38 +1,43 @@
-/***************************************
- *                                     *
- *  JBoss: The OpenSource J2EE WebOS   *
- *                                     *
- *  Distributable under LGPL license.  *
- *  See terms of license at gnu.org.   *
- *                                     *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
 
 package org.jboss.invocation.pooled.server;
 
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.util.LinkedList;
 import java.security.PrivilegedExceptionAction;
 import java.security.AccessController;
 import java.security.PrivilegedActionException;
+import java.lang.reflect.Method;
+import java.rmi.NoSuchObjectException;
 import javax.management.ObjectName;
 import javax.naming.InitialContext;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import javax.net.SocketFactory;
+import javax.net.ServerSocketFactory;
 
 import org.jboss.invocation.Invocation;
-import org.jboss.invocation.MarshalledInvocation;
 import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
 import org.jboss.invocation.pooled.interfaces.ServerAddress;
-import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxy;
+import org.jboss.invocation.pooled.interfaces.PooledMarshalledInvocation;
 import org.jboss.logging.Logger;
 import org.jboss.proxy.TransactionInterceptor;
 import org.jboss.system.Registry;
 import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.system.server.ServerConfigUtil;
 import org.jboss.tm.TransactionPropagationContextFactory;
 import org.jboss.tm.TransactionPropagationContextImporter;
 import org.jboss.tm.TransactionPropagationContextUtil;
+import org.jboss.security.SecurityDomain;
+import org.jboss.net.sockets.DefaultSocketFactory;
 
 /**
  * This invoker pools Threads and client connections to one server socket.
@@ -50,9 +55,10 @@
  * on the server side.  Pool, is an LRU pool, so resources should be cleaned up.
  * 
  *
- * @author    <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
+ * @version $Revision:$
  *
- * @jmx:mbean extends="org.jboss.system.ServiceMBean"
  */
 public class PooledInvoker extends ServiceMBeanSupport
    implements PooledInvokerMBean, Runnable
@@ -87,11 +93,30 @@
     * The port a client will use to connect to the sever.
     */
    protected int clientConnectPort = 0;
+   /**
+    * The number of connect retries a client will attempt
+    */ 
+   protected int clientRetryCount = 1;
 
    protected int backlog = 200;
 
+   /** The class name of the optional custom client socket factory */
+   protected String clientSocketFactoryName;
+
+   /** The class name of the optional custom server socket factory */
+   protected String serverSocketFactoryName;
+
+   /** An optional custom client socket factory */
+   protected SocketFactory clientSocketFactory;
+
+   /** An optional custom server socket factory */
+   protected ServerSocketFactory serverSocketFactory;
+   /** The server socket on which client connections are accepted */
    protected ServerSocket serverSocket = null;
 
+   /** The name of the security domain to use with server sockets that support SSL */
+   protected String sslDomain;
+
    protected int timeout = 60000; // 60 seconds.
 
    protected int maxPoolSize = 300;
@@ -104,6 +129,8 @@
    protected LRUPool clientpool;
    protected LinkedList threadpool;
    protected boolean running = true;
+   /** The logging trace level flag */
+   protected boolean trace = false;
    /**
     * ObjectName of the <code>transactionManagerService</code> we use.
     * Probably should not be here -- used to set txInterceptor tx mananger.
@@ -122,16 +149,7 @@
    // The following methods Override the ServiceMBeanSupport base class
    //
    ////////////////////////////////////////////////////////////////////////
-   /**
-    * Gives this JMX service a name.
-    * @return   The Name value
-    */
-   public String getName()
-   {
-      return "Optimized-Invoker";
-   }
 
-
    protected void jmxBind()
    {
       Registry.bind(getServiceName(), optimizedInvokerProxy);
@@ -144,6 +162,7 @@
     */
    public void startService() throws Exception
    {
+      trace = log.isTraceEnabled();
 
       ///////////////////////////////////////////////////////////      
       // Setup the transaction stuff
@@ -172,16 +191,28 @@
          (clientConnectAddress == null || clientConnectAddress.length() == 0)
             ? InetAddress.getLocalHost().getHostName()
             : clientConnectAddress;
+      /* We need to check the address against "0.0.0.0" as this is not a valid
+      address although some jdks will default to the host, while others fail
+      with java.net.BindException: Cannot assign requested address: connect
+      */
+      clientConnectAddress = ServerConfigUtil.fixRemoteAddress(clientConnectAddress);
 
-
       clientpool = new LRUPool(2, maxPoolSize);
       clientpool.create();
       threadpool = new LinkedList();
-      serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
+       try
+       {
+           serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
+       }catch( java.net.BindException be)
+       {
+           throw new Exception("Port "+serverBindPort+" is already in use",be);
+       }
+       serverBindPort = serverSocket.getLocalPort();
       clientConnectPort = (clientConnectPort == 0) ? serverSocket.getLocalPort() : clientConnectPort;
 
-      ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort, enableTcpNoDelay, timeout); 
-      optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize);
+      ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort,
+         enableTcpNoDelay, timeout, clientSocketFactory);
+      optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize, clientRetryCount);
 
       ///////////////////////////////////////////////////////////      
       // Register the service with the rest of the JBoss Kernel
@@ -194,20 +225,21 @@
       acceptThreads = new Thread[numAcceptThreads];
       for (int i = 0; i < numAcceptThreads; i++)
       {
-         acceptThreads[i] = new Thread(this);
+         String name = "PooledInvokerAcceptor#"+i+"-"+serverBindPort;
+         acceptThreads[i] = new Thread(this, name);
          acceptThreads[i].start();
       }
    }
 
    public void run()
    {
-      
       while (running)
       {
          try
          {
             Socket socket = serverSocket.accept();
-            //System.out.println("Thread accepted: " + Thread.currentThread());
+            if( trace )
+               log.trace("Accepted: "+socket);
             ServerThread thread = null;
             boolean newThread = false;
             
@@ -232,9 +264,11 @@
                      if (thread == null)
                      {
                         clientpool.evict();
-                        log.debug("**** WAITING *****");
+                        if( trace )
+                           log.trace("Waiting for a thread...");
                         clientpool.wait();
-                        log.debug("**** WOKE UP *****");
+                        if( trace )
+                           log.trace("Notified of available thread");
                      }
                   }
                }
@@ -246,16 +280,18 @@
             
             if (newThread)
             {
-               log.debug("**** ACQUIRED NEW *****");
+               if( trace )
+                  log.trace("Created a new thread, t="+thread);
                thread.start();
             }
             else
             {
-               log.debug("**** ACQUIRED OLD *****");
+               if( trace )
+                  log.trace("Reusing thread t="+thread);
                thread.wakeup(socket, timeout);
             }
          }
-         catch (Exception ex)
+         catch (Throwable ex)
          {
             if (running)
                log.error("Failed to accept socket connection", ex);
@@ -312,9 +348,14 @@
       {
 
          // Deserialize the transaction if it is there
-         MarshalledInvocation mi = (MarshalledInvocation) invocation;
+         PooledMarshalledInvocation mi = (PooledMarshalledInvocation) invocation;
          invocation.setTransaction(importTPC(mi.getTransactionPropagationContext()));
          ObjectName mbean = (ObjectName) Registry.lookup(invocation.getObjectName());
+         if( mbean == null )
+         {
+            System.err.println("NoSuchObjectException: "+invocation.getObjectName());
+            throw new NoSuchObjectException("Failed to find target for objectName: "+invocation.getObjectName());
+         }
 
          // The cl on the thread should be set in another interceptor
          Object obj = serverAction.invoke(mbean, "invoke",
@@ -380,7 +421,7 @@
    /**
     * Setter for property maxPoolSize.
     *
-    * @param maxPoolSize New value of property serverBindPort.
+    * @param maxPoolSize New value of property maxPoolSize.
     * @jmx:managed-attribute
     */
    public void setMaxPoolSize(int maxPoolSize)
@@ -434,7 +475,7 @@
 
    /**
     *
-    * @return Value of property serverBindPort.
+    * @return Value of property CurrentClientPoolSize.
     * @jmx:managed-attribute
     */
    public int getCurrentClientPoolSize()
@@ -444,7 +485,7 @@
 
    /**
     *
-    * @return Value of property serverBindPort.
+    * @return Value of property CurrentThreadPoolSize.
     * @jmx:managed-attribute
     */
    public int getCurrentThreadPoolSize()
@@ -509,6 +550,22 @@
    /**
     * @jmx:managed-attribute
     */
+   public int getClientRetryCount()
+   {
+      return clientRetryCount;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setClientRetryCount(int clientRetryCount)
+   {
+      this.clientRetryCount = clientRetryCount;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
    public int getBacklog()
    {
       return backlog;
@@ -554,8 +611,102 @@
       this.serverBindAddress = serverBindAddress;
    }
 
+   /**
+    * @jmx:managed-attribute
+    */
+   public String getClientSocketFactoryName()
+   {
+      return clientSocketFactoryName;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setClientSocketFactoryName(String clientSocketFactoryName)
+   {
+      this.clientSocketFactoryName = clientSocketFactoryName;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public String getServerSocketFactoryName()
+   {
+      return serverSocketFactoryName;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setServerSocketFactoryName(String serverSocketFactoryName)
+   {
+      this.serverSocketFactoryName = serverSocketFactoryName;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public SocketFactory getClientSocketFactory()
+   {
+      return clientSocketFactory;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setClientSocketFactory(SocketFactory clientSocketFactory)
+   {
+      this.clientSocketFactory = clientSocketFactory;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public ServerSocket getServerSocket()
+   {
+      return serverSocket;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setServerSocket(ServerSocket serverSocket)
+   {
+      this.serverSocket = serverSocket;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public String getSslDomain()
+   {
+      return sslDomain;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setSslDomain(String sslDomain)
+   {
+      this.sslDomain = sslDomain;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public ServerSocketFactory getServerSocketFactory()
+   {
+      return serverSocketFactory;
+   }
+
+   /**
+    * @jmx:managed-attribute
+    */
+   public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
+   {
+      this.serverSocketFactory = serverSocketFactory;
+   }
    
-   
    /**
     * mbean get-set pair for field transactionManagerService
     * Get the value of transactionManagerService
@@ -588,6 +739,101 @@
       return optimizedInvokerProxy;
    }
 
+   /** Load and instantiate the clientSocketFactory, serverSocketFactory using
+    the TCL and set the bind address and SSL domain if the serverSocketFactory
+    supports it.
+   */
+   protected void loadCustomSocketFactories()
+   {
+      ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+      try
+      {
+         if( clientSocketFactoryName != null )
+         {
+            Class csfClass = loader.loadClass(clientSocketFactoryName);
+            clientSocketFactory = (SocketFactory) csfClass.newInstance();
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("Failed to load client socket factory", e);
+         clientSocketFactory = null;
+      }
+
+      try
+      {
+         if( serverSocketFactoryName != null )
+         {
+            Class ssfClass = loader.loadClass(serverSocketFactoryName);
+            serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance();
+            if( serverBindAddress != null )
+            {
+               // See if the server socket supports setBindAddress(String)
+               try
+               {
+                  Class[] parameterTypes = {String.class};
+                  Method m = ssfClass.getMethod("setBindAddress", parameterTypes);
+                  Object[] args = {serverBindAddress};
+                  m.invoke(serverSocketFactory, args);
+               }
+               catch (NoSuchMethodException e)
+               {
+                  log.warn("Socket factory does not support setBindAddress(String)");
+                  // Go with default address
+               }
+               catch (Exception e)
+               {
+                  log.warn("Failed to setBindAddress="+serverBindAddress+" on socket factory", e);
+                  // Go with default address
+               }
+            }
+            /* See if the server socket supports setSecurityDomain(SecurityDomain)
+            if an sslDomain was specified
+            */
+            if( sslDomain != null )
+            {
+               try
+               {
+                  InitialContext ctx = new InitialContext();
+                  SecurityDomain domain = (SecurityDomain) ctx.lookup(sslDomain);
+                  Class[] parameterTypes = {SecurityDomain.class};
+                  Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
+                  Object[] args = {domain};
+                  m.invoke(serverSocketFactory, args);
+               }
+               catch(NoSuchMethodException e)
+               {
+                  log.error("Socket factory does not support setSecurityDomain(SecurityDomain)");
+               }
+               catch(Exception e)
+               {
+                  log.error("Failed to setSecurityDomain="+sslDomain+" on socket factory", e);
+               }
+            }
+         }
+         // If a bind address was specified create a DefaultSocketFactory
+         else if( serverBindAddress != null )
+         {
+            DefaultSocketFactory defaultFactory = new DefaultSocketFactory(backlog);
+            serverSocketFactory = defaultFactory;
+            try
+            {
+               defaultFactory.setBindAddress(serverBindAddress);
+            }
+            catch (UnknownHostException e)
+            {
+               log.error("Failed to setBindAddress="+serverBindAddress+" on socket factory", e);
+            }
+         }
+      }
+      catch (Exception e)
+      {
+         log.error("operation failed", e);
+         serverSocketFactory = null;
+      }
+   }
+
    /** Perform the MBeanServer.invoke op in a PrivilegedExceptionAction if
     * running with a security manager.
     */ 

Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,183 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.invocation.pooled.server;
+
+import javax.management.ObjectName;
+
+import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
+
+/**
+ * The PooledInvoker standard MBean interface.
+ * @author Bill Burke
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public interface PooledInvokerMBean extends org.jboss.system.ServiceMBean
+{
+
+   /**
+    * Getter for property numAcceptThreads
+    * @return Value of property numAcceptThreads
+    */
+   int getNumAcceptThreads();
+
+   /**
+    * Setter for property numAcceptThreads
+    * @param size New value of property numAcceptThreads.
+    */
+   void setNumAcceptThreads(int size);
+
+   /**
+    * Getter for property maxPoolSize;
+    * @return Value of property maxPoolSize.
+    */
+   int getMaxPoolSize();
+
+   /**
+    * Setter for property maxPoolSize.
+    * @param maxPoolSize New value of property maxPoolSize.
+    */
+   void setMaxPoolSize(int maxPoolSize);
+
+   /**
+    * Getter for property clientMaxPoolSize.
+    * @return Value of property clientMaxPoolSize.
+    */
+   int getClientMaxPoolSize();
+
+   /**
+    * Setter for property clientMaxPoolSize.
+    * @param clientMaxPoolSize New value of property clientMaxPoolSize.
+    */
+   void setClientMaxPoolSize(int clientMaxPoolSize);
+
+   /**
+    * Getter for property timeout
+    * @return Value of property timeout
+    */
+   int getSocketTimeout();
+
+   /**
+    * Setter for property timeout
+    * @param time New value of property timeout
+    */
+   void setSocketTimeout(int time);
+
+   /**
+    * @return Current client connection pool size
+    */ 
+   int getCurrentClientPoolSize();
+
+   /**
+    * @return current connection thread pool size
+    */ 
+   int getCurrentThreadPoolSize();
+
+   /**
+    * Getter for property serverBindPort.
+    * @return Value of property serverBindPort.
+    */
+   int getServerBindPort();
+
+   /**
+    * Setter for property serverBindPort.
+    * @param serverBindPort New value of property serverBindPort.
+    */
+   void setServerBindPort(int serverBindPort);
+
+   /**
+    * @return the address the client proxy connects to 
+    */ 
+   String getClientConnectAddress();
+   /**
+    * Set the clientConnectAddress
+    * @param clientConnectAddress - address the client proxy connects to 
+    */ 
+   void setClientConnectAddress(java.lang.String clientConnectAddress);
+
+   /**
+    * @return the port the client proxy connects to
+    */ 
+   int getClientConnectPort();
+   /**
+    * @param clientConnectPort - the port the client proxy connects to
+    */ 
+   void setClientConnectPort(int clientConnectPort);
+
+   /**
+    * @return the number of connect retries
+    */ 
+   public int getClientRetryCount();
+   /**
+    * @param clientRetryCount - the number of connect retries
+    */ 
+   public void setClientRetryCount(int clientRetryCount);
+
+   /**
+    * @return the server accept backlog
+    */ 
+   int getBacklog();
+   /**
+    * @param backlog - the server accept backlog
+    */ 
+   void setBacklog(int backlog);
+
+   /**
+    * @return Socket.setTcpNoDelay flag
+    */ 
+   boolean isEnableTcpNoDelay();
+   /**
+    * Socket.setTcpNoDelay flag
+    * @param enableTcpNoDelay
+    */ 
+   void setEnableTcpNoDelay(boolean enableTcpNoDelay);
+
+   String getServerBindAddress();
+
+   void setServerBindAddress(String serverBindAddress);
+
+   /**
+    * MBean get-set pair for field transactionManagerService. Get the value of
+    * transactionManagerService
+    * @return value of transactionManagerService
+    */
+   ObjectName getTransactionManagerService();
+
+   /**
+    * Set the value of transactionManagerService
+    * @param transactionManagerService Value to assign to transactionManagerService
+    */
+   void setTransactionManagerService(ObjectName transactionManagerService);
+
+   PooledInvokerProxy getOptimizedInvokerProxy();
+
+   /**
+    * Get the client socket factory implementation class name
+    * @return the javax.net.SocketFactory implementation class name
+    */ 
+   public String getClientSocketFactoryName();
+   /**
+    * Set the client factory implementation
+    * @param factoryName - the javax.net.SocketFactory implementation class name
+    */ 
+   public void setClientSocketFactoryName(String factoryName);
+
+   /**
+    * Get the server socket factory implementation class name
+    * @return the javax.net.ServerSocketFactory implementation class name
+    */ 
+   public String getServerSocketFactoryName();
+   /**
+    * Set the server factory implementation
+    * @param factoryName - the javax.net.ServerSocketFactory implementation class name
+    */ 
+   public void setServerSocketFactoryName(String factoryName);
+
+   public String getSslDomain();
+   public void setSslDomain(String sslDomain);
+   
+}

Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java	2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java	2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,27 +1,24 @@
-/***************************************
- *                                     *
- *  JBoss: The OpenSource J2EE WebOS   *
- *                                     *
- *  Distributable under LGPL license.  *
- *  See terms of license at gnu.org.   *
- *                                     *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
 
 package org.jboss.invocation.pooled.server;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.InterruptedIOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.net.Socket;
-import java.rmi.MarshalledObject;
+import java.util.LinkedList;
+
 import org.jboss.invocation.Invocation;
+import org.jboss.invocation.pooled.interfaces.OptimizedObjectInputStream;
+import org.jboss.invocation.pooled.interfaces.OptimizedObjectOutputStream;
 import org.jboss.logging.Logger;
-import java.util.LinkedList;
 
 /**
  * This Thread object hold a single Socket connection to a client
@@ -38,6 +35,7 @@
  * done by the TrunkInvoker.
  *
  * @author    <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @version $Revision$
  */
 public class ServerThread extends Thread
 {
@@ -54,9 +52,16 @@
    protected volatile boolean shutdown = false;
    protected static int id = 0;
 
-   public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool, LinkedList threadpool, int timeout) throws Exception
+   public static synchronized int nextID()
    {
-      super("PooledInvokerThread-" + id++);
+      int nextID = id ++;
+      return nextID;
+   }
+
+   public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool,
+      LinkedList threadpool, int timeout) throws Exception
+   {
+      super("PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID());
       this.socket = socket;
       this.invoker = invoker;
       this.clientpool = clientpool;
@@ -81,7 +86,7 @@
          try
          {
             this.interrupt();
-            this.interrupted(); // clear
+            Thread.interrupted(); // clear
          }
          catch (Exception ignored) {}
       }
@@ -107,7 +112,7 @@
          try
          {
             this.interrupt();
-            this.interrupted(); // clear
+            Thread.interrupted(); // clear
          }
          catch (Exception ignored) {}
       }
@@ -117,6 +122,8 @@
    public synchronized void wakeup(Socket socket, int timeout) throws Exception
    {
       this.socket = socket;
+      String name = "PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID();
+      super.setName(name);
       socket.setSoTimeout(timeout);
       running = true;
       handlingResponse = true;
@@ -155,6 +162,7 @@
                         clientpool.remove(this);
                         //System.out.println("adding myself to threadpool");
                         threadpool.add(this);
+                        Thread.interrupted(); // clear any interruption so that we can be pooled.
                         clientpool.notify();
                      }
                   }
@@ -200,12 +208,15 @@
       Object response = null;
       try
       {
+         // Make absolutely sure the thread's interrupted status is cleared.
+         boolean interrupted = Thread.interrupted();
          response = invoker.invoke(invocation);
       }
       catch (Exception ex)
       {
          response = ex;
       }
+      Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
       out.writeObject(response);
       out.reset();
       // to make sure stream gets reset
@@ -227,9 +238,11 @@
       handlingResponse = true;
       try
       {
-         out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+         BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
+         out = new OptimizedObjectOutputStream(bos);
          out.flush();
-         in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+         BufferedInputStream bis = new BufferedInputStream(socket.getInputStream());
+         in = new OptimizedObjectInputStream(bis);
       }
       catch (Exception e)
       {
@@ -246,7 +259,6 @@
          running = false;
       }
 
-      //System.out.println("****entering re-use loop");
       // Re-use loop
       while (running)
       {
@@ -255,22 +267,22 @@
             acknowledge();
             processInvocation();
          }
-         catch (InterruptedIOException iex)
+         catch (InterruptedIOException e)
          {
-            //System.out.println("exception found!");
-            log.debug("socket timed out");
+            log.debug("socket timed out", e);
             running = false;
          }
-         catch (InterruptedException intr)
+         catch (InterruptedException e)
          {
-            log.debug("interrupted");
+            log.debug("interrupted", e);
          }
          catch (Exception ex)
          {
-            //System.out.println("exception found!");
-            //log.error("failed", ex);
+            log.debug("failed", ex);
             running = false;
          }
+         // clear any interruption so that thread can be pooled.
+         Thread.interrupted();
       }
       //System.out.println("finished loop:" + Thread.currentThread());
       // Ok, we've been shutdown.  Do appropriate cleanups.




More information about the jboss-cvs-commits mailing list