[Jboss-cvs] JBossAS SVN: r56498 - in branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled: interfaces server
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Sep 1 02:59:01 EDT 2006
Author: scott.stark at jboss.org
Date: 2006-09-01 02:58:56 -0400 (Fri, 01 Sep 2006)
New Revision: 56498
Added:
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java
Modified:
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java
branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java
Log:
Merge the JBAS-3300 pooled invoker patch
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectInputStream.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,185 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.ref.WeakReference;
+
+import org.jboss.util.collection.WeakValueHashMap;
+
+import org.jboss.logging.Logger;
+import org.jboss.invocation.MarshalledValueInputStream;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+
+/**
+ * An ObjectInputStream subclass used by the MarshalledValue class to
+ * ensure the classes and proxies are loaded using the thread context
+ * class loader.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class OptimizedObjectInputStream
+ extends ObjectInputStream
+{
+ private static Logger log = Logger.getLogger(OptimizedObjectInputStream.class);
+ /** A class wide cache of proxy classes populated by resolveProxyClass */
+ private static ConcurrentReaderHashMap classCache;
+ private static ConcurrentReaderHashMap objectStreamClassCache;
+ private static Method lookupStreamClass = null;
+
+ static
+ {
+ useClassCache(true);
+ try
+ {
+ lookupStreamClass = ObjectStreamClass.class.getDeclaredMethod("lookup", new Class[]{Class.class, boolean.class});
+ lookupStreamClass.setAccessible(true);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ /** Enable local caching of resolved proxy classes. This can only be used
+ * if there is a single ULR and no redeployment of the proxy classes.
+ *
+ * @param flag true to enable caching, false to disable it
+ */
+ public static void useClassCache(boolean flag)
+ {
+ if (flag == true)
+ {
+ classCache = new ConcurrentReaderHashMap();
+ objectStreamClassCache = new ConcurrentReaderHashMap();
+ }
+ else
+ {
+ classCache = null;
+ objectStreamClassCache = null;
+ }
+ }
+
+ /** Clear the current proxy cache.
+ *
+ */
+ public static void flushClassCache()
+ {
+ classCache.clear();
+ objectStreamClassCache.clear();
+ }
+
+ private static Class forName(String className) throws ClassNotFoundException
+ {
+ Class clazz = null;
+ if (classCache != null)
+ {
+ WeakReference ref = (WeakReference) classCache.get(className);
+ if (ref != null) clazz = (Class) ref.get();
+ if (clazz == null)
+ {
+ if (ref != null) classCache.remove(className);
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ try
+ {
+ clazz = loader.loadClass(className);
+ }
+ catch (ClassNotFoundException e)
+ {
+ /* Use the Class.forName call which will resolve array classes. We
+ do not use this by default as this can result in caching of stale
+ values across redeployments.
+ */
+ clazz = Class.forName(className, false, loader);
+ }
+ classCache.put(className, new WeakReference(clazz));
+ }
+ }
+ else
+ {
+ clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
+ }
+ return clazz;
+ }
+
+ /**
+ * Creates a new instance of MarshalledValueOutputStream
+ */
+ public OptimizedObjectInputStream(InputStream is) throws IOException
+ {
+ super(is);
+ }
+
+ protected static ObjectStreamClass lookup(Class clazz)
+ {
+ Object[] args = {clazz, Boolean.TRUE};
+ try
+ {
+ return (ObjectStreamClass) lookupStreamClass.invoke(null, args);
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ return null;
+ }
+
+ protected ObjectStreamClass readClassDescriptor()
+ throws IOException, ClassNotFoundException
+ {
+ String className = readUTF();
+ ObjectStreamClass osc = null;
+ if (objectStreamClassCache != null)
+ {
+ osc = (ObjectStreamClass) objectStreamClassCache.get(className);
+ }
+ if (osc == null)
+ {
+ Class clazz = forName(className);
+ osc = ObjectStreamClass.lookup(clazz);
+ if (osc == null) osc = lookup(clazz);
+ if (osc == null) throw new IOException("Unable to readClassDescriptor for class " + className);
+ if (objectStreamClassCache != null) objectStreamClassCache.put(className, osc);
+ }
+ return osc;
+ }
+
+ /**
+ * Use the thread context class loader to resolve the class
+ *
+ * @throws IOException Any exception thrown by the underlying OutputStream.
+ */
+ protected Class resolveClass(ObjectStreamClass v)
+ throws IOException, ClassNotFoundException
+ {
+ String className = v.getName();
+ return forName(className);
+ }
+
+ protected Class resolveProxyClass(String[] interfaces)
+ throws IOException, ClassNotFoundException
+ {
+ // Load the interfaces from the cache or thread context class loader
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class[] ifaceClasses = new Class[interfaces.length];
+ for (int i = 0; i < interfaces.length; i++)
+ {
+ String className = interfaces[i];
+ Class iface = forName(className);
+ ifaceClasses[i] = iface;
+ }
+
+ return Proxy.getProxyClass(loader, ifaceClasses);
+ }
+}
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/OptimizedObjectOutputStream.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.rmi.Remote;
+import java.rmi.server.RemoteObject;
+import java.rmi.server.RemoteStub;
+
+/**
+ * An ObjectOutputStream subclass used by the MarshalledValue class to
+ * ensure the classes and proxies are loaded using the thread context
+ * class loader. Currently this does not do anything as neither class or
+ * proxy annotations are used.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class OptimizedObjectOutputStream
+ extends ObjectOutputStream
+{
+ /** Creates a new instance of MarshalledValueOutputStream
+ If there is a security manager installed, this method requires a
+ SerializablePermission("enableSubstitution") permission to ensure it's
+ ok to enable the stream to do replacement of objects in the stream.
+ */
+ public OptimizedObjectOutputStream(OutputStream os) throws IOException
+ {
+ super(os);
+ enableReplaceObject(true);
+ }
+
+ /**
+ * Writes just the class name to this output stream.
+ *
+ * @param classdesc class description object
+ */
+ protected void writeClassDescriptor(ObjectStreamClass classdesc)
+ throws IOException
+ {
+ writeUTF(classdesc.getName());
+ }
+
+ /** Override replaceObject to check for Remote objects that are
+ not RemoteStubs.
+ */
+ protected Object replaceObject(Object obj) throws IOException
+ {
+ if( (obj instanceof Remote) && !(obj instanceof RemoteStub) )
+ {
+ Remote remote = (Remote) obj;
+ try
+ {
+ obj = RemoteObject.toStub(remote);
+ }
+ catch(IOException ignore)
+ {
+ // Let the Serialization layer try with the orignal obj
+ }
+ }
+ return obj;
+ }
+}
Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,10 +1,24 @@
/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
package org.jboss.invocation.pooled.interfaces;
import java.io.IOException;
@@ -15,44 +29,59 @@
import java.io.BufferedInputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.IOException;
+import java.io.EOFException;
+import java.io.OptionalDataException;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
import java.net.Socket;
+import java.net.SocketException;
import java.rmi.MarshalledObject;
import java.rmi.NoSuchObjectException;
import java.rmi.ServerException;
import java.rmi.ConnectException;
-import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
+import java.util.List;
import java.util.LinkedList;
import javax.transaction.TransactionRolledbackException;
import javax.transaction.SystemException;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.net.ssl.SSLException;
import org.jboss.invocation.Invocation;
import org.jboss.invocation.Invoker;
-import org.jboss.invocation.MarshalledInvocation;
import org.jboss.tm.TransactionPropagationContextFactory;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+
/**
* Client socket connections are pooled to avoid the overhead of
* making a connection. RMI seems to do a new connection with each
* request.
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
* @version $Revision$
*/
public class PooledInvokerProxy
implements Invoker, Externalizable
{
// Attributes ----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PooledInvokerProxy.class);
/** The serialVersionUID @since 1.1.4.3 */
private static final long serialVersionUID = -1456509931095566410L;
+ /** The current wire format we write */
+ private static final int WIRE_VERSION = 1;
/**
* Factory for transaction propagation contexts.
*
- * @todo: marcf remove all transaction spill from here
+ * @todo marcf remove all transaction spill from here
*
* When set to a non-null value, it is used to get transaction
* propagation contexts for remote method invocations.
@@ -60,7 +89,7 @@
* remote method invocations.
*/
protected static TransactionPropagationContextFactory tpcFactory = null;
-
+
// @todo: MOVE TO TRANSACTION
//
// TPC factory
@@ -68,77 +97,249 @@
tpcFactory = tpcf;
}
- // Performance measurements
+ // Simple performance measurements, not thread safe
public static long getSocketTime = 0;
public static long readTime = 0;
public static long writeTime = 0;
public static long serializeTime = 0;
public static long deserializeTime = 0;
+ /** The number of times a connection has been obtained from a pool */
public static long usedPooled = 0;
+ /** The number of connections in use */
+ private static int inUseCount = 0;
+ /** The number of socket connections made */
+ private static long socketConnectCount = 0;
+ /** The number of socket close calls made */
+ private static long socketCloseCount = 0;
-
/**
* Set number of retries in getSocket method
*/
public static int MAX_RETRIES = 10;
+ /** A class wide pool Map<ServerAddres, LinkedList<ClientSocket>> */
+ protected static final Map connectionPools = new ConcurrentReaderHashMap();
- protected static HashMap connectionPools = new HashMap();
-
/**
* connection information
*/
protected ServerAddress address;
-
+
/**
* Pool for this invoker. This is shared between all
* instances of proxies attached to a specific invoker
+ * This should not be serializable, but is for backward compatibility.
*/
protected LinkedList pool = null;
+ /** */
protected int maxPoolSize;
+ /** The number of times to retry after seeing a ConnectionException */
+ protected int retryCount = 1;
+ /** The logging trace flag */
+ private transient boolean trace;
+ /**
+ * An encapsulation of a client connection
+ */
protected static class ClientSocket
+ implements HandshakeCompletedListener
{
public ObjectOutputStream out;
public ObjectInputStream in;
public Socket socket;
public int timeout;
+ public String sessionID;
+ private boolean handshakeComplete = false;
+ private boolean trace;
+
public ClientSocket(Socket socket, int timeout) throws Exception
{
this.socket = socket;
+ trace = log.isTraceEnabled();
+ boolean needHandshake = false;
+
+ if( socket instanceof SSLSocket )
+ {
+ SSLSocket ssl = (SSLSocket) socket;
+ ssl.addHandshakeCompletedListener(this);
+ if( trace )
+ log.trace("Starting SSL handshake");
+ needHandshake = true;
+ handshakeComplete = false;
+ ssl.startHandshake();
+ }
socket.setSoTimeout(timeout);
this.timeout = timeout;
- out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+ out = new OptimizedObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
out.flush();
- in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+ in = new OptimizedObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+ if( needHandshake )
+ {
+ // Loop waiting for the handshake to complete
+ socket.setSoTimeout(1000);
+ for(int n = 0; handshakeComplete == false && n < 60; n ++)
+ {
+ try
+ {
+ int b = in.read();
+ }
+ catch(SSLException e)
+ {
+ if( trace )
+ log.trace("Error while waiting for handshake to complete", e);
+ throw e;
+ }
+ catch(IOException e)
+ {
+ if( trace )
+ log.trace("Handshaked read()", e);
+ }
+ }
+ if( handshakeComplete == false )
+ throw new SSLException("Handshaked failed to complete in 60 seconds");
+ // Restore the original timeout
+ socket.setSoTimeout(timeout);
+ }
+
}
+ public void handshakeCompleted(HandshakeCompletedEvent event)
+ {
+ handshakeComplete = true;
+ byte[] id = event.getSession().getId();
+ try
+ {
+ sessionID = new String(id, "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ log.warn("Failed to create session id using UTF-8, using default", e);
+ sessionID = new String(id);
+ }
+ if( trace )
+ {
+ log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID);
+ }
+ }
+
+ public String toString()
+ {
+ StringBuffer tmp = new StringBuffer("ClientSocket@");
+ tmp.append(System.identityHashCode(this));
+ tmp.append('[');
+ tmp.append("socket=");
+ tmp.append(socket.toString());
+ tmp.append(']');
+ return tmp.toString();
+ }
+
+ /**
+ * @todo should this be handled with weak references as this should
+ * work better with gc
+ */
protected void finalize()
{
if (socket != null)
{
- try { socket.close(); } catch (Exception ignored) {}
+ if( trace )
+ log.trace("Closing socket in finalize: "+socket);
+ try
+ {
+ socketCloseCount --;
+ socket.close();
+ }
+ catch (Exception ignored) {}
+ finally
+ {
+ socket = null;
+ }
}
}
}
/**
+ * Clear all class level stats
+ */
+ public static void clearStats()
+ {
+ getSocketTime = 0;
+ readTime = 0;
+ writeTime = 0;
+ serializeTime = 0;
+ deserializeTime = 0;
+ usedPooled = 0;
+ }
+
+ /**
+ * @return the active number of client connections
+ */
+ public static long getInUseCount()
+ {
+ return inUseCount;
+ }
+
+ /**
+ * @return the number of times a connection was returned from a pool
+ */
+ public static long getUsedPooled()
+ {
+ return usedPooled;
+ }
+ public static long getSocketConnectCount()
+ {
+ return socketConnectCount;
+ }
+ public static long getSocketCloseCount()
+ {
+ return socketCloseCount;
+ }
+
+ /**
+ * @return the total number of pooled connections across all ServerAddresses
+ */
+ public static int getTotalPoolCount()
+ {
+ int count = 0;
+ Iterator iter = connectionPools.values().iterator();
+ while( iter.hasNext() )
+ {
+ List pool = (List) iter.next();
+ if( pool != null )
+ count += pool.size();
+ }
+ return count;
+ }
+
+ /**
+ * @return the proxy local pool count
+ */
+ public long getPoolCount()
+ {
+ return pool.size();
+ }
+
+ /**
* Exposed for externalization.
*/
public PooledInvokerProxy()
{
super();
+ trace = log.isTraceEnabled();
}
-
/**
* Create a new Proxy.
*
*/
public PooledInvokerProxy(ServerAddress sa, int maxPoolSize)
{
+ this(sa, maxPoolSize, MAX_RETRIES);
+ }
+ public PooledInvokerProxy(ServerAddress sa, int maxPoolSize, int retryCount)
+ {
this.address = sa;
this.maxPoolSize = maxPoolSize;
+ this.retryCount = retryCount;
}
/**
@@ -146,6 +347,9 @@
*/
public static void clearPool(ServerAddress sa)
{
+ boolean trace = log.isTraceEnabled();
+ if( trace )
+ log.trace("clearPool, sa: "+sa);
try
{
LinkedList thepool = (LinkedList)connectionPools.get(sa);
@@ -155,15 +359,24 @@
int size = thepool.size();
for (int i = 0; i < size; i++)
{
- ClientSocket socket = (ClientSocket)thepool.removeFirst();
+ ClientSocket cs = null;
try
{
+ ClientSocket socket = (ClientSocket)thepool.removeFirst();
+ cs = socket;
+ if( trace )
+ log.trace("Closing, ClientSocket: "+socket);
+ socketCloseCount --;
socket.socket.close();
- socket.socket = null;
}
catch (Exception ignored)
{
}
+ finally
+ {
+ if( cs != null )
+ cs.socket = null;
+ }
}
}
}
@@ -203,10 +416,9 @@
protected ClientSocket getConnection() throws Exception
{
- Exception failed = null;
Socket socket = null;
+ ClientSocket cs = null;
-
//
// Need to retry a few times
// on socket connection because, at least on Windoze,
@@ -219,79 +431,156 @@
// have nothing to do with backlog or number of threads
// waiting in accept() on the server.
//
- for (int i = 0; i < MAX_RETRIES; i++)
+ for (int i = 0; i < retryCount; i++)
{
- synchronized(pool)
+ ClientSocket pooled = getPooledConnection();
+ if (pooled != null)
{
- if (pool.size() > 0)
- {
- ClientSocket pooled = getPooledConnection();
- if (pooled != null)
- {
- usedPooled++;
- return pooled;
- }
- }
+ usedPooled++;
+ inUseCount ++;
+ return pooled;
}
-
+
try
{
- socket = new Socket(address.address, address.port);
+ if( trace)
+ {
+ log.trace("Connecting to addr: "+address.address
+ +", port: "+address.port
+ +",clientSocketFactory: "+address.clientSocketFactory
+ +",enableTcpNoDelay: "+address.enableTcpNoDelay
+ +",timeout: "+address.timeout);
+ }
+ if( address.clientSocketFactory != null )
+ socket = address.clientSocketFactory.createSocket(address.address, address.port);
+ else
+ socket = new Socket(address.address, address.port);
+ socketConnectCount ++;
+ if( trace )
+ log.trace("Connected, socket="+socket);
+
+ socket.setTcpNoDelay(address.enableTcpNoDelay);
+ cs = new ClientSocket(socket, address.timeout);
+ inUseCount ++;
+ if( trace )
+ {
+ log.trace("New ClientSocket: "+cs
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
break;
}
catch (Exception ex)
{
- if (i + 1 < MAX_RETRIES)
+ if( ex instanceof InterruptedIOException || ex instanceof SocketException )
{
- Thread.sleep(1);
- continue;
+ if( trace )
+ log.trace("Connect failed", ex);
+ if (i + 1 < retryCount)
+ {
+ Thread.sleep(1);
+ continue;
+ }
}
throw ex;
}
}
- socket.setTcpNoDelay(address.enableTcpNoDelay);
- return new ClientSocket(socket, address.timeout);
+ // Should not happen
+ if( cs == null )
+ throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
+ return cs;
}
-
- protected ClientSocket getPooledConnection()
+
+ protected synchronized ClientSocket getPooledConnection()
{
ClientSocket socket = null;
while (pool.size() > 0)
{
- socket = (ClientSocket)pool.removeFirst();
try
{
+ synchronized( pool )
+ {
+ socket = (ClientSocket)pool.removeFirst();
+ }
// Test to see if socket is alive by send ACK message
+ if( trace )
+ log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
final byte ACK = 1;
socket.out.writeByte(ACK);
socket.out.flush();
socket.in.readByte();
+ if( trace )
+ {
+ log.trace("Using pooled ClientSocket: "+socket
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
return socket;
}
catch (Exception ex)
{
+ if( trace )
+ log.trace("Failed to validate pooled socket: "+socket, ex);
try
{
- socket.socket.close();
+ if( socket != null )
+ {
+ socketCloseCount --;
+ socket.socket.close();
+ }
}
- catch (Exception ignored) {}
+ catch (Exception ignored)
+ {
+ }
+ finally
+ {
+ if( socket != null )
+ socket.socket = null;
+ }
}
}
return null;
}
/**
+ * Return a socket to the pool
+ * @param socket
+ * @return true if socket was added to the pool, false if the pool
+ * was full
+ */
+ protected synchronized boolean returnConnection(ClientSocket socket)
+ {
+ boolean pooled = false;
+ synchronized( pool )
+ {
+ if (pool.size() < maxPoolSize)
+ {
+ pool.add(socket);
+ inUseCount --;
+ pooled = true;
+ }
+ }
+ return pooled;
+ }
+
+ /**
* The name of of the server.
*/
public String getServerHostName() throws Exception
{
return address.address;
}
-
+
/**
* ???
*
- * @todo: MOVE TO TRANSACTION
+ * @todo MOVE TO TRANSACTION
*
* @return the transaction propagation context of the transaction
* associated with the current thread.
@@ -312,21 +601,31 @@
public Object invoke(Invocation invocation)
throws Exception
{
+ boolean trace = log.isTraceEnabled();
// We are going to go through a Remote invocation, switch to a Marshalled Invocation
- MarshalledInvocation mi = new MarshalledInvocation(invocation);
-
+ PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
+
// Set the transaction propagation context
// @todo: MOVE TO TRANSACTION
mi.setTransactionPropagationContext(getTransactionPropagationContext());
-
Object response = null;
long start = System.currentTimeMillis();
ClientSocket socket = getConnection();
long end = System.currentTimeMillis() - start;
getSocketTime += end;
+ // Add the socket session if it exists
+ if( socket.sessionID != null )
+ {
+ mi.setValue("SESSION_ID", socket.sessionID);
+ if( trace )
+ log.trace("Added SESSION_ID to invocation");
+ }
+
try
{
+ if( trace )
+ log.trace("Sending invocation to: "+mi.getObjectName());
socket.out.writeObject(mi);
socket.out.reset();
socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
@@ -345,38 +644,43 @@
}
catch (Exception ex)
{
+ if( trace )
+ log.trace("Failure during invoke", ex);
try
{
+ socketCloseCount --;
socket.socket.close();
}
catch (Exception ignored) {}
- //System.out.println("got read exception, exiting");
- throw new ConnectException("Failed to communicate", ex);
+ finally
+ {
+ socket.socket = null;
+ }
+ throw new java.rmi.ConnectException("Failure during invoke", ex);
}
-
- //System.out.println("put back in pool");
+
// Put socket back in pool for reuse
- synchronized (pool)
+ if( returnConnection(socket) == false )
{
- if (pool.size() < maxPoolSize)
+ // Failed, close the socket
+ if( trace )
+ log.trace("Closing unpooled socket: "+socket);
+ try
{
- pool.add(socket);
+ socketCloseCount --;
+ socket.socket.close();
}
- else
+ catch (Exception ignored) {}
+ finally
{
- try
- {
- socket.socket.close();
- }
- catch (Exception ignored) {}
+ socket.socket = null;
}
}
-
+
// Return response
- //System.out.println("return response");
try
- {
+ {
if (response instanceof Exception)
{
throw ((Exception)response);
@@ -385,7 +689,7 @@
{
return ((MarshalledObject)response).get();
}
- return response;
+ return response;
}
catch (ServerException ex)
{
@@ -402,29 +706,70 @@
throw (TransactionRolledbackException) ex.detail;
}
throw ex;
- }
+ }
}
-
+
/**
- * Externalize this instance and handle obtaining the remoteInvoker stub
+ * Write out the serializable data
+ * @serialData address ServerAddress
+ * @serialData maxPoolSize int
+ * @serialData WIRE_VERSION int version
+ * @serialData retryCount int
+ * @param out
+ * @throws IOException
*/
public void writeExternal(final ObjectOutput out)
throws IOException
- {
+ {
+ // The legacy wire format is address, maxPoolSize
out.writeObject(address);
out.writeInt(maxPoolSize);
+ // Write out the current version format and its data
+ out.writeInt(WIRE_VERSION);
+ out.writeInt(retryCount);
}
-
- /**
- * Un-externalize this instance.
- *
- */
+
public void readExternal(final ObjectInput in)
throws IOException, ClassNotFoundException
{
+ trace = log.isTraceEnabled();
address = (ServerAddress)in.readObject();
maxPoolSize = in.readInt();
+ int version = 0;
+ try
+ {
+ version = in.readInt();
+ }
+ catch(EOFException e)
+ {
+ // No version written and there is no more data
+ }
+ catch(OptionalDataException e)
+ {
+ // No version written and there is data from other objects
+ }
+
+ switch( version )
+ {
+ case 0:
+ // This has no retryCount, default it to the hard-coded value
+ retryCount = MAX_RETRIES;
+ break;
+ case 1:
+ readVersion1(in);
+ break;
+ default:
+ /* Assume a newer version that only adds defaultable values.
+ The alternative would be to thrown an exception
+ */
+ break;
+ }
initPool();
}
+
+ private void readVersion1(final ObjectInput in)
+ throws IOException
+ {
+ retryCount = in.readInt();
+ }
}
-
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy2.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,779 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.Externalizable;
+import java.io.ObjectOutputStream;
+import java.io.ObjectInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
+import java.io.ObjectOutput;
+import java.io.ObjectInput;
+import java.io.EOFException;
+import java.io.OptionalDataException;
+import java.util.Map;
+import java.util.Iterator;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.ConnectException;
+import java.rmi.MarshalledObject;
+import java.rmi.ServerException;
+import java.rmi.NoSuchObjectException;
+
+import javax.net.ssl.HandshakeCompletedListener;
+import javax.net.ssl.SSLSocket;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.HandshakeCompletedEvent;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionRolledbackException;
+
+import org.jboss.invocation.Invoker;
+import org.jboss.invocation.Invocation;
+import org.jboss.logging.Logger;
+import org.jboss.tm.TransactionPropagationContextFactory;
+import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
+
+/**
+ * Client socket connections are pooled to avoid the overhead of
+ * making a connection.
+ *
+ * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class PooledInvokerProxy2
+ implements Invoker, Externalizable
+{
+ // Attributes ----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PooledInvokerProxy2.class);
+ /** The serialVersionUID @since 1.1.4.3 */
+ private static final long serialVersionUID = -1456509931095566410L;
+ /** The current wire format we write */
+ private static final int WIRE_VERSION = 1;
+ /** This controls the max number of concurrent connection in usse */
+ private static FIFOSemaphore maxActiveConnections;
+
+ /**
+ * Factory for transaction propagation contexts.
+ *
+ * @todo marcf remove all transaction spill from here
+ *
+ * When set to a non-null value, it is used to get transaction
+ * propagation contexts for remote method invocations.
+ * If <code>null</code>, transactions are not propagated on
+ * remote method invocations.
+ */
+ protected static TransactionPropagationContextFactory tpcFactory = null;
+
+ // @todo: MOVE TO TRANSACTION
+ //
+ // TPC factory
+ public static void setTPCFactory(TransactionPropagationContextFactory tpcf) {
+ PooledInvokerProxy2.tpcFactory = tpcf;
+ }
+
+ // Simple performance measurements, not thread safe
+ public static long getSocketTime = 0;
+ public static long readTime = 0;
+ public static long writeTime = 0;
+ public static long serializeTime = 0;
+ public static long deserializeTime = 0;
+ /** The number of times a connection has been obtained from a pool */
+ public static long usedPooled = 0;
+ /** The number of connections in use */
+ private static int inUseCount = 0;
+ /** The number of socket connections made */
+ private static long socketConnectCount = 0;
+ /** The number of socket close calls made */
+ private static long socketCloseCount = 0;
+
+ /**
+ * Set number of retries in getSocket method
+ */
+ public static int MAX_RETRIES = 10;
+
+ /** A class wide pool Map<ServerAddres, LinkedList<ClientSocket>> */
+ protected static final Map connectionPools = new ConcurrentReaderHashMap();
+
+ /**
+ * connection information
+ */
+ protected ServerAddress address;
+
+ /**
+ * Pool for this invoker. This is shared between all
+ * instances of proxies attached to a specific invoker
+ * This should not be serializable, but is for backward compatibility.
+ */
+ protected BoundedLinkedQueue pool = null;
+ /** Used to initialize the */
+ protected int maxPoolSize;
+ /** The number of times to retry after seeing a ConnectionException */
+ protected int retryCount = 1;
+ /** The logging trace flag */
+ private transient boolean trace;
+
+ /**
+ * An encapsulation of a client connection
+ */
+ protected static class ClientSocket
+ implements HandshakeCompletedListener
+ {
+ public ObjectOutputStream out;
+ public ObjectInputStream in;
+ public Socket socket;
+ public int timeout;
+ public String sessionID;
+ private boolean handshakeComplete = false;
+ private boolean trace;
+
+ public ClientSocket(Socket socket, int timeout) throws Exception
+ {
+ this.socket = socket;
+ trace = log.isTraceEnabled();
+ boolean needHandshake = false;
+
+ if( socket instanceof SSLSocket )
+ {
+ SSLSocket ssl = (SSLSocket) socket;
+ ssl.addHandshakeCompletedListener(this);
+ if( trace )
+ log.trace("Starting SSL handshake");
+ needHandshake = true;
+ handshakeComplete = false;
+ ssl.startHandshake();
+ }
+ socket.setSoTimeout(timeout);
+ this.timeout = timeout;
+ out = new OptimizedObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+ out.flush();
+ in = new OptimizedObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+ if( needHandshake )
+ {
+ // Loop waiting for the handshake to complete
+ socket.setSoTimeout(1000);
+ for(int n = 0; handshakeComplete == false && n < 60; n ++)
+ {
+ try
+ {
+ int b = in.read();
+ }
+ catch(SSLException e)
+ {
+ if( trace )
+ log.trace("Error while waiting for handshake to complete", e);
+ throw e;
+ }
+ catch(IOException e)
+ {
+ if( trace )
+ log.trace("Handshaked read()", e);
+ }
+ }
+ if( handshakeComplete == false )
+ throw new SSLException("Handshaked failed to complete in 60 seconds");
+ // Restore the original timeout
+ socket.setSoTimeout(timeout);
+ }
+
+ }
+
+ public void handshakeCompleted(HandshakeCompletedEvent event)
+ {
+ handshakeComplete = true;
+ byte[] id = event.getSession().getId();
+ try
+ {
+ sessionID = new String(id, "UTF-8");
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ log.warn("Failed to create session id using UTF-8, using default", e);
+ sessionID = new String(id);
+ }
+ if( trace )
+ {
+ log.trace("handshakeCompleted, event="+event+", sessionID="+sessionID);
+ }
+ }
+
+ public String toString()
+ {
+ StringBuffer tmp = new StringBuffer("ClientSocket@");
+ tmp.append(System.identityHashCode(this));
+ tmp.append('[');
+ tmp.append("socket=");
+ tmp.append(socket.toString());
+ tmp.append(']');
+ return tmp.toString();
+ }
+
+ /**
+ * @todo should this be handled with weak references as this should
+ * work better with gc
+ */
+ protected void finalize()
+ {
+ if (socket != null)
+ {
+ if( trace )
+ log.trace("Closing socket in finalize: "+socket);
+ try
+ {
+ socketCloseCount --;
+ socket.close();
+ }
+ catch (Exception ignored) {}
+ finally
+ {
+ socket = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Clear all class level stats
+ */
+ public static void clearStats()
+ {
+ getSocketTime = 0;
+ readTime = 0;
+ writeTime = 0;
+ serializeTime = 0;
+ deserializeTime = 0;
+ usedPooled = 0;
+ }
+
+ /**
+ * @return the active number of client connections
+ */
+ public static long getInUseCount()
+ {
+ return inUseCount;
+ }
+
+ /**
+ * @return the number of times a connection was returned from a pool
+ */
+ public static long getUsedPooled()
+ {
+ return usedPooled;
+ }
+ public static long getSocketConnectCount()
+ {
+ return socketConnectCount;
+ }
+ public static long getSocketCloseCount()
+ {
+ return socketCloseCount;
+ }
+
+ /**
+ * @return the total number of pooled connections across all ServerAddresses
+ */
+ public static int getTotalPoolCount()
+ {
+ int count = 0;
+ Iterator iter = connectionPools.values().iterator();
+ while( iter.hasNext() )
+ {
+ BoundedLinkedQueue pool = (BoundedLinkedQueue) iter.next();
+ if( pool != null )
+ count += pool.size();
+ }
+ return count;
+ }
+
+ /**
+ * @return the proxy local pool count
+ */
+ public long getPoolCount()
+ {
+ return pool.size();
+ }
+
+ /**
+ * Exposed for externalization.
+ */
+ public PooledInvokerProxy2()
+ {
+ super();
+ trace = log.isTraceEnabled();
+ }
+
+ /**
+ * Create a new Proxy.
+ *
+ */
+ public PooledInvokerProxy2(ServerAddress sa, int maxPoolSize)
+ {
+ this(sa, maxPoolSize, MAX_RETRIES);
+ }
+ public PooledInvokerProxy2(ServerAddress sa, int maxPoolSize, int retryCount)
+ {
+ this.address = sa;
+ this.maxPoolSize = maxPoolSize;
+ this.retryCount = retryCount;
+ }
+
+ /**
+ * Close all sockets in a specific pool.
+ */
+ public static void clearPool(ServerAddress sa)
+ {
+ boolean trace = log.isTraceEnabled();
+ if( trace )
+ log.trace("clearPool, sa: "+sa);
+ try
+ {
+ BoundedLinkedQueue thepool = (BoundedLinkedQueue) connectionPools.get(sa);
+ if (thepool == null) return;
+ int size = thepool.size();
+ for (int i = 0; i < size; i++)
+ {
+ ClientSocket cs = null;
+ try
+ {
+ ClientSocket socket = (ClientSocket)thepool.take();
+ cs = socket;
+ if( trace )
+ log.trace("Closing, ClientSocket: "+socket);
+ socketCloseCount --;
+ socket.socket.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ finally
+ {
+ if( cs != null )
+ cs.socket = null;
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ // ignored
+ }
+ }
+ /**
+ * Close all sockets in all pools
+ */
+ public static void clearPools()
+ {
+ synchronized (connectionPools)
+ {
+ Iterator it = connectionPools.keySet().iterator();
+ while (it.hasNext())
+ {
+ ServerAddress sa = (ServerAddress)it.next();
+ clearPool(sa);
+ }
+ }
+ }
+
+ protected void initPool()
+ {
+ synchronized (connectionPools)
+ {
+ pool = (BoundedLinkedQueue) connectionPools.get(address);
+ if (pool == null)
+ {
+ pool = new BoundedLinkedQueue(maxPoolSize);
+ connectionPools.put(address, pool);
+ }
+ if( maxActiveConnections == null )
+ maxActiveConnections = new FIFOSemaphore(maxPoolSize);
+ }
+ }
+
+ protected ClientSocket getConnection() throws Exception
+ {
+ Socket socket = null;
+ ClientSocket cs = null;
+
+ //
+ // Need to retry a few times
+ // on socket connection because, at least on Windoze,
+ // if too many concurrent threads try to connect
+ // at same time, you get ConnectionRefused
+ //
+ // Retrying seems to be the most performant.
+ //
+ // This problem always happens with RMI and seems to
+ // have nothing to do with backlog or number of threads
+ // waiting in accept() on the server.
+ //
+ for (int i = 0; i < retryCount; i++)
+ {
+ ClientSocket pooled = getPooledConnection();
+ if (pooled != null)
+ {
+ usedPooled++;
+ inUseCount ++;
+ return pooled;
+ }
+
+ try
+ {
+ if( trace)
+ {
+ log.trace("Connecting to addr: "+address.address
+ +", port: "+address.port
+ +",clientSocketFactory: "+address.clientSocketFactory
+ +",enableTcpNoDelay: "+address.enableTcpNoDelay
+ +",timeout: "+address.timeout);
+ }
+ if( address.clientSocketFactory != null )
+ socket = address.clientSocketFactory.createSocket(address.address, address.port);
+ else
+ socket = new Socket(address.address, address.port);
+ socketConnectCount ++;
+ if( trace )
+ log.trace("Connected, socket="+socket);
+
+ socket.setTcpNoDelay(address.enableTcpNoDelay);
+ cs = new ClientSocket(socket, address.timeout);
+ inUseCount ++;
+ if( trace )
+ {
+ log.trace("New ClientSocket: "+cs
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
+ break;
+ }
+ catch (Exception ex)
+ {
+ if( ex instanceof InterruptedIOException || ex instanceof SocketException )
+ {
+ if( trace )
+ log.trace("Connect failed", ex);
+ if (i + 1 < retryCount)
+ {
+ Thread.sleep(1);
+ continue;
+ }
+ }
+ throw ex;
+ }
+ }
+ // Should not happen
+ if( cs == null )
+ throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
+ return cs;
+ }
+
+ protected synchronized ClientSocket getPooledConnection()
+ {
+ ClientSocket socket = null;
+ while (pool.size() > 0)
+ {
+ try
+ {
+ socket = (ClientSocket)pool.take();
+ // Test to see if socket is alive by send ACK message
+ if( trace )
+ log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
+ final byte ACK = 1;
+ socket.out.writeByte(ACK);
+ socket.out.flush();
+ socket.in.readByte();
+ if( trace )
+ {
+ log.trace("Using pooled ClientSocket: "+socket
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
+ return socket;
+ }
+ catch (Exception ex)
+ {
+ if( trace )
+ log.trace("Failed to validate pooled socket: "+socket, ex);
+ try
+ {
+ if( socket != null )
+ {
+ socketCloseCount --;
+ socket.socket.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+ finally
+ {
+ socket.socket = null;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Return a socket to the pool
+ * @param socket
+ * @return true if socket was added to the pool, false if the pool
+ * was full
+ */
+ protected synchronized boolean returnConnection(ClientSocket socket)
+ {
+ boolean pooled = false;
+ try
+ {
+ pool.offer(socket, 10*1000);
+ pooled = true;
+ }
+ catch (InterruptedException e)
+ {
+ if( trace )
+ log.trace("Failed to put socket back into pool due to InterruptedException");
+ }
+ inUseCount --;
+ return pooled;
+ }
+
+ /**
+ * The name of of the server.
+ */
+ public String getServerHostName() throws Exception
+ {
+ return address.address;
+ }
+
+ /**
+ * ???
+ *
+ * @todo MOVE TO TRANSACTION
+ *
+ * @return the transaction propagation context of the transaction
+ * associated with the current thread.
+ * Returns <code>null</code> if the transaction manager was never
+ * set, or if no transaction is associated with the current thread.
+ */
+ public Object getTransactionPropagationContext()
+ throws SystemException
+ {
+ return (tpcFactory == null) ? null : tpcFactory.getTransactionPropagationContext();
+ }
+
+
+ /**
+ * The invocation on the delegate, calls the right invoker. Remote if we are remote,
+ * local if we are local.
+ */
+ public Object invoke(Invocation invocation)
+ throws Exception
+ {
+ boolean trace = log.isTraceEnabled();
+ // We are going to go through a Remote invocation, switch to a Marshalled Invocation
+ PooledMarshalledInvocation mi = new PooledMarshalledInvocation(invocation);
+
+ // Set the transaction propagation context
+ // @todo: MOVE TO TRANSACTION
+ mi.setTransactionPropagationContext(getTransactionPropagationContext());
+
+ Object response = null;
+ long start = System.currentTimeMillis();
+ maxActiveConnections.acquire();
+ try
+ {
+ ClientSocket socket = getConnection();
+ long end = System.currentTimeMillis() - start;
+ getSocketTime += end;
+ // Add the socket session if it exists
+ if( socket.sessionID != null )
+ {
+ mi.setValue("SESSION_ID", socket.sessionID);
+ if( trace )
+ log.trace("Added SESSION_ID to invocation");
+ }
+
+ try
+ {
+ if( trace )
+ log.trace("Sending invocation to: "+mi.getObjectName());
+ socket.out.writeObject(mi);
+ socket.out.reset();
+ socket.out.writeObject(Boolean.TRUE); // for stupid ObjectInputStream reset
+ socket.out.flush();
+ socket.out.reset();
+ end = System.currentTimeMillis() - start;
+ writeTime += end;
+ start = System.currentTimeMillis();
+ response = socket.in.readObject();
+ // to make sure stream gets reset
+ // Stupid ObjectInputStream holds object graph
+ // can only be set by the client/server sending a TC_RESET
+ socket.in.readObject();
+ end = System.currentTimeMillis() - start;
+ readTime += end;
+ }
+ catch (Exception ex)
+ {
+ if( trace )
+ log.trace("Failure during invoke", ex);
+ try
+ {
+ socketCloseCount --;
+ socket.socket.close();
+ }
+ catch (Exception ignored) {}
+ finally
+ {
+ socket.socket = null;
+ }
+ throw new java.rmi.ConnectException("Failure during invoke", ex);
+ }
+
+ // Put socket back in pool for reuse
+ if( returnConnection(socket) == false )
+ {
+ // Failed, close the socket
+ if( trace )
+ log.trace("Closing unpooled socket: "+socket);
+ try
+ {
+ socketCloseCount --;
+ socket.socket.close();
+ }
+ catch (Exception ignored) {}
+ finally
+ {
+ socket.socket = null;
+ }
+ }
+ }
+ finally
+ {
+ maxActiveConnections.release();
+ }
+
+ // Return response
+
+ try
+ {
+ if (response instanceof Exception)
+ {
+ throw ((Exception)response);
+ }
+ if (response instanceof MarshalledObject)
+ {
+ return ((MarshalledObject)response).get();
+ }
+ return response;
+ }
+ catch (ServerException ex)
+ {
+ // Suns RMI implementation wraps NoSuchObjectException in
+ // a ServerException. We cannot have that if we want
+ // to comply with the spec, so we unwrap here.
+ if (ex.detail instanceof NoSuchObjectException)
+ {
+ throw (NoSuchObjectException) ex.detail;
+ }
+ //likewise
+ if (ex.detail instanceof TransactionRolledbackException)
+ {
+ throw (TransactionRolledbackException) ex.detail;
+ }
+ throw ex;
+ }
+ }
+
+ /**
+ * Write out the serializable data
+ * @serialData address ServerAddress
+ * @serialData maxPoolSize int
+ * @serialData WIRE_VERSION int version
+ * @serialData retryCount int
+ * @param out
+ * @throws java.io.IOException
+ */
+ public void writeExternal(final ObjectOutput out)
+ throws IOException
+ {
+ // The legacy wire format is address, maxPoolSize
+ out.writeObject(address);
+ out.writeInt(maxPoolSize);
+ // Write out the current version format and its data
+ out.writeInt(WIRE_VERSION);
+ out.writeInt(retryCount);
+ }
+
+ public void readExternal(final ObjectInput in)
+ throws IOException, ClassNotFoundException
+ {
+ trace = log.isTraceEnabled();
+ address = (ServerAddress)in.readObject();
+ maxPoolSize = in.readInt();
+ int version = 0;
+ try
+ {
+ version = in.readInt();
+ }
+ catch(EOFException e)
+ {
+ // No version written and there is no more data
+ }
+ catch(OptionalDataException e)
+ {
+ // No version written and there is data from other objects
+ }
+
+ switch( version )
+ {
+ case 0:
+ // This has no retryCount, default it to the hard-coded value
+ retryCount = MAX_RETRIES;
+ break;
+ case 1:
+ readVersion1(in);
+ break;
+ default:
+ /* Assume a newer version that only adds defaultable values.
+ The alternative would be to thrown an exception
+ */
+ break;
+ }
+ initPool();
+ }
+
+ private void readVersion1(final ObjectInput in)
+ throws IOException
+ {
+ retryCount = in.readInt();
+ }
+}
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledInvocation.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,255 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.invocation.pooled.interfaces;
+
+import org.jboss.invocation.Invocation;
+import org.jboss.invocation.InvocationType;
+import org.jboss.invocation.MarshalledInvocation;
+import org.jboss.invocation.MarshalledValue;
+
+import javax.transaction.Transaction;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.Principal;
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * The MarshalledInvocation is an invocation that travels. As such it
+ * serializes its payload because of lack of ClassLoader visibility. As such it
+ * contains Marshalled data representing the byte[] of the Invocation object it
+ * extends Besides handling the specifics of "marshalling" the payload, which
+ * could be done at the Invocation level the Marshalled Invocation can hold
+ * optimization and needed code for distribution for example the
+ * TransactionPropagationContext which is a serialization of the TX for
+ * distribution purposes as well as the "hash" for the methods that we send, as
+ * opposed to sending Method objects. Serialization "optimizations" should be
+ * coded here in the externalization implementation of the class
+ *
+ * @author <a href="mailto:marc at jboss.org">Marc Fleury</a>
+ * @version $Revision$
+ */
+public class PooledMarshalledInvocation
+ extends MarshalledInvocation
+ implements java.io.Externalizable
+{
+ // Constants -----------------------------------------------------
+
+ /**
+ * Serial Version Identifier.
+ */
+ static final long serialVersionUID = -728630295444149842L;
+
+ private transient Transaction tx;
+ private transient Object credential;
+ private transient Principal principal;
+ private transient Object enterpriseContext;
+ private transient Object id;
+ private transient PooledMarshalledValue pooledMarshalledArgs;
+
+
+ // Constructors --------------------------------------------------
+ public PooledMarshalledInvocation()
+ {
+ // For externalization to work
+ }
+
+ public PooledMarshalledInvocation(Invocation invocation)
+ {
+ this.payload = invocation.payload;
+ this.as_is_payload = invocation.as_is_payload;
+ this.method = invocation.getMethod();
+ this.objectName = invocation.getObjectName();
+ this.args = invocation.getArguments();
+ this.invocationType = invocation.getType();
+ }
+
+
+ public PooledMarshalledInvocation(Object id,
+ Method m,
+ Object[] args,
+ Transaction tx,
+ Principal identity,
+ Object credential)
+ {
+ super(id, m, args, tx, identity, credential);
+ }
+ // Public --------------------------------------------------------
+
+
+
+ public Object getEnterpriseContext()
+ {
+ return enterpriseContext;
+ }
+
+ public void setEnterpriseContext(Object enterpriseContext)
+ {
+ this.enterpriseContext = enterpriseContext;
+ }
+
+ public Object getId()
+ {
+ if (id == null) id = super.getId();
+ return id;
+ }
+
+ public void setId(Object id)
+ {
+ super.setId(id);
+ this.id = id;
+ }
+
+ public void setTransaction(Transaction tx)
+ {
+ super.setTransaction(tx);
+ this.tx = tx;
+ }
+
+ public Transaction getTransaction()
+ {
+ if (tx == null) tx = super.getTransaction();
+ return this.tx;
+ }
+
+ public Object getCredential()
+ {
+ if (credential == null) credential = super.getCredential();
+ return credential;
+ }
+
+ public void setCredential(Object credential)
+ {
+ super.setCredential(credential);
+ this.credential = credential;
+ }
+
+ public Principal getPrincipal()
+ {
+ if (principal == null) principal = super.getPrincipal();
+ return principal;
+ }
+
+ public void setPrincipal(Principal principal)
+ {
+ super.setPrincipal(principal);
+ this.principal = principal;
+ }
+
+
+ public Object[] getArguments()
+ {
+ if (this.args == null)
+ {
+ try
+ {
+ this.args = (Object[]) pooledMarshalledArgs.get();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ return args;
+ }
+
+ // Externalizable implementation ---------------------------------
+ public void writeExternal(java.io.ObjectOutput out)
+ throws IOException
+ {
+ out.writeObject(invocationType);
+ // FIXME marcf: the "specific" treatment of Transactions should be abstracted.
+ // Write the TPC, not the local transaction
+ out.writeObject(tpc);
+
+ long methodHash = calculateHash(this.method);
+ out.writeLong(methodHash);
+
+ out.writeInt(((Integer) this.objectName).intValue());
+ out.writeObject(new PooledMarshalledValue(this.args));
+
+
+ // Write out payload hashmap
+ // Don't use hashmap serialization to avoid not-needed data being
+ // marshalled
+ // The map contains only serialized representations of every other object
+ // Everything else is possibly tied to classloaders that exist inside the
+ // server but not in the generic JMX land. they will travel in the payload
+ // as MarshalledValue objects, see the Invocation getter logic
+ //
+ if (payload == null)
+ out.writeInt(0);
+ else
+ {
+ out.writeInt(payload.size());
+ Iterator keys = payload.keySet().iterator();
+ while (keys.hasNext())
+ {
+ Object currentKey = keys.next();
+
+ // This code could be if (object.getClass().getName().startsWith("java")) then don't serialize.
+ // Bench the above for speed.
+
+ out.writeObject(currentKey);
+ out.writeObject(new MarshalledValue(payload.get(currentKey)));
+ }
+ }
+
+ // This map is "safe" as is
+ //out.writeObject(as_is_payload);
+ if (as_is_payload == null)
+ out.writeInt(0);
+ else
+ {
+ out.writeInt(as_is_payload.size());
+
+ Iterator keys = as_is_payload.keySet().iterator();
+ while (keys.hasNext())
+ {
+ Object currentKey = keys.next();
+ out.writeObject(currentKey);
+ out.writeObject(as_is_payload.get(currentKey));
+ }
+ }
+ }
+
+ public void readExternal(java.io.ObjectInput in)
+ throws IOException, ClassNotFoundException
+ {
+ invocationType = (InvocationType) in.readObject();
+ tpc = in.readObject();
+ this.methodHash = in.readLong();
+
+ this.objectName = new Integer(in.readInt());
+
+ pooledMarshalledArgs = (PooledMarshalledValue) in.readObject();
+
+ int payloadSize = in.readInt();
+ if (payloadSize > 0)
+ {
+ payload = new HashMap();
+ for (int i = 0; i < payloadSize; i++)
+ {
+ Object key = in.readObject();
+ Object value = in.readObject();
+ payload.put(key, value);
+ }
+ }
+
+ int as_is_payloadSize = in.readInt();
+ if (as_is_payloadSize > 0)
+ {
+ as_is_payload = new HashMap();
+ for (int i = 0; i < as_is_payloadSize; i++)
+ {
+ Object key = in.readObject();
+ Object value = in.readObject();
+ as_is_payload.put(key, value);
+ }
+ }
+ }
+}
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/PooledMarshalledValue.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,177 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.invocation.pooled.interfaces;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Arrays;
+
+/**
+ * A simple replacement for the RMI MarshalledObject that uses the thread
+ * context class loader for resolving classes and proxies. This currently does
+ * not support class annotations and dynamic class loading.
+ *
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public class PooledMarshalledValue
+ implements java.io.Externalizable
+{
+ /** Serial Version Identifier. */
+ private static final long serialVersionUID = -1265347772036749711L;
+
+ /**
+ * The serialized form of the value. If <code>serializedForm</code> is
+ * <code>null</code> then the object marshalled was a <code>null</code>
+ * reference.
+ */
+ private byte[] serializedForm;
+
+ /**
+ * The RMI MarshalledObject hash of the serializedForm array
+ */
+ private int hashCode;
+
+ /**
+ * Exposed for externalization.
+ */
+ public PooledMarshalledValue()
+ {
+ super();
+ }
+
+ public PooledMarshalledValue(Object obj) throws IOException
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream mvos = new OptimizedObjectOutputStream(baos);
+ mvos.writeObject(obj);
+ mvos.flush();
+ serializedForm = baos.toByteArray();
+ mvos.close();
+ // Use the java.rmi.MarshalledObject hash code calculation
+ int hash = 0;
+ for (int i = 0; i < serializedForm.length; i++)
+ {
+ hash = 31 * hash + serializedForm[i];
+ }
+
+ hashCode = hash;
+ }
+
+ public Object get() throws IOException, ClassNotFoundException
+ {
+ if (serializedForm == null)
+ return null;
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(serializedForm);
+ ObjectInputStream mvis = new OptimizedObjectInputStream(bais);
+ Object retValue = mvis.readObject();
+ mvis.close();
+ return retValue;
+ }
+
+ public byte[] toByteArray()
+ {
+ return serializedForm;
+ }
+
+ public int size()
+ {
+ int size = serializedForm != null ? serializedForm.length : 0;
+ return size;
+ }
+
+ /**
+ * Return a hash code for the serialized form of the value.
+ *
+ * @return the serialized form value hash.
+ */
+ public int hashCode()
+ {
+ return hashCode;
+ }
+
+ public boolean equals(Object obj)
+ {
+ if( this == obj )
+ return true;
+
+ boolean equals = false;
+ if( obj instanceof PooledMarshalledValue )
+ {
+ PooledMarshalledValue mv = (PooledMarshalledValue) obj;
+ if( serializedForm == mv.serializedForm )
+ {
+ equals = true;
+ }
+ else
+ {
+ equals = Arrays.equals(serializedForm, mv.serializedForm);
+ }
+ }
+ return equals;
+ }
+
+ /**
+ * The object implements the readExternal method to restore its
+ * contents by calling the methods of DataInput for primitive
+ * types and readObject for objects, strings and arrays. The
+ * readExternal method must read the values in the same sequence
+ * and with the same types as were written by writeExternal.
+ *
+ * @param in the stream to read data from in order to restore the object
+ *
+ * @throws IOException if I/O errors occur
+ * @throws ClassNotFoundException If the class for an object being
+ * restored cannot be found.
+ */
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
+ {
+ int length = in.readInt();
+ serializedForm = null;
+ if( length > 0 )
+ {
+ serializedForm = new byte[length];
+ in.readFully(serializedForm);
+ }
+ hashCode = in.readInt();
+ }
+
+ /**
+ * The object implements the writeExternal method to save its contents
+ * by calling the methods of DataOutput for its primitive values or
+ * calling the writeObject method of ObjectOutput for objects, strings,
+ * and arrays.
+ *
+ * @serialData Overriding methods should use this tag to describe
+ * the data layout of this Externalizable object.
+ * List the sequence of element types and, if possible,
+ * relate the element to a public/protected field and/or
+ * method of this Externalizable class.
+ *
+ * @param out the stream to write the object to
+ *
+ * @throws IOException Includes any I/O exceptions that may occur
+ */
+ public void writeExternal(ObjectOutput out) throws IOException
+ {
+ int length = serializedForm != null ? serializedForm.length : 0;
+ out.writeInt(length);
+ if( length > 0 )
+ {
+ out.write(serializedForm);
+ }
+ out.writeInt(hashCode);
+ }
+}
Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/interfaces/ServerAddress.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,14 +1,29 @@
-/***************************************
- * *
- * JBoss: The OpenSource J2EE WebOS *
- * *
- * Distributable under LGPL license. *
- * See terms of license at gnu.org. *
- * *
- ***************************************/
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
package org.jboss.invocation.pooled.interfaces;
import java.io.Serializable;
+import java.io.IOException;
+import javax.net.SocketFactory;
/**
* This class encapsulates all the required information for a client to
@@ -17,7 +32,8 @@
* It also attempts to provide a fast hash() function since this object
* is used as a key in a hashmap mainted by the ConnectionManager.
*
- * @author <a href="mailto:hiram.chirino at jboss.org">Hiram Chirino</a>
+ * @author Bill Burke
+ * @author Scott.Stark at jboss.org
* @version $Revision$
*/
public class ServerAddress implements Serializable
@@ -26,24 +42,32 @@
private static final long serialVersionUID = -7206359745950445445L;
/**
- * Address of host ot connect to
+ * Address of host to connect to
+ * @serial
*/
public String address;
/**
* Port the service is listening on
+ * @serial
*/
public int port;
/**
* If the TcpNoDelay option should be used on the socket.
+ * @serial
*/
public boolean enableTcpNoDelay = false;
/**
* Timeout of setSoTimeout
+ * @serial
*/
public int timeout = 60000;
+ /** An option socket factory for connecting to the server
+ * @serial
+ */
+ public SocketFactory clientSocketFactory;
/**
* This object is used as a key in a hashmap,
@@ -51,13 +75,26 @@
*/
private transient int hashCode;
- public ServerAddress(String address, int port, boolean enableTcpNoDelay, int timeout)
+ /**
+ * The server address/port representation.
+ *
+ * @param address - hostname/ip of the server
+ * @param port - the invoker port
+ * @param enableTcpNoDelay - the Socket.setTcpNoDelay flag
+ * @param timeout - the Socket.setSoTimeout value
+ * @param clientSocketFactory - optional SocketFactory
+ */
+ public ServerAddress(String address, int port, boolean enableTcpNoDelay,
+ int timeout, SocketFactory clientSocketFactory)
{
this.address = address;
this.port = port;
this.enableTcpNoDelay = enableTcpNoDelay;
this.hashCode = address.hashCode() + port;
+ if( enableTcpNoDelay )
+ this.hashCode ++;
this.timeout = timeout;
+ this.clientSocketFactory = clientSocketFactory;
}
public String toString()
@@ -69,15 +106,14 @@
{
try
{
+ // Compare this to obj
ServerAddress o = (ServerAddress) obj;
- if (o.hashCode != hashCode)
+ if (port != o.port)
return false;
- if (port != port)
+ if (address.equals(o.address) == false)
return false;
- if (!o.address.equals(address))
+ if (enableTcpNoDelay != o.enableTcpNoDelay)
return false;
- if (o.enableTcpNoDelay != enableTcpNoDelay)
- return false;
return true;
}
catch (Throwable e)
@@ -91,4 +127,20 @@
return hashCode;
}
+ /**
+ * Create the transient hashCode
+ * @param in
+ * @throws IOException
+ * @throws ClassNotFoundException
+ */
+ private void readObject(java.io.ObjectInputStream in)
+ throws IOException, ClassNotFoundException
+ {
+ // Trigger default serialization
+ in.defaultReadObject();
+ // Build the hashCode
+ this.hashCode = address.hashCode() + port;
+ if( enableTcpNoDelay )
+ this.hashCode ++;
+ }
}
Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/LRUPool.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,11 +1,9 @@
-/***************************************
- * *
- * JBoss: The OpenSource J2EE WebOS *
- * *
- * Distributable under LGPL license. *
- * See terms of license at gnu.org. *
- * *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
package org.jboss.invocation.pooled.server;
@@ -17,7 +15,7 @@
* it makes sure to call shutdown on the pooled ServerThread
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
- *
+ * @version $Revision$
*/
public class LRUPool extends LRUCachePolicy
{
Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvoker.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,38 +1,43 @@
-/***************************************
- * *
- * JBoss: The OpenSource J2EE WebOS *
- * *
- * Distributable under LGPL license. *
- * See terms of license at gnu.org. *
- * *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
package org.jboss.invocation.pooled.server;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.UnknownHostException;
import java.util.LinkedList;
import java.security.PrivilegedExceptionAction;
import java.security.AccessController;
import java.security.PrivilegedActionException;
+import java.lang.reflect.Method;
+import java.rmi.NoSuchObjectException;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.net.SocketFactory;
+import javax.net.ServerSocketFactory;
import org.jboss.invocation.Invocation;
-import org.jboss.invocation.MarshalledInvocation;
import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
import org.jboss.invocation.pooled.interfaces.ServerAddress;
-import org.jboss.invocation.jrmp.interfaces.JRMPInvokerProxy;
+import org.jboss.invocation.pooled.interfaces.PooledMarshalledInvocation;
import org.jboss.logging.Logger;
import org.jboss.proxy.TransactionInterceptor;
import org.jboss.system.Registry;
import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.system.server.ServerConfigUtil;
import org.jboss.tm.TransactionPropagationContextFactory;
import org.jboss.tm.TransactionPropagationContextImporter;
import org.jboss.tm.TransactionPropagationContextUtil;
+import org.jboss.security.SecurityDomain;
+import org.jboss.net.sockets.DefaultSocketFactory;
/**
* This invoker pools Threads and client connections to one server socket.
@@ -50,9 +55,10 @@
* on the server side. Pool, is an LRU pool, so resources should be cleaned up.
*
*
- * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @author Scott.Stark at jboss.org
+ * @version $Revision:$
*
- * @jmx:mbean extends="org.jboss.system.ServiceMBean"
*/
public class PooledInvoker extends ServiceMBeanSupport
implements PooledInvokerMBean, Runnable
@@ -87,11 +93,30 @@
* The port a client will use to connect to the sever.
*/
protected int clientConnectPort = 0;
+ /**
+ * The number of retry attempts on
+ */
+ protected int clientRetryCount = 1;
protected int backlog = 200;
+ /** The class name of the optional custom client socket factory */
+ protected String clientSocketFactoryName;
+
+ /** The class name of the optional custom server socket factory */
+ protected String serverSocketFactoryName;
+
+ /** An optional custom client socket factory */
+ protected SocketFactory clientSocketFactory;
+
+ /** An optional custom server socket factory */
+ protected ServerSocketFactory serverSocketFactory;
+ /** The server socket for */
protected ServerSocket serverSocket = null;
+ /** The name of the security domain to use with server sockets that support SSL */
+ protected String sslDomain;
+
protected int timeout = 60000; // 60 seconds.
protected int maxPoolSize = 300;
@@ -104,6 +129,8 @@
protected LRUPool clientpool;
protected LinkedList threadpool;
protected boolean running = true;
+ /** The logging trace level flag */
+ protected boolean trace = false;
/**
* ObjectName of the <code>transactionManagerService</code> we use.
* Probably should not be here -- used to set txInterceptor tx mananger.
@@ -122,16 +149,7 @@
// The following methods Override the ServiceMBeanSupport base class
//
////////////////////////////////////////////////////////////////////////
- /**
- * Gives this JMX service a name.
- * @return The Name value
- */
- public String getName()
- {
- return "Optimized-Invoker";
- }
-
protected void jmxBind()
{
Registry.bind(getServiceName(), optimizedInvokerProxy);
@@ -144,6 +162,7 @@
*/
public void startService() throws Exception
{
+ trace = log.isTraceEnabled();
///////////////////////////////////////////////////////////
// Setup the transaction stuff
@@ -172,16 +191,28 @@
(clientConnectAddress == null || clientConnectAddress.length() == 0)
? InetAddress.getLocalHost().getHostName()
: clientConnectAddress;
+ /* We need to check the address against "0.0.0.0" as this is not a valid
+ address although some jdks will default to the host, while others fail
+ with java.net.BindException: Cannot assign requested address: connect
+ */
+ clientConnectAddress = ServerConfigUtil.fixRemoteAddress(clientConnectAddress);
-
clientpool = new LRUPool(2, maxPoolSize);
clientpool.create();
threadpool = new LinkedList();
- serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
+ try
+ {
+ serverSocket = new ServerSocket(serverBindPort, backlog, bindAddress);
+ }catch( java.net.BindException be)
+ {
+ throw new Exception("Port "+serverBindPort+" is already in use",be);
+ }
+ serverBindPort = serverSocket.getLocalPort();
clientConnectPort = (clientConnectPort == 0) ? serverSocket.getLocalPort() : clientConnectPort;
- ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort, enableTcpNoDelay, timeout);
- optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize);
+ ServerAddress sa = new ServerAddress(clientConnectAddress, clientConnectPort,
+ enableTcpNoDelay, timeout, clientSocketFactory);
+ optimizedInvokerProxy = new PooledInvokerProxy(sa, clientMaxPoolSize, clientRetryCount);
///////////////////////////////////////////////////////////
// Register the service with the rest of the JBoss Kernel
@@ -194,20 +225,21 @@
acceptThreads = new Thread[numAcceptThreads];
for (int i = 0; i < numAcceptThreads; i++)
{
- acceptThreads[i] = new Thread(this);
+ String name = "PooledInvokerAcceptor#"+i+"-"+serverBindPort;
+ acceptThreads[i] = new Thread(this, name);
acceptThreads[i].start();
}
}
public void run()
{
-
while (running)
{
try
{
Socket socket = serverSocket.accept();
- //System.out.println("Thread accepted: " + Thread.currentThread());
+ if( trace )
+ log.trace("Accepted: "+socket);
ServerThread thread = null;
boolean newThread = false;
@@ -232,9 +264,11 @@
if (thread == null)
{
clientpool.evict();
- log.debug("**** WAITING *****");
+ if( trace )
+ log.trace("Waiting for a thread...");
clientpool.wait();
- log.debug("**** WOKE UP *****");
+ if( trace )
+ log.trace("Notified of available thread");
}
}
}
@@ -246,16 +280,18 @@
if (newThread)
{
- log.debug("**** ACQUIRED NEW *****");
+ if( trace )
+ log.trace("Created a new thread, t="+thread);
thread.start();
}
else
{
- log.debug("**** ACQUIRED OLD *****");
+ if( trace )
+ log.trace("Reusing thread t="+thread);
thread.wakeup(socket, timeout);
}
}
- catch (Exception ex)
+ catch (Throwable ex)
{
if (running)
log.error("Failed to accept socket connection", ex);
@@ -312,9 +348,14 @@
{
// Deserialize the transaction if it is there
- MarshalledInvocation mi = (MarshalledInvocation) invocation;
+ PooledMarshalledInvocation mi = (PooledMarshalledInvocation) invocation;
invocation.setTransaction(importTPC(mi.getTransactionPropagationContext()));
ObjectName mbean = (ObjectName) Registry.lookup(invocation.getObjectName());
+ if( mbean == null )
+ {
+ System.err.println("NoSuchObjectException: "+invocation.getObjectName());
+ throw new NoSuchObjectException("Failed to find target for objectName: "+invocation.getObjectName());
+ }
// The cl on the thread should be set in another interceptor
Object obj = serverAction.invoke(mbean, "invoke",
@@ -380,7 +421,7 @@
/**
* Setter for property maxPoolSize.
*
- * @param maxPoolSize New value of property serverBindPort.
+ * @param maxPoolSize New value of property maxPoolSize.
* @jmx:managed-attribute
*/
public void setMaxPoolSize(int maxPoolSize)
@@ -434,7 +475,7 @@
/**
*
- * @return Value of property serverBindPort.
+ * @return Value of property CurrentClientPoolSize.
* @jmx:managed-attribute
*/
public int getCurrentClientPoolSize()
@@ -444,7 +485,7 @@
/**
*
- * @return Value of property serverBindPort.
+ * @return Value of property CurrentThreadPoolSize.
* @jmx:managed-attribute
*/
public int getCurrentThreadPoolSize()
@@ -509,6 +550,22 @@
/**
* @jmx:managed-attribute
*/
+ public int getClientRetryCount()
+ {
+ return clientRetryCount;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setClientRetryCount(int clientRetryCount)
+ {
+ this.clientRetryCount = clientRetryCount;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
public int getBacklog()
{
return backlog;
@@ -554,8 +611,102 @@
this.serverBindAddress = serverBindAddress;
}
+ /**
+ * @jmx:managed-attribute
+ */
+ public String getClientSocketFactoryName()
+ {
+ return clientSocketFactoryName;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setClientSocketFactoryName(String clientSocketFactoryName)
+ {
+ this.clientSocketFactoryName = clientSocketFactoryName;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public String getServerSocketFactoryName()
+ {
+ return serverSocketFactoryName;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setServerSocketFactoryName(String serverSocketFactoryName)
+ {
+ this.serverSocketFactoryName = serverSocketFactoryName;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public SocketFactory getClientSocketFactory()
+ {
+ return clientSocketFactory;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setClientSocketFactory(SocketFactory clientSocketFactory)
+ {
+ this.clientSocketFactory = clientSocketFactory;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public ServerSocket getServerSocket()
+ {
+ return serverSocket;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setServerSocket(ServerSocket serverSocket)
+ {
+ this.serverSocket = serverSocket;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public String getSslDomain()
+ {
+ return sslDomain;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setSslDomain(String sslDomain)
+ {
+ this.sslDomain = sslDomain;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public ServerSocketFactory getServerSocketFactory()
+ {
+ return serverSocketFactory;
+ }
+
+ /**
+ * @jmx:managed-attribute
+ */
+ public void setServerSocketFactory(ServerSocketFactory serverSocketFactory)
+ {
+ this.serverSocketFactory = serverSocketFactory;
+ }
-
/**
* mbean get-set pair for field transactionManagerService
* Get the value of transactionManagerService
@@ -588,6 +739,101 @@
return optimizedInvokerProxy;
}
+ /** Load and instantiate the clientSocketFactory, serverSocketFactory using
+ the TCL and set the bind address and SSL domain if the serverSocketFactory
+ supports it.
+ */
+ protected void loadCustomSocketFactories()
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+ try
+ {
+ if( clientSocketFactoryName != null )
+ {
+ Class csfClass = loader.loadClass(clientSocketFactoryName);
+ clientSocketFactory = (SocketFactory) csfClass.newInstance();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to load client socket factory", e);
+ clientSocketFactory = null;
+ }
+
+ try
+ {
+ if( serverSocketFactoryName != null )
+ {
+ Class ssfClass = loader.loadClass(serverSocketFactoryName);
+ serverSocketFactory = (ServerSocketFactory) ssfClass.newInstance();
+ if( serverBindAddress != null )
+ {
+ // See if the server socket supports setBindAddress(String)
+ try
+ {
+ Class[] parameterTypes = {String.class};
+ Method m = ssfClass.getMethod("setBindAddress", parameterTypes);
+ Object[] args = {serverBindAddress};
+ m.invoke(serverSocketFactory, args);
+ }
+ catch (NoSuchMethodException e)
+ {
+ log.warn("Socket factory does not support setBindAddress(String)");
+ // Go with default address
+ }
+ catch (Exception e)
+ {
+ log.warn("Failed to setBindAddress="+serverBindAddress+" on socket factory", e);
+ // Go with default address
+ }
+ }
+ /* See if the server socket supports setSecurityDomain(SecurityDomain)
+ if an sslDomain was specified
+ */
+ if( sslDomain != null )
+ {
+ try
+ {
+ InitialContext ctx = new InitialContext();
+ SecurityDomain domain = (SecurityDomain) ctx.lookup(sslDomain);
+ Class[] parameterTypes = {SecurityDomain.class};
+ Method m = ssfClass.getMethod("setSecurityDomain", parameterTypes);
+ Object[] args = {domain};
+ m.invoke(serverSocketFactory, args);
+ }
+ catch(NoSuchMethodException e)
+ {
+ log.error("Socket factory does not support setSecurityDomain(SecurityDomain)");
+ }
+ catch(Exception e)
+ {
+ log.error("Failed to setSecurityDomain="+sslDomain+" on socket factory", e);
+ }
+ }
+ }
+ // If a bind address was specified create a DefaultSocketFactory
+ else if( serverBindAddress != null )
+ {
+ DefaultSocketFactory defaultFactory = new DefaultSocketFactory(backlog);
+ serverSocketFactory = defaultFactory;
+ try
+ {
+ defaultFactory.setBindAddress(serverBindAddress);
+ }
+ catch (UnknownHostException e)
+ {
+ log.error("Failed to setBindAddress="+serverBindAddress+" on socket factory", e);
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("operation failed", e);
+ serverSocketFactory = null;
+ }
+ }
+
/** Perform the MBeanServer.invoke op in a PrivilegedExceptionAction if
* running with a security manager.
*/
Added: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/PooledInvokerMBean.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -0,0 +1,183 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.invocation.pooled.server;
+
+import javax.management.ObjectName;
+
+import org.jboss.invocation.pooled.interfaces.PooledInvokerProxy;
+
+/**
+ * The PooledInvoker standard MBean interface.
+ * @author Bill Burke
+ * @author Scott.Stark at jboss.org
+ * @version $Revision$
+ */
+public interface PooledInvokerMBean extends org.jboss.system.ServiceMBean
+{
+
+ /**
+ * Getter for property numAcceptThreads
+ * @return Value of property numAcceptThreads
+ */
+ int getNumAcceptThreads();
+
+ /**
+ * Setter for property numAcceptThreads
+ * @param size New value of property numAcceptThreads.
+ */
+ void setNumAcceptThreads(int size);
+
+ /**
+ * Getter for property maxPoolSize;
+ * @return Value of property maxPoolSize.
+ */
+ int getMaxPoolSize();
+
+ /**
+ * Setter for property maxPoolSize.
+ * @param maxPoolSize New value of property serverBindPort.
+ */
+ void setMaxPoolSize(int maxPoolSize);
+
+ /**
+ * Getter for property maxPoolSize;
+ * @return Value of property maxPoolSize.
+ */
+ int getClientMaxPoolSize();
+
+ /**
+ * Setter for property maxPoolSize.
+ * @param clientMaxPoolSize New value of property serverBindPort.
+ */
+ void setClientMaxPoolSize(int clientMaxPoolSize);
+
+ /**
+ * Getter for property timeout
+ * @return Value of property timeout
+ */
+ int getSocketTimeout();
+
+ /**
+ * Setter for property timeout
+ * @param time New value of property timeout
+ */
+ void setSocketTimeout(int time);
+
+ /**
+ * @return Current client connection pool size
+ */
+ int getCurrentClientPoolSize();
+
+ /**
+ * @return current connection thread pool size
+ */
+ int getCurrentThreadPoolSize();
+
+ /**
+ * Getter for property serverBindPort.
+ * @return Value of property serverBindPort.
+ */
+ int getServerBindPort();
+
+ /**
+ * Setter for property serverBindPort.
+ * @param serverBindPort New value of property serverBindPort.
+ */
+ void setServerBindPort(int serverBindPort);
+
+ /**
+ * @return the address the client proxy connects to
+ */
+ String getClientConnectAddress();
+ /**
+ * Set the clientConnectAddress
+ * @param clientConnectAddress - address the client proxy connects to
+ */
+ void setClientConnectAddress(java.lang.String clientConnectAddress);
+
+ /**
+ * @return the client local bind port
+ */
+ int getClientConnectPort();
+ /**
+ * @param clientConnectPort - the client local bind port
+ */
+ void setClientConnectPort(int clientConnectPort);
+
+ /**
+ * @return the number of connect retries
+ */
+ public int getClientRetryCount();
+ /**
+ * @param clientRetryCount - the number of connect retries
+ */
+ public void setClientRetryCount(int clientRetryCount);
+
+ /**
+ * @return the server accept backlog
+ */
+ int getBacklog();
+ /**
+ * @param backlog - the server accept backlog
+ */
+ void setBacklog(int backlog);
+
+ /**
+ * @return Socket.setTcpNoDelay flag
+ */
+ boolean isEnableTcpNoDelay();
+ /**
+ * Socket.setTcpNoDelay flag
+ * @param enableTcpNoDelay
+ */
+ void setEnableTcpNoDelay(boolean enableTcpNoDelay);
+
+ String getServerBindAddress();
+
+ void setServerBindAddress(String serverBindAddress);
+
+ /**
+ * mbean get-set pair for field transactionManagerService Get the value of
+ * transactionManagerService
+ * @return value of transactionManagerService
+ */
+ ObjectName getTransactionManagerService();
+
+ /**
+ * Set the value of transactionManagerService
+ * @param transactionManagerService Value to assign to transactionManagerService
+ */
+ void setTransactionManagerService(ObjectName transactionManagerService);
+
+ PooledInvokerProxy getOptimizedInvokerProxy();
+
+ /**
+ * Set the client socket factory implementation
+ * @return the javax.net.SocketFactory implementation class name
+ */
+ public String getClientSocketFactoryName();
+ /**
+ * Set the client factory implementation
+ * @param factoryName - the javax.net.SocketFactory implementation class name
+ */
+ public void setClientSocketFactoryName(String factoryName);
+
+ /**
+ * Set the server socket factory implementation
+ * @return the javax.net.ServerSocketFactory implementation class name
+ */
+ public String getServerSocketFactoryName();
+ /**
+ * Set the server factory implementation
+ * @param factoryName - the javax.net.ServerSocketFactory implementation class name
+ */
+ public void setServerSocketFactoryName(String factoryName);
+
+ public String getSslDomain();
+ public void setSslDomain(String sslDomain);
+
+}
Modified: branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java
===================================================================
--- branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java 2006-09-01 06:57:06 UTC (rev 56497)
+++ branches/JBoss_4_0_0_Case10829/server/src/main/org/jboss/invocation/pooled/server/ServerThread.java 2006-09-01 06:58:56 UTC (rev 56498)
@@ -1,27 +1,24 @@
-/***************************************
- * *
- * JBoss: The OpenSource J2EE WebOS *
- * *
- * Distributable under LGPL license. *
- * See terms of license at gnu.org. *
- * *
- ***************************************/
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
package org.jboss.invocation.pooled.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
+import java.io.InterruptedIOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
import java.net.Socket;
-import java.rmi.MarshalledObject;
+import java.util.LinkedList;
+
import org.jboss.invocation.Invocation;
+import org.jboss.invocation.pooled.interfaces.OptimizedObjectInputStream;
+import org.jboss.invocation.pooled.interfaces.OptimizedObjectOutputStream;
import org.jboss.logging.Logger;
-import java.util.LinkedList;
/**
* This Thread object hold a single Socket connection to a client
@@ -38,6 +35,7 @@
* done by the TrunkInvoker.
*
* @author <a href="mailto:bill at jboss.org">Bill Burke</a>
+ * @version $Revision$
*/
public class ServerThread extends Thread
{
@@ -54,9 +52,16 @@
protected volatile boolean shutdown = false;
protected static int id = 0;
- public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool, LinkedList threadpool, int timeout) throws Exception
+ public static synchronized int nextID()
{
- super("PooledInvokerThread-" + id++);
+ int nextID = id ++;
+ return nextID;
+ }
+
+ public ServerThread(Socket socket, PooledInvoker invoker, LRUPool clientpool,
+ LinkedList threadpool, int timeout) throws Exception
+ {
+ super("PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID());
this.socket = socket;
this.invoker = invoker;
this.clientpool = clientpool;
@@ -81,7 +86,7 @@
try
{
this.interrupt();
- this.interrupted(); // clear
+ Thread.interrupted(); // clear
}
catch (Exception ignored) {}
}
@@ -107,7 +112,7 @@
try
{
this.interrupt();
- this.interrupted(); // clear
+ Thread.interrupted(); // clear
}
catch (Exception ignored) {}
}
@@ -117,6 +122,8 @@
public synchronized void wakeup(Socket socket, int timeout) throws Exception
{
this.socket = socket;
+ String name = "PooledInvokerThread-" + socket.getInetAddress().getHostAddress()+"-"+nextID();
+ super.setName(name);
socket.setSoTimeout(timeout);
running = true;
handlingResponse = true;
@@ -155,6 +162,7 @@
clientpool.remove(this);
//System.out.println("adding myself to threadpool");
threadpool.add(this);
+ Thread.interrupted(); // clear any interruption so that we can be pooled.
clientpool.notify();
}
}
@@ -200,12 +208,15 @@
Object response = null;
try
{
+ // Make absolutely sure thread interrupted is cleared.
+ boolean interrupted = Thread.interrupted();
response = invoker.invoke(invocation);
}
catch (Exception ex)
{
response = ex;
}
+ Thread.interrupted(); // clear interrupted state so we don't fail on socket writes
out.writeObject(response);
out.reset();
// to make sure stream gets reset
@@ -227,9 +238,11 @@
handlingResponse = true;
try
{
- out = new ObjectOutputStream(new BufferedOutputStream(socket.getOutputStream()));
+ BufferedOutputStream bos = new BufferedOutputStream(socket.getOutputStream());
+ out = new OptimizedObjectOutputStream(bos);
out.flush();
- in = new ObjectInputStream(new BufferedInputStream(socket.getInputStream()));
+ BufferedInputStream bis = new BufferedInputStream(socket.getInputStream());
+ in = new OptimizedObjectInputStream(bis);
}
catch (Exception e)
{
@@ -246,7 +259,6 @@
running = false;
}
- //System.out.println("****entering re-use loop");
// Re-use loop
while (running)
{
@@ -255,22 +267,22 @@
acknowledge();
processInvocation();
}
- catch (InterruptedIOException iex)
+ catch (InterruptedIOException e)
{
- //System.out.println("exception found!");
- log.debug("socket timed out");
+ log.debug("socket timed out", e);
running = false;
}
- catch (InterruptedException intr)
+ catch (InterruptedException e)
{
- log.debug("interrupted");
+ log.debug("interrupted", e);
}
catch (Exception ex)
{
- //System.out.println("exception found!");
- //log.error("failed", ex);
+ log.debug("failed", ex);
running = false;
}
+ // clear any interruption so that thread can be pooled.
+ Thread.interrupted();
}
//System.out.println("finished loop:" + Thread.currentThread());
// Ok, we've been shutdown. Do appropriate cleanups.
More information about the jboss-cvs-commits
mailing list