[jboss-cvs] JBossAS SVN: r58749 - branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 29 13:35:32 EST 2006
Author: tom.elrod at jboss.com
Date: 2006-11-29 13:35:31 -0500 (Wed, 29 Nov 2006)
New Revision: 58749
Modified:
branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
Log:
ASPATCH-121 - apply the patch for JBAS-3216 to the JBoss_4_0_4_GA_CP branch.
Modified: branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java
===================================================================
--- branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java 2006-11-29 18:22:27 UTC (rev 58748)
+++ branches/JBoss_4_0_4_GA_CP/server/src/main/org/jboss/invocation/pooled/interfaces/PooledInvokerProxy.java 2006-11-29 18:35:31 UTC (rev 58749)
@@ -32,7 +32,9 @@
import java.io.EOFException;
import java.io.OptionalDataException;
import java.io.UnsupportedEncodingException;
+import java.io.InterruptedIOException;
import java.net.Socket;
+import java.net.SocketException;
import java.rmi.MarshalledObject;
import java.rmi.NoSuchObjectException;
import java.rmi.ServerException;
@@ -79,7 +81,7 @@
* Factory for transaction propagation contexts.
*
* @todo marcf remove all transaction spill from here
- *
+ *
* When set to a non-null value, it is used to get transaction
* propagation contexts for remote method invocations.
* If <code>null</code>, transactions are not propagated on
@@ -88,7 +90,7 @@
protected static TransactionPropagationContextFactory tpcFactory = null;
// @todo: MOVE TO TRANSACTION
- //
+ //
// TPC factory
public static void setTPCFactory(TransactionPropagationContextFactory tpcf) {
tpcFactory = tpcf;
@@ -104,6 +106,10 @@
public static long usedPooled = 0;
/** The number of connections in use */
private static int inUseCount = 0;
+ /** The number of socket connections made */
+ private static long socketConnectCount = 0;
+ /** The number of socket close calls made */
+ private static long socketCloseCount = 0;
/**
* Set number of retries in getSocket method
@@ -215,6 +221,17 @@
}
}
+ public String toString()
+ {
+ StringBuffer tmp = new StringBuffer("ClientSocket@");
+ tmp.append(System.identityHashCode(this));
+ tmp.append('[');
+ tmp.append("socket=");
+ tmp.append(socket.toString());
+ tmp.append(']');
+ return tmp.toString();
+ }
+
/**
* @todo should this be handled with weak references as this should
* work better with gc
@@ -225,10 +242,19 @@
{
if( trace )
log.trace("Closing socket in finalize: "+socket);
- try { socket.close(); } catch (Exception ignored) {}
+ try
+ {
+ socketCloseCount --;
+ socket.close();
}
+ catch (Exception ignored) {}
+ finally
+ {
+ socket = null;
}
}
+ }
+ }
/**
* Clear all class level stats
@@ -240,7 +266,7 @@
writeTime = 0;
serializeTime = 0;
deserializeTime = 0;
- usedPooled = 0;
+ usedPooled = 0;
}
/**
@@ -258,6 +284,14 @@
{
return usedPooled;
}
+ public static long getSocketConnectCount()
+ {
+ return socketConnectCount;
+ }
+ public static long getSocketCloseCount()
+ {
+ return socketCloseCount;
+ }
/**
* @return the total number of pooled connections across all ServerAddresses
@@ -324,20 +358,27 @@
int size = thepool.size();
for (int i = 0; i < size; i++)
{
- ClientSocket socket = (ClientSocket)thepool.removeFirst();
+ ClientSocket cs = null;
try
{
+ ClientSocket socket = (ClientSocket)thepool.removeFirst();
+ cs = socket;
if( trace )
- log.trace("Closing, ClientSocket: "+socket.socket);
+ log.trace("Closing, ClientSocket: "+socket);
+ socketCloseCount --;
socket.socket.close();
- socket.socket = null;
}
catch (Exception ignored)
{
}
+ finally
+ {
+ if( cs != null )
+ cs.socket = null;
}
}
}
+ }
catch (Exception ex)
{
// ignored
@@ -375,6 +416,7 @@
protected ClientSocket getConnection() throws Exception
{
Socket socket = null;
+ ClientSocket cs = null;
//
// Need to retry a few times
@@ -387,7 +429,7 @@
// This problem always happens with RMI and seems to
// have nothing to do with backlog or number of threads
// waiting in accept() on the server.
- //
+ //
for (int i = 0; i < retryCount; i++)
{
ClientSocket pooled = getPooledConnection();
@@ -404,18 +446,36 @@
{
log.trace("Connecting to addr: "+address.address
+", port: "+address.port
- +",clientSocketFactory: "+address.clientSocketFactory);
+ +",clientSocketFactory: "+address.clientSocketFactory
+ +",enableTcpNoDelay: "+address.enableTcpNoDelay
+ +",timeout: "+address.timeout);
}
if( address.clientSocketFactory != null )
socket = address.clientSocketFactory.createSocket(address.address, address.port);
else
socket = new Socket(address.address, address.port);
+ socketConnectCount ++;
if( trace )
- log.trace("Connected: "+socket);
+ log.trace("Connected, socket="+socket);
+
+ socket.setTcpNoDelay(address.enableTcpNoDelay);
+ cs = new ClientSocket(socket, address.timeout);
+ inUseCount ++;
+ if( trace )
+ {
+ log.trace("New ClientSocket: "+cs
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
break;
}
- catch (ConnectException ex)
+ catch (Exception ex)
{
+ if( ex instanceof InterruptedIOException || ex instanceof SocketException )
+ {
if( trace )
log.trace("Connect failed", ex);
if (i + 1 < retryCount)
@@ -423,16 +483,14 @@
Thread.sleep(1);
continue;
}
+ }
throw ex;
}
}
// Should not happen
- if( socket == null )
+ if( cs == null )
throw new ConnectException("Failed to obtain a socket, tries="+retryCount);
-
- inUseCount ++;
- socket.setTcpNoDelay(address.enableTcpNoDelay);
- return new ClientSocket(socket, address.timeout);
+ return cs;
}
protected synchronized ClientSocket getPooledConnection()
@@ -440,31 +498,52 @@
ClientSocket socket = null;
while (pool.size() > 0)
{
- socket = (ClientSocket)pool.removeFirst();
try
{
+ synchronized( pool )
+ {
+ socket = (ClientSocket)pool.removeFirst();
+ }
// Test to see if socket is alive by send ACK message
+ if( trace )
+ log.trace("Checking pooled socket: "+socket+", address: "+socket.socket.getLocalSocketAddress());
final byte ACK = 1;
socket.out.writeByte(ACK);
socket.out.flush();
socket.in.readByte();
if( trace )
- log.trace("Using pooled socket: "+socket);
+ {
+ log.trace("Using pooled ClientSocket: "+socket
+ +", usedPooled="+ usedPooled
+ +", inUseCount="+ inUseCount
+ +", socketConnectCount="+ socketConnectCount
+ +", socketCloseCount="+ socketCloseCount
+ );
+ }
return socket;
}
catch (Exception ex)
{
if( trace )
- log.trace("Failed to validate pooled socket: "+socket);
+ log.trace("Failed to validate pooled socket: "+socket, ex);
try
{
+ if( socket != null )
+ {
+ socketCloseCount --;
socket.socket.close();
}
+ }
catch (Exception ignored)
{
}
+ finally
+ {
+ if( socket != null )
+ socket.socket = null;
}
}
+ }
return null;
}
@@ -477,12 +556,15 @@
protected synchronized boolean returnConnection(ClientSocket socket)
{
boolean pooled = false;
+ synchronized( pool )
+ {
if (pool.size() < maxPoolSize)
{
pool.add(socket);
inUseCount --;
pooled = true;
}
+ }
return pooled;
}
@@ -498,7 +580,7 @@
* ???
*
* @todo MOVE TO TRANSACTION
- *
+ *
* @return the transaction propagation context of the transaction
* associated with the current thread.
* Returns <code>null</code> if the transaction manager was never
@@ -512,8 +594,8 @@
/**
- * The invocation on the delegate, calls the right invoker. Remote if we are remote,
- * local if we are local.
+ * The invocation on the delegate, calls the right invoker. Remote if we are remote,
+ * local if we are local.
*/
public Object invoke(Invocation invocation)
throws Exception
@@ -561,14 +643,20 @@
}
catch (Exception ex)
{
+ if( trace )
+ log.trace("Failure during invoke", ex);
try
{
+ socketCloseCount --;
socket.socket.close();
}
catch (Exception ignored) {}
- //System.out.println("got read exception, exiting");
- throw new ConnectException("Failed to communicate", ex);
+ finally
+ {
+ socket.socket = null;
}
+ throw new java.rmi.ConnectException("Failure during invoke", ex);
+ }
// Put socket back in pool for reuse
if( returnConnection(socket) == false )
@@ -578,10 +666,15 @@
log.trace("Closing unpooled socket: "+socket);
try
{
+ socketCloseCount --;
socket.socket.close();
}
catch (Exception ignored) {}
+ finally
+ {
+ socket.socket = null;
}
+ }
// Return response
@@ -618,7 +711,7 @@
/**
* Write out the serializable data
* @serialData address ServerAddress
- * @serialData maxPoolSize int
+ * @serialData maxPoolSize int
* @serialData WIRE_VERSION int version
* @serialData retryCount int
* @param out
More information about the jboss-cvs-commits mailing list