[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Sat Feb 3 00:12:21 EST 2007
User: rsigal
Date: 07/02/03 00:12:21
Modified: src/main/org/jboss/remoting/transport/socket
MicroSocketClientInvoker.java
Log:
JBREM-598, JBREM-650, JBREM-662, JBREM-690, JBREM-692, JBREM-694: sync with remoting_2_x.
Revision Changes Path
1.21 +478 -400 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -b -r1.20 -r1.21
--- MicroSocketClientInvoker.java 23 Jan 2007 01:02:29 -0000 1.20
+++ MicroSocketClientInvoker.java 3 Feb 2007 05:12:21 -0000 1.21
@@ -5,6 +5,7 @@
import org.jboss.remoting.ConnectionFailedException;
import org.jboss.remoting.InvokerLocator;
import org.jboss.remoting.RemoteClientInvoker;
+import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.Version;
import org.jboss.remoting.serialization.ClassLoaderUtility;
import org.jboss.remoting.marshal.Marshaller;
@@ -27,40 +28,40 @@
import java.util.Properties;
/**
- * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which
- * must be a SocketServerInvoker.
+ * SocketClientInvoker uses Sockets to remotely connect to a remote ServerInvoker, which must be
+ * a SocketServerInvoker.
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.20 $
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
+ * @version $Revision: 1.21 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
- private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
- private static final boolean trace = log.isTraceEnabled();
+ // Constants ------------------------------------------------------------------------------------
- protected InetAddress addr;
- protected int port;
+ private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
/**
- * can be either true or false and will indicate if client socket should have TCP_NODELAY turned on or off.
- * TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
+ * Can be either true or false and will indicate if client socket should have TCP_NODELAY turned
+ * on or off. TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
* It should only be set for applications that send frequent small bursts of information without
- * getting an immediate response; where timely delivery of data is required
- * (the canonical example is mouse movements). The default is false.
+ * getting an immediate response; where timely delivery of data is required (the canonical
+ * example is mouse movements). The default is false.
*/
public static final String TCP_NODELAY_FLAG = "enableTcpNoDelay";
/**
- * the client side maximum number of threads. The default is MAX_POOL_SIZE.
+ * The client side maximum number of threads. The default is MAX_POOL_SIZE.
*/
public static final String MAX_POOL_SIZE_FLAG = "clientMaxPoolSize";
/**
- * Specifies the fully qualified class name for the custom SocketWrapper implementation
- * to use on the client. Note, will need to make sure this is marked as a client
- * parameter (using the 'isParam' attribute). Making this change will not
- * affect the marshaller/unmarshaller that is used, which may also be a requirement.
+ * Specifies the fully qualified class name for the custom SocketWrapper implementation to use on
+ * the client. Note, will need to make sure this is marked as a client parameter (using the
+ * 'isParam' attribute). Making this change will not affect the marshaller/unmarshaller that is
+ * used, which may also be a requirement.
*/
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
@@ -70,11 +71,34 @@
public static final boolean TCP_NODELAY_DEFAULT = false;
/**
- * Indicates if will check the socket connection when
- * getting from pool by sending byte over the connection
- * to validate is still good.
+ * Default maximum number of retries to get a valid socket from the socket pool. This also
+ * translates to the number of seconds to wait for a connection to be returned to the connection
+ * pool before erroring. Default is 30.
+ */
+ public static final int MAX_RETRIES = 30;
+
+ /**
+ * Default maximum number of times an invocation will be made when it gets a SocketException.
+ * Default is 3.
+ */
+ public static final int MAX_CALL_RETRIES = 3;
+
+ /**
+ * Default maximum number of socket connections allowed at any point in time. Default is 50.
+ */
+ public static final int MAX_POOL_SIZE = 50;
+
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ /**
+ * Used for debugging (tracing) connection leaks
*/
- protected boolean shouldCheckConnection = false;
+ static int counter = 0;
+
+ protected static final Map connectionPools = new HashMap();
// Performance measurements
public static long getSocketTime = 0;
@@ -84,60 +108,102 @@
public static long deserializeTime = 0;
/**
- * If the TcpNoDelay option should be used on the socket.
+ * Close all sockets in a specific pool.
*/
- protected boolean enableTcpNoDelay = TCP_NODELAY_DEFAULT;
-
- protected String clientSocketClassName = ClientSocketWrapper.class.getName();
- private Constructor clientSocketConstructor = null;
- protected Class clientSocketClass = null;
+ public static void clearPool(ServerAddress sa)
+ {
+ try
+ {
+ LinkedList thepool = (LinkedList)connectionPools.get(sa);
+ if (thepool == null)
+ {
+ return;
+ }
+ synchronized (thepool)
+ {
+ int size = thepool.size();
+ for (int i = 0; i < size; i++)
+ {
+ SocketWrapper socketWrapper = (SocketWrapper)thepool.removeFirst();
+ try
+ {
+ socketWrapper.close();
+ socketWrapper = null;
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ log.debug("Failure", ex);
+ }
+ }
/**
- * Default max number of retries to get a valid socket from the
- * socket pool. This also translates to number of seconds will wait
- * for connection to be returned to connection pool before erroring. Default is 30.
+ * Close all sockets in all pools.
*/
- public static final int MAX_RETRIES = 30;
- public long usedPooled = 0;
- public Object usedPoolLock = new Object();
+ public static void clearPools()
+ {
+ synchronized (connectionPools)
+ {
+ for(Iterator i = connectionPools.keySet().iterator(); i.hasNext();)
+ {
+ ServerAddress sa = (ServerAddress) i.next();
+
+ if (trace) { log.trace("clearing pool for " + sa); }
+ clearPool(sa);
+ }
+ }
+ }
- protected int numberOfRetries = MAX_RETRIES;
+ // Attributes -----------------------------------------------------------------------------------
- /**
- * Default max number of times a invocation will be made
- * when it gets a SocketException. Default is 3.
- */
- public static final int MAX_CALL_RETRIES = 3;
+ private Constructor clientSocketConstructor;
+ private boolean reuseAddress;
+
+ protected InetAddress addr;
+ protected int port;
- protected int numberOfCallRetries = MAX_CALL_RETRIES;
+ // Flag set to true by a disconnect request. If an attempt to create a connection is going on
+ // in a loop and a disconnect request arrives, this flag is used to pass that information into
+ // the connect loop.
+ private volatile boolean bailOut;
/**
- * Pool for this invoker. This is shared between all
- * instances of proxies attached to a specific invoker
+ * Indicates whether the socket connection will be checked when it is taken from the pool, by
+ * sending a byte over the connection to validate that it is still good.
*/
- protected LinkedList pool = null;
+ protected boolean shouldCheckConnection;
/**
- * connection information
+ * If the TcpNoDelay option should be used on the socket.
*/
- protected ServerAddress address;
+ protected boolean enableTcpNoDelay;
- protected static final Map connectionPools = new HashMap();
+ protected String clientSocketClassName;
+ protected Class clientSocketClass;
+ protected int numberOfRetries;
+ protected int numberOfCallRetries;
+ protected int maxPoolSize;
/**
- * Default max number of socket connections allowed at any
- * point in time. Default is 50.
+ * Pool for this invoker. This is shared between all instances of proxies attached to a specific
+ * invoker.
*/
- public static final int MAX_POOL_SIZE = 50;
+ protected LinkedList pool;
- protected int maxPoolSize = MAX_POOL_SIZE;
+ /**
+ * connection information
+ */
+ protected ServerAddress address;
- private boolean reuseAddress = true;
+ public long usedPooled;
+ public Object usedPoolLock;
- // flag being set on true by a disconnect request. If trying to create a connection goes on in a
- // loop and a disconnect request arrives, this flag will be used to sent this information into
- // the connect loop
- private volatile boolean bailOut;
+ // Constructors ---------------------------------------------------------------------------------
public MicroSocketClientInvoker(InvokerLocator locator)
{
@@ -147,22 +213,145 @@
public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
{
super(locator, configuration);
+
+ clientSocketConstructor = null;
+ reuseAddress = true;
+ shouldCheckConnection = false;
+ enableTcpNoDelay = TCP_NODELAY_DEFAULT;
+ clientSocketClassName = ClientSocketWrapper.class.getName();
+ clientSocketClass = null;
+ numberOfRetries = MAX_RETRIES;
+ numberOfCallRetries = MAX_CALL_RETRIES;
+ pool = null;
+ maxPoolSize = MAX_POOL_SIZE;
+ usedPooled = 0;
+ usedPoolLock = new Object();
+
try
{
setup();
}
catch (Exception ex)
{
- log.error("Error setting up socket client invoker.", ex);
+ log.error("Error setting up " + this, ex);
throw new RuntimeException(ex.getMessage());
}
+
+ log.debug(this + " constructed");
}
- protected void setup()
- throws Exception
+ // Public ---------------------------------------------------------------------------------------
+
+ /**
+ * Indicates whether a socket connection will be checked when it is taken from the pool, by
+ * sending a byte to the server. Default value is false.
+ */
+ public boolean checkingConnection()
+ {
+ return shouldCheckConnection;
+ }
+
+ /**
+ * Returns if newly created sockets will have SO_REUSEADDR enabled. Default is for this to be
+ * true.
+ */
+ public boolean getReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ /**
+ * Sets whether newly created sockets should have SO_REUSEADDR enabled. Default is true.
+ */
+ public void setReuseAddress(boolean reuse)
+ {
+ reuseAddress = reuse;
+ }
+
+ public synchronized void disconnect()
+ {
+ log.debug(this + " disconnecting ...");
+ bailOut = true;
+ super.disconnect();
+ }
+
+ public void flushConnectionPool()
+ {
+ synchronized (pool)
+ {
+ while (pool != null && pool.size() > 0)
{
- this.addr = InetAddress.getByName(locator.getHost());
- this.port = locator.getPort();
+ SocketWrapper socketWrapper = (SocketWrapper)pool.removeFirst();
+ try
+ {
+ socketWrapper.close();
+ }
+ catch (IOException e)
+ {
+ log.debug("Failed to close socket wrapper", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets the number of times an invocation will retry based on getting SocketException.
+ */
+ public void setNumberOfCallRetries(int numberOfCallRetries)
+ {
+ if (numberOfCallRetries < 1)
+ {
+ this.numberOfCallRetries = MAX_CALL_RETRIES;
+ }
+ else
+ {
+ this.numberOfCallRetries = numberOfCallRetries;
+ }
+ }
+
+ public int getNumberOfCallRetries()
+ {
+ return numberOfCallRetries;
+ }
+
+ /**
+ * Sets the number of retries to get a socket connection.
+ *
+ * @param numberOfRetries Must be a number greater than 0.
+ */
+ public void setNumberOfRetries(int numberOfRetries)
+ {
+ if (numberOfRetries < 1)
+ {
+ this.numberOfRetries = MAX_RETRIES;
+ }
+ else
+ {
+ this.numberOfRetries = numberOfRetries;
+ }
+ }
+
+ public int getNumberOfRetries()
+ {
+ return numberOfRetries;
+ }
+
+ /**
+ * The name of the server.
+ */
+ public String getServerHostName() throws Exception
+ {
+ return address.address;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setup() throws Exception
+ {
+ addr = InetAddress.getByName(locator.getHost());
+ port = locator.getPort();
Properties props = new Properties();
props.putAll(configuration);
@@ -173,31 +362,31 @@
address = createServerAddress();
}
- protected ServerAddress createServerAddress()
- {
- return new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
- }
-
protected void configureParameters()
{
Map params = configuration;
- if (params != null)
+
+ if (params == null)
{
+ return;
+ }
+
// look for enableTcpNoDelay param
Object val = params.get(TCP_NODELAY_FLAG);
if (val != null)
{
try
{
- boolean bVal = Boolean.valueOf((String) val).booleanValue();
- enableTcpNoDelay = bVal;
- log.debug("Setting SocketClientInvoker::enableTcpNoDelay to: " + enableTcpNoDelay);
+ enableTcpNoDelay = Boolean.valueOf((String)val).booleanValue();
+ log.debug(this + " setting enableTcpNoDelay to " + enableTcpNoDelay);
}
catch (Exception e)
{
- log.warn("Could not convert " + TCP_NODELAY_FLAG + " value of " + val + " to a boolean value.");
+ log.warn(this + " could not convert " + TCP_NODELAY_FLAG + " value of " +
+ val + " to a boolean value.");
}
}
+
// look for maxPoolSize param
val = params.get(MAX_POOL_SIZE_FLAG);
if (val != null)
@@ -205,22 +394,24 @@
try
{
maxPoolSize = Integer.valueOf((String)val).intValue();
- log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
+ log.debug(this + " setting maxPoolSize to " + maxPoolSize);
}
catch (Exception e)
{
- log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
+ log.warn(this + " could not convert " + MAX_POOL_SIZE_FLAG + " value of " +
+ val + " to a int value");
}
}
+
// look for client socket class name
val = params.get(CLIENT_SOCKET_CLASS_FLAG);
if (val != null)
{
- String value = (String) val;
+ String value = (String)val;
if (value.length() > 0)
{
clientSocketClassName = value;
- log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
+ log.debug(this + " setting client socket wrapper class name to " + clientSocketClassName);
}
}
@@ -231,44 +422,14 @@
if (value.length() > 0)
{
shouldCheckConnection = Boolean.valueOf(value).booleanValue();
+ log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
}
}
}
- }
-
- /**
- * Indicates if will check socket connection when returning from
- * pool by sending byte to the server. Default value will be false.
- */
- public boolean checkingConnection()
- {
- return shouldCheckConnection;
- }
-
- /**
- * Returns if newly created sockets will have SO_REUSEADDR enabled.
- * Default is for this to be true.
- */
- public boolean getReuseAddress()
- {
- return reuseAddress;
- }
- /**
- * Sets if newly created socket should have SO_REUSEADDR enable.
- * Default is true.
- * @param reuse
- */
- public void setReuseAddress(boolean reuse)
- {
- reuseAddress = reuse;
- }
-
- public synchronized void disconnect()
+ protected ServerAddress createServerAddress()
{
- if (trace) { log.trace(this + " disconnecting ..."); }
- bailOut = true;
- super.disconnect();
+ return new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
}
protected void finalize() throws Throwable
@@ -277,8 +438,7 @@
super.finalize();
}
- protected synchronized void handleConnect()
- throws ConnectionFailedException
+ protected synchronized void handleConnect() throws ConnectionFailedException
{
initPool();
}
@@ -289,44 +449,73 @@
}
/**
- * Each implementation of the remote client invoker should have
- * a default data type that is uses in the case it is not specified
- * in the invoker locator uri.
+ * Each implementation of the remote client invoker should have a default data type that is used
+ * in the case it is not specified in the invoker locator URI.
*/
protected String getDefaultDataType()
{
return SerializableMarshaller.DATATYPE;
}
- /**
- * @param sessionId
- * @param invocation
- * @param marshaller
- * @return
- * @throws java.io.IOException
- * @throws org.jboss.remoting.ConnectionFailedException
- *
- */
- protected Object transport(String sessionId, Object invocation, Map metadata,
+ protected Object transport(String sessionID, Object invocation, Map metadata,
Marshaller marshaller, UnMarshaller unmarshaller)
throws IOException, ConnectionFailedException, ClassNotFoundException
{
-
- Object response = null;
long start = System.currentTimeMillis();
SocketWrapper socketWrapper = null;
+ Object response = null;
+ boolean oneway = false;
+ int tempTimeout = -1;
+ int savedTimeout = -1;
+
+ if(metadata != null)
+ {
+ // check to see if is one way invocation and return after writing invocation if is
+ Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
+ if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
+ {
+ oneway = true;
+ }
+
+ // look for temporary timeout values
+ String tempTimeoutString = (String) metadata.get(ServerInvoker.TIMEOUT);
+ {
+ if (tempTimeoutString != null)
+ {
+ try
+ {
+ tempTimeout = Integer.valueOf(tempTimeoutString).intValue();
+ log.debug(this + " setting timeout to " + tempTimeout + " for this invocation");
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + ServerInvoker.TIMEOUT + " value of " +
+ tempTimeoutString + " to an integer value.");
+ }
+ }
+ }
+ }
- int x = 0;
+ int retryCount = 0;
SocketException sockEx = null;
- for (; x < numberOfCallRetries; x++)
+
+ for (; retryCount < numberOfCallRetries; retryCount++)
{
try
{
- socketWrapper = getConnection();
+ socketWrapper = getConnection(marshaller, unmarshaller, tempTimeout);
}
catch (Exception e)
{
- throw new CannotConnectException("Can not get connection to server. Problem establishing socket connection for locator - " + locator, e);
+ throw new CannotConnectException(
+ "Can not get connection to server. Problem establishing " +
+ "socket connection for " + locator, e);
+ }
+
+ if (tempTimeout >= 0)
+ {
+ savedTimeout = socketWrapper.getTimeout();
+ socketWrapper.setTimeout(tempTimeout);
}
long end = System.currentTimeMillis() - start;
@@ -351,20 +540,11 @@
writeTime += end;
start = System.currentTimeMillis();
- // check to see if is one way invocation and return if is
- boolean oneway = false;
- if(metadata != null)
- {
- Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
- if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
+ if (oneway)
{
if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
-// return null;
- oneway = true;
}
- }
-
- if (!oneway)
+ else
{
InputStream inputStream = socketWrapper.getInputStream();
if (performVersioning)
@@ -381,6 +561,15 @@
end = System.currentTimeMillis() - start;
readTime += end;
+
+ // Note that resetting the timeout value after closing the socket results
+ // in an exception, so the reset is not done in a finally clause. However,
+ // if a catch clause is ever added that does not close the socket, care
+ // must be taken to reset the timeout in that case.
+ if (tempTimeout >= 0)
+ {
+ socketWrapper.setTimeout(savedTimeout);
+ }
}
catch (SocketException sex)
{
@@ -402,7 +591,7 @@
* so want to flush the pool and try with
* fresh socket as a last effort.
*/
- if (x == (numberOfCallRetries - 2))
+ if (retryCount == (numberOfCallRetries - 2))
{
flushConnectionPool();
}
@@ -429,7 +618,7 @@
}
// need to check if ran out of retries
- if (x >= numberOfCallRetries)
+ if (retryCount >= numberOfCallRetries)
{
handleException(sockEx, socketWrapper);
}
@@ -444,13 +633,11 @@
{
usedPooled--;
}
+ if (trace) { log.trace(this + " returned " + socketWrapper + " to pool"); }
}
else
{
- if (trace)
- {
- log.trace("Pool was already full, will close the connection");
- }
+ if (trace) { log.trace(this + "'s pool is full, will close the connection"); }
try
{
socketWrapper.close();
@@ -461,232 +648,52 @@
}
}
- if (trace) { log.trace(this + " received " + response); }
+ if (trace && !oneway) { log.trace(this + " received response " + response); }
return response;
-
- }
-
- public void flushConnectionPool()
- {
- synchronized (pool)
- {
- while (pool != null && pool.size() > 0)
- {
- SocketWrapper socketWrapper = (SocketWrapper) pool.removeFirst();
- try
- {
- socketWrapper.close();
- }
- catch (IOException e)
- {
-
- }
- }
- }
}
protected Object handleException(Exception ex, SocketWrapper socketWrapper)
throws ClassNotFoundException, MarshalException
{
log.error(this + " got marshalling exception, exiting ...", ex);
+
if (ex instanceof ClassNotFoundException)
{
//TODO: -TME Add better exception handling for class not found exception
log.error("Error loading classes from remote call result.", ex);
- throw (ClassNotFoundException) ex;
- }
-
- throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling.", ex);
- }
-
- private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version) throws IOException, ClassNotFoundException
- {
- //TODO: -TME - is switch required?
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
- {
- if (trace) { log.trace(this + " reading response from input stream"); }
- return unmarshaller.read(inputStream, null);
- }
- default:
- {
- throw new IOException("Can not read data for version " + version + ". Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
- }
- }
- }
-
- private void versionedWrite(OutputStream outputStream, Marshaller marshaller, Object invocation, int version) throws IOException
- {
- //TODO: -TME Should I worry about checking the version here? Only one way to do it at this point
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
- {
- if (trace) { log.trace(this + " writing invocation on output stream"); }
- marshaller.write(invocation, outputStream);
- if (trace) { log.trace(this + " done writing invocation on output stream"); }
-
- return;
- }
- default:
- {
- throw new IOException("Can not write data for version " + version + ". Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
- }
- }
- }
-
- //TODO: -TME Exact same method in ServerThread
- private int readVersion(InputStream inputStream) throws IOException
- {
- if (trace) { log.trace(this + " reading version from input stream"); }
- int version = inputStream.read();
- if (trace) { log.trace(this + " read version " + version + " from input stream"); }
- return version;
- }
-
- //TODO: -TME Exact same method in ServerThread
- private void writeVersion(OutputStream outputStream, int version) throws IOException
- {
- if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
- outputStream.write(version);
+ throw (ClassNotFoundException)ex;
}
- /**
- * Close all sockets in a specific pool.
- */
- public static void clearPool(ServerAddress sa)
- {
- try
- {
- LinkedList thepool = (LinkedList) connectionPools.get(sa);
- if (thepool == null)
- {
- return;
- }
- synchronized (thepool)
- {
- int size = thepool.size();
- for (int i = 0; i < size; i++)
- {
- SocketWrapper socketWrapper = (SocketWrapper) thepool.removeFirst();
- try
- {
- socketWrapper.close();
- socketWrapper = null;
- }
- catch (Exception ignored)
- {
- }
- }
- }
- }
- catch (Exception ex)
- {
- // ignored
- }
- }
-
- /**
- * Close all sockets in all pools
- */
- public static void clearPools()
- {
- synchronized (connectionPools)
- {
- Iterator it = connectionPools.keySet().iterator();
- while (it.hasNext())
- {
- ServerAddress sa = (ServerAddress) it.next();
-
- if (trace) { log.trace("clearing pool for " + sa); }
-
- clearPool(sa);
- }
- }
+ throw new MarshalException(
+ "Failed to communicate. Problem during marshalling/unmarshalling.", ex);
}
protected void initPool()
{
synchronized (connectionPools)
{
- pool = (LinkedList) connectionPools.get(address);
+ pool = (LinkedList)connectionPools.get(address);
if (pool == null)
{
pool = new LinkedList();
connectionPools.put(address, pool);
+ log.debug(this + " added new pool as " + address);
}
}
}
- /**
- * Sets the number of times an invocation will retry based on getting
- * SocketException.
- *
- * @param numberOfCallRetries
- */
- public void setNumberOfCallRetries(int numberOfCallRetries)
- {
- if (numberOfCallRetries < 1)
- {
- this.numberOfCallRetries = MAX_CALL_RETRIES;
- }
- else
- {
- this.numberOfCallRetries = numberOfCallRetries;
- }
- }
-
- public int getNumberOfCallRetries()
- {
- return numberOfCallRetries;
- }
-
- /**
- * Sets the number of retries to get a socket connection.
- *
- * @param numberOfRetries Must be a number greater than 0
- */
- public void setNumberOfRetries(int numberOfRetries)
- {
- if (numberOfRetries < 1)
- {
- this.numberOfRetries = MAX_RETRIES;
- }
- else
- {
- this.numberOfRetries = numberOfRetries;
- }
- }
-
- public int getNumberOfRetries()
- {
- return numberOfRetries;
- }
-
- /**
- * used for debugging (tracing) connections leaks
- */
- static int counter = 0;
-
- protected SocketWrapper getConnection() throws Exception
+ protected SocketWrapper getConnection(Marshaller marshaller,
+ UnMarshaller unmarshaller,
+ int tempTimeout)
+ throws Exception
{
SocketWrapper pooled = null;
- //
- // Need to retry a few times
- // on socket connection because, at least on Windoze,
- // if too many concurrent threads try to connect
- // at same time, you get ConnectionRefused
- //
- // Retrying seems to be the most performant.
- //
- // This problem always happens with RMI and seems to
- // have nothing to do with backlog or number of threads
- // waiting in accept() on the server.
- //
+ // Need to retry a few times on socket connection because, at least on Windoze, if too many
+ // concurrent threads try to connect at same time, you get ConnectionRefused. Retrying seems
+ // to be the most performant. This problem always happens with RMI and seems to have nothing
+ // to do with backlog or number of threads waiting in accept() on the server.
+
for (int i = 0; i < numberOfRetries; i++)
{
if (bailOut)
@@ -714,24 +721,21 @@
}
else if (usedPooled < maxPoolSize)
{
- // if no connection in pool and all pooled connections
- // not in use, then need create a new connection which
- // will be latered returned to the pool (thus filling out
- // the pool, since starts out empty).
+ // If there is no connection in the pool and not all pooled connections are in use, then we
+ // need to create a new connection, which will later be returned to the pool (thus filling
+ // out the pool, since it starts out empty).
Socket socket = null;
long timestamp = System.currentTimeMillis();
try
{
- if (trace)
- {
- log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1));
- }
+ if (trace) { log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1)); }
socket = createSocket(address.address, address.port);
}
catch (Exception ex)
{
- log.debug(this + " got Exception " + ex + ", creation attempt took " + (System.currentTimeMillis() - timestamp) + " ms");
+ log.debug(this + " got Exception " + ex + ", creation attempt took " +
+ (System.currentTimeMillis() - timestamp) + " ms");
if (i + 1 < numberOfRetries)
{
@@ -742,19 +746,36 @@
}
socket.setTcpNoDelay(address.enableTcpNoDelay);
socket.setReuseAddress(reuseAddress);
- pooled = createClientSocket(socket, address.timeout, getLocator().getParameters());
+
+ Map metadata = getLocator().getParameters();
+ if (metadata == null)
+ {
+ metadata = new HashMap(2);
+ }
+ else
+ {
+ metadata = new HashMap(metadata);
+ }
+ metadata.put(SocketWrapper.MARSHALLER, marshaller);
+ metadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
+ metadata.put(SocketWrapper.TEMP_TIMEOUT, new Integer(tempTimeout));
+
+ pooled = createClientSocket(socket, address.timeout, metadata);
usedPooled++;
break;
}
}
- // waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting pooled connection)
+
+ // Waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting
+ // pooled connection)
Thread.sleep(1000);
}
if (pooled == null)
{
- throw new SocketException("Can not obtain client socket connection from pool. Have waited " +
- numberOfRetries + " seconds for available connection (" + usedPooled + " in use)");
+ throw new SocketException("Can not obtain client socket connection from pool. " +
+ "Have waited " + numberOfRetries + " seconds for available connection (" + usedPooled +
+ " in use)");
}
else
{
@@ -762,7 +783,8 @@
}
}
- protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata) throws Exception
+ protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata)
+ throws Exception
{
if (clientSocketConstructor == null)
{
@@ -771,17 +793,17 @@
clientSocketClass = ClassLoaderUtility.loadClass(clientSocketClassName, getClass());
}
- clientSocketConstructor = clientSocketClass.getConstructor(new Class[]{Socket.class});
-
+ Class[] args = new Class[]{Socket.class, Map.class, Integer.class};
+ clientSocketConstructor = clientSocketClass.getConstructor(args);
}
SocketWrapper clientSocketWrapper = null;
- clientSocketWrapper = (SocketWrapper) clientSocketConstructor.newInstance(new Object[]{socket});
+ clientSocketWrapper = (SocketWrapper)clientSocketConstructor.
+ newInstance(new Object[]{socket, metadata, new Integer(timeout)});
return clientSocketWrapper;
}
-
protected Socket createSocket(String address, int port) throws IOException
{
return new Socket(address, port);
@@ -792,11 +814,15 @@
SocketWrapper socketWrapper = null;
while (pool.size() > 0)
{
- socketWrapper = (SocketWrapper) pool.removeFirst();
+ socketWrapper = (SocketWrapper)pool.removeFirst();
try
{
if (socketWrapper != null)
{
+ if (socketWrapper instanceof OpenConnectionChecker)
+ {
+ ((OpenConnectionChecker) socketWrapper).checkOpenConnection();
+ }
if (shouldCheckConnection)
{
socketWrapper.checkConnection();
@@ -810,29 +836,81 @@
}
catch (Exception ex)
{
- if (trace)
- {
- log.trace("Couldn't reuse connection from pool");
- }
+ if (trace) { log.trace(this + " couldn't reuse connection from pool"); }
try
{
socketWrapper.close();
}
- catch (Exception ignored)
+ catch (Exception e)
{
+ log.debug("Failed to close socket wrapper", e);
}
}
}
return null;
}
+ // Private --------------------------------------------------------------------------------------
- /**
- * The name of of the server.
- */
- public String getServerHostName() throws Exception
+ private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version)
+ throws IOException, ClassNotFoundException
{
- return address.address;
+ //TODO: -TME - is switch required?
+ switch (version)
+ {
+ case Version.VERSION_1:
+ case Version.VERSION_2:
+ {
+ if (trace) { log.trace(this + " reading response from unmarshaller"); }
+ return unmarshaller.read(inputStream, null);
}
+ default:
+ {
+ throw new IOException("Can not read data for version " + version + ". " +
+ "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
+ }
+ }
+ }
+
+ private void versionedWrite(OutputStream outputStream, Marshaller marshaller,
+ Object invocation, int version) throws IOException
+ {
+ //TODO: -TME Should I worry about checking the version here? Only one way to do it at this point
+ switch (version)
+ {
+ case Version.VERSION_1:
+ case Version.VERSION_2:
+ {
+ if (trace) { log.trace(this + " writing invocation to marshaller"); }
+ marshaller.write(invocation, outputStream);
+ if (trace) { log.trace(this + " done writing invocation to marshaller"); }
+
+ return;
+ }
+ default:
+ {
+ throw new IOException("Can not write data for version " + version + ". " +
+ "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
+ }
+ }
+ }
+
+ //TODO: -TME Exact same method in ServerThread
+ private int readVersion(InputStream inputStream) throws IOException
+ {
+ if (trace) { log.trace(this + " reading version from input stream"); }
+ int version = inputStream.read();
+ if (trace) { log.trace(this + " read version " + version + " from input stream"); }
+ return version;
+ }
+
+ //TODO: -TME Exact same method in ServerThread
+ private void writeVersion(OutputStream outputStream, int version) throws IOException
+ {
+ if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
+ outputStream.write(version);
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list