[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ovidiu Feodorov
ovidiu.feodorov at jboss.com
Mon Jan 29 02:10:00 EST 2007
User: ovidiu
Date: 07/01/29 02:10:00
Modified: src/main/org/jboss/remoting/transport/socket Tag:
remoting_2_x MicroSocketClientInvoker.java
ServerSocketWrapper.java SocketClientInvoker.java
SocketWrapper.java
Log:
minor refactoring and logging improvements while trying to fix http://jira.jboss.org/jira/browse/JBREM-691
Revision Changes Path
No revision
No revision
1.16.2.15 +396 -373 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.16.2.14
retrieving revision 1.16.2.15
diff -u -b -r1.16.2.14 -r1.16.2.15
--- MicroSocketClientInvoker.java 29 Jan 2007 05:27:53 -0000 1.16.2.14
+++ MicroSocketClientInvoker.java 29 Jan 2007 07:10:00 -0000 1.16.2.15
@@ -28,40 +28,40 @@
import java.util.Properties;
/**
- * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which
- * must be a SocketServerInvoker.
+ * SocketClientInvoker uses Sockets to remotely connect to a remote ServerInvoker, which must be
+ * a SocketServerInvoker.
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.16.2.14 $
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ *
+ * @version $Revision: 1.16.2.15 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
- private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
- private static final boolean trace = log.isTraceEnabled();
+ // Constants ------------------------------------------------------------------------------------
- protected InetAddress addr;
- protected int port;
+ private static final Logger log = Logger.getLogger(MicroSocketClientInvoker.class);
/**
- * can be either true or false and will indicate if client socket should have TCP_NODELAY turned on or off.
- * TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
+ * Can be either true or false and will indicate if client socket should have TCP_NODELAY turned
+ * on or off. TCP_NODELAY is for a specific purpose; to disable the Nagle buffering algorithm.
* It should only be set for applications that send frequent small bursts of information without
- * getting an immediate response; where timely delivery of data is required
- * (the canonical example is mouse movements). The default is false.
+ * getting an immediate response; where timely delivery of data is required (the canonical
+ * example is mouse movements). The default is false.
*/
public static final String TCP_NODELAY_FLAG = "enableTcpNoDelay";
/**
- * the client side maximum number of threads. The default is MAX_POOL_SIZE.
+ * The client side maximum number of threads. The default is MAX_POOL_SIZE.
*/
public static final String MAX_POOL_SIZE_FLAG = "clientMaxPoolSize";
/**
- * Specifies the fully qualified class name for the custom SocketWrapper implementation
- * to use on the client. Note, will need to make sure this is marked as a client
- * parameter (using the 'isParam' attribute). Making this change will not
- * affect the marshaller/unmarshaller that is used, which may also be a requirement.
+ * Specifies the fully qualified class name for the custom SocketWrapper implementation to use on
+ * the client. Note, will need to make sure this is marked as a client parameter (using the
+ * 'isParam' attribute). Making this change will not affect the marshaller/unmarshaller that is
+ * used, which may also be a requirement.
*/
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
@@ -71,11 +71,34 @@
public static final boolean TCP_NODELAY_DEFAULT = false;
/**
- * Indicates if will check the socket connection when
- * getting from pool by sending byte over the connection
- * to validate is still good.
+ * Default maximum number of retries to get a valid socket from the socket pool. This also
+ * translates to number of seconds will wait for connection to be returned to connection pool
+ * before erroring. Default is 30.
*/
- protected boolean shouldCheckConnection = false;
+ public static final int MAX_RETRIES = 30;
+
+ /**
+ * Default maximum number of times an invocation will be made when it gets a SocketException.
+ * Default is 3.
+ */
+ public static final int MAX_CALL_RETRIES = 3;
+
+ /**
+ * Default maximum number of socket connections allowed at any point in time. Default is 50.
+ */
+ public static final int MAX_POOL_SIZE = 50;
+
+
+ // Static ---------------------------------------------------------------------------------------
+
+ private static boolean trace = log.isTraceEnabled();
+
+ /**
+ * Used for debugging (tracing) connections leaks
+ */
+ static int counter = 0;
+
+ protected static final Map connectionPools = new HashMap();
// Performance measurements
public static long getSocketTime = 0;
@@ -85,60 +108,102 @@
public static long deserializeTime = 0;
/**
- * If the TcpNoDelay option should be used on the socket.
+ * Close all sockets in a specific pool.
*/
- protected boolean enableTcpNoDelay = TCP_NODELAY_DEFAULT;
-
- protected String clientSocketClassName = ClientSocketWrapper.class.getName();
- private Constructor clientSocketConstructor = null;
- protected Class clientSocketClass = null;
+ public static void clearPool(ServerAddress sa)
+ {
+ try
+ {
+ LinkedList thepool = (LinkedList)connectionPools.get(sa);
+ if (thepool == null)
+ {
+ return;
+ }
+ synchronized (thepool)
+ {
+ int size = thepool.size();
+ for (int i = 0; i < size; i++)
+ {
+ SocketWrapper socketWrapper = (SocketWrapper)thepool.removeFirst();
+ try
+ {
+ socketWrapper.close();
+ socketWrapper = null;
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ log.debug("Failure", ex);
+ }
+ }
/**
- * Default max number of retries to get a valid socket from the
- * socket pool. This also translates to number of seconds will wait
- * for connection to be returned to connection pool before erroring. Default is 30.
+ * Close all sockets in all pools.
*/
- public static final int MAX_RETRIES = 30;
- public long usedPooled = 0;
- public Object usedPoolLock = new Object();
+ public static void clearPools()
+ {
+ synchronized (connectionPools)
+ {
+ for(Iterator i = connectionPools.keySet().iterator(); i.hasNext();)
+ {
+ ServerAddress sa = (ServerAddress) i.next();
- protected int numberOfRetries = MAX_RETRIES;
+ if (trace) { log.trace("clearing pool for " + sa); }
+ clearPool(sa);
+ }
+ }
+ }
- /**
- * Default max number of times a invocation will be made
- * when it gets a SocketException. Default is 3.
- */
- public static final int MAX_CALL_RETRIES = 3;
+ // Attributes -----------------------------------------------------------------------------------
- protected int numberOfCallRetries = MAX_CALL_RETRIES;
+ private Constructor clientSocketConstructor;
+ private boolean reuseAddress;
+
+ protected InetAddress addr;
+ protected int port;
+
+ // Flag set to true by a disconnect request. If trying to create a connection goes on in a
+ // loop and a disconnect request arrives, this flag will be used to send this information into
+ // the connect loop.
+ private volatile boolean bailOut;
/**
- * Pool for this invoker. This is shared between all
- * instances of proxies attached to a specific invoker
+ * Indicates if will check the socket connection when getting from pool by sending byte over the
+ * connection to validate is still good.
*/
- protected LinkedList pool = null;
+ protected boolean shouldCheckConnection;
/**
- * connection information
+ * If the TcpNoDelay option should be used on the socket.
*/
- protected ServerAddress address;
+ protected boolean enableTcpNoDelay;
- protected static final Map connectionPools = new HashMap();
+ protected String clientSocketClassName;
+ protected Class clientSocketClass;
+ protected int numberOfRetries;
+ protected int numberOfCallRetries;
+ protected int maxPoolSize;
/**
- * Default max number of socket connections allowed at any
- * point in time. Default is 50.
+ * Pool for this invoker. This is shared between all instances of proxies attached to a specific
+ * invoker.
*/
- public static final int MAX_POOL_SIZE = 50;
+ protected LinkedList pool;
- protected int maxPoolSize = MAX_POOL_SIZE;
+ /**
+ * connection information
+ */
+ protected ServerAddress address;
- private boolean reuseAddress = true;
+ public long usedPooled;
+ public Object usedPoolLock;
- // flag being set on true by a disconnect request. If trying to create a connection goes on in a
- // loop and a disconnect request arrives, this flag will be used to sent this information into
- // the connect loop
- private volatile boolean bailOut;
+ // Constructors ---------------------------------------------------------------------------------
public MicroSocketClientInvoker(InvokerLocator locator)
{
@@ -148,22 +213,145 @@
public MicroSocketClientInvoker(InvokerLocator locator, Map configuration)
{
super(locator, configuration);
+
+ clientSocketConstructor = null;
+ reuseAddress = true;
+ shouldCheckConnection = false;
+ enableTcpNoDelay = TCP_NODELAY_DEFAULT;
+ clientSocketClassName = ClientSocketWrapper.class.getName();
+ clientSocketClass = null;
+ numberOfRetries = MAX_RETRIES;
+ numberOfCallRetries = MAX_CALL_RETRIES;
+ pool = null;
+ maxPoolSize = MAX_POOL_SIZE;
+ usedPooled = 0;
+ usedPoolLock = new Object();
+
try
{
setup();
}
catch (Exception ex)
{
- log.error("Error setting up socket client invoker.", ex);
+ log.error("Error setting up " + this, ex);
throw new RuntimeException(ex.getMessage());
}
+
+ log.debug(this + " constructed");
}
- protected void setup()
- throws Exception
+ // Public ---------------------------------------------------------------------------------------
+
+ /**
+ * Indicates if will check socket connection when returning from pool by sending byte to the
+ * server. Default value will be false.
+ */
+ public boolean checkingConnection()
+ {
+ return shouldCheckConnection;
+ }
+
+ /**
+ * Returns if newly created sockets will have SO_REUSEADDR enabled. Default is for this to be
+ * true.
+ */
+ public boolean getReuseAddress()
+ {
+ return reuseAddress;
+ }
+
+ /**
+ * Sets if newly created socket should have SO_REUSEADDR enable. Default is true.
+ */
+ public void setReuseAddress(boolean reuse)
{
- this.addr = InetAddress.getByName(locator.getHost());
- this.port = locator.getPort();
+ reuseAddress = reuse;
+ }
+
+ public synchronized void disconnect()
+ {
+ log.debug(this + " disconnecting ...");
+ bailOut = true;
+ super.disconnect();
+ }
+
+ public void flushConnectionPool()
+ {
+ synchronized (pool)
+ {
+ while (pool != null && pool.size() > 0)
+ {
+ SocketWrapper socketWrapper = (SocketWrapper)pool.removeFirst();
+ try
+ {
+ socketWrapper.close();
+ }
+ catch (IOException e)
+ {
+ log.debug("Failed to close socket wrapper", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sets the number of times an invocation will retry based on getting SocketException.
+ */
+ public void setNumberOfCallRetries(int numberOfCallRetries)
+ {
+ if (numberOfCallRetries < 1)
+ {
+ this.numberOfCallRetries = MAX_CALL_RETRIES;
+ }
+ else
+ {
+ this.numberOfCallRetries = numberOfCallRetries;
+ }
+ }
+
+ public int getNumberOfCallRetries()
+ {
+ return numberOfCallRetries;
+ }
+
+ /**
+ * Sets the number of retries to get a socket connection.
+ *
+ * @param numberOfRetries Must be a number greater than 0.
+ */
+ public void setNumberOfRetries(int numberOfRetries)
+ {
+ if (numberOfRetries < 1)
+ {
+ this.numberOfRetries = MAX_RETRIES;
+ }
+ else
+ {
+ this.numberOfRetries = numberOfRetries;
+ }
+ }
+
+ public int getNumberOfRetries()
+ {
+ return numberOfRetries;
+ }
+
+ /**
+ * The name of the server.
+ */
+ public String getServerHostName() throws Exception
+ {
+ return address.address;
+ }
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
+ protected void setup() throws Exception
+ {
+ addr = InetAddress.getByName(locator.getHost());
+ port = locator.getPort();
Properties props = new Properties();
props.putAll(configuration);
@@ -171,33 +359,34 @@
configureParameters();
- address = createServerAddress();
- }
-
- protected ServerAddress createServerAddress()
- {
- return new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
+ address = new ServerAddress(addr.getHostAddress(), port, enableTcpNoDelay, -1);
}
protected void configureParameters()
{
Map params = configuration;
- if (params != null)
+
+ if (params == null)
{
+ return;
+ }
+
// look for enableTcpNoDelay param
Object val = params.get(TCP_NODELAY_FLAG);
if (val != null)
{
try
{
- enableTcpNoDelay = Boolean.valueOf((String) val).booleanValue();
- log.debug(this + " setting SocketClientInvoker::enableTcpNoDelay to " + enableTcpNoDelay);
+ enableTcpNoDelay = Boolean.valueOf((String)val).booleanValue();
+ log.debug(this + " setting enableTcpNoDelay to " + enableTcpNoDelay);
}
catch (Exception e)
{
- log.warn("Could not convert " + TCP_NODELAY_FLAG + " value of " + val + " to a boolean value.");
+ log.warn(this + " could not convert " + TCP_NODELAY_FLAG + " value of " +
+ val + " to a boolean value.");
}
}
+
// look for maxPoolSize param
val = params.get(MAX_POOL_SIZE_FLAG);
if (val != null)
@@ -205,22 +394,24 @@
try
{
maxPoolSize = Integer.valueOf((String)val).intValue();
- log.debug(this + " setting SocketClientInvoker::maxPoolSize to: " + maxPoolSize);
+ log.debug(this + " setting maxPoolSize to " + maxPoolSize);
}
catch (Exception e)
{
- log.warn("Could not convert " + MAX_POOL_SIZE_FLAG + " value of " + val + " to a int value");
+ log.warn(this + " could not convert " + MAX_POOL_SIZE_FLAG + " value of " +
+ val + " to a int value");
}
}
+
// look for client socket class name
val = params.get(CLIENT_SOCKET_CLASS_FLAG);
if (val != null)
{
- String value = (String) val;
+ String value = (String)val;
if (value.length() > 0)
{
clientSocketClassName = value;
- log.debug(this + " setting ClientSocket class name to " + clientSocketClassName);
+ log.debug(this + " setting client socket wrapper class name to " + clientSocketClassName);
}
}
@@ -231,54 +422,18 @@
if (value.length() > 0)
{
shouldCheckConnection = Boolean.valueOf(value).booleanValue();
- }
+ log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
}
}
}
- /**
- * Indicates if will check socket connection when returning from
- * pool by sending byte to the server. Default value will be false.
- */
- public boolean checkingConnection()
- {
- return shouldCheckConnection;
- }
-
- /**
- * Returns if newly created sockets will have SO_REUSEADDR enabled.
- * Default is for this to be true.
- */
- public boolean getReuseAddress()
- {
- return reuseAddress;
- }
-
- /**
- * Sets if newly created socket should have SO_REUSEADDR enable.
- * Default is true.
- * @param reuse
- */
- public void setReuseAddress(boolean reuse)
- {
- reuseAddress = reuse;
- }
-
- public synchronized void disconnect()
- {
- if (trace) { log.trace(this + " disconnecting ..."); }
- bailOut = true;
- super.disconnect();
- }
-
protected void finalize() throws Throwable
{
disconnect();
super.finalize();
}
- protected synchronized void handleConnect()
- throws ConnectionFailedException
+ protected synchronized void handleConnect() throws ConnectionFailedException
{
initPool();
}
@@ -289,24 +444,21 @@
}
/**
- * Each implementation of the remote client invoker should have
- * a default data type that is uses in the case it is not specified
- * in the invoker locator uri.
+ * Each implementation of the remote client invoker should have a default data type that is used
+ * in the case it is not specified in the invoker locator URI.
*/
protected String getDefaultDataType()
{
return SerializableMarshaller.DATATYPE;
}
- protected Object transport(String sessionId, Object invocation, Map metadata,
+ protected Object transport(String sessionID, Object invocation, Map metadata,
Marshaller marshaller, UnMarshaller unmarshaller)
throws IOException, ConnectionFailedException, ClassNotFoundException
{
-
- Object response = null;
long start = System.currentTimeMillis();
SocketWrapper socketWrapper = null;
-
+ Object response = null;
boolean oneway = false;
int tempTimeout = -1;
int savedTimeout = -1;
@@ -317,7 +469,6 @@
Object val = metadata.get(org.jboss.remoting.Client.ONEWAY_FLAG);
if(val != null && val instanceof String && Boolean.valueOf((String)val).booleanValue())
{
- if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
oneway = true;
}
@@ -333,7 +484,7 @@
}
catch (Exception e)
{
- log.warn("Could not convert " + ServerInvoker.TIMEOUT + " value of " +
+ log.warn(this + " could not convert " + ServerInvoker.TIMEOUT + " value of " +
tempTimeoutString + " to an integer value.");
}
}
@@ -488,227 +639,46 @@
return response;
}
- public void flushConnectionPool()
- {
- synchronized (pool)
- {
- while (pool != null && pool.size() > 0)
- {
- SocketWrapper socketWrapper = (SocketWrapper) pool.removeFirst();
- try
- {
- socketWrapper.close();
- }
- catch (IOException e)
- {
-
- }
- }
- }
- }
-
protected Object handleException(Exception ex, SocketWrapper socketWrapper)
throws ClassNotFoundException, MarshalException
{
log.error(this + " got marshalling exception, exiting ...", ex);
+
if (ex instanceof ClassNotFoundException)
{
//TODO: -TME Add better exception handling for class not found exception
log.error("Error loading classes from remote call result.", ex);
- throw (ClassNotFoundException) ex;
- }
-
- throw new MarshalException("Failed to communicate. Problem during marshalling/unmarshalling.", ex);
- }
-
- private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version) throws IOException, ClassNotFoundException
- {
- //TODO: -TME - is switch required?
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
- {
- if (trace) { log.trace(this + " reading response from unmarshaller"); }
- return unmarshaller.read(inputStream, null);
- }
- default:
- {
- throw new IOException("Can not read data for version " + version + ". Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
- }
- }
- }
-
- private void versionedWrite(OutputStream outputStream, Marshaller marshaller, Object invocation, int version) throws IOException
- {
- //TODO: -TME Should I worry about checking the version here? Only one way to do it at this point
- switch (version)
- {
- case Version.VERSION_1:
- case Version.VERSION_2:
- {
- if (trace) { log.trace(this + " writing invocation to marshaller"); }
- marshaller.write(invocation, outputStream);
- if (trace) { log.trace(this + " done writing invocation to marshaller"); }
-
- return;
- }
- default:
- {
- throw new IOException("Can not write data for version " + version + ". Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
- }
- }
+ throw (ClassNotFoundException)ex;
}
- //TODO: -TME Exact same method in ServerThread
- private int readVersion(InputStream inputStream) throws IOException
- {
- if (trace) { log.trace(this + " reading version from input stream"); }
- int version = inputStream.read();
- if (trace) { log.trace(this + " read version " + version + " from input stream"); }
- return version;
- }
-
- //TODO: -TME Exact same method in ServerThread
- private void writeVersion(OutputStream outputStream, int version) throws IOException
- {
- if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
- outputStream.write(version);
- }
-
- /**
- * Close all sockets in a specific pool.
- */
- public static void clearPool(ServerAddress sa)
- {
- try
- {
- LinkedList thepool = (LinkedList) connectionPools.get(sa);
- if (thepool == null)
- {
- return;
- }
- synchronized (thepool)
- {
- int size = thepool.size();
- for (int i = 0; i < size; i++)
- {
- SocketWrapper socketWrapper = (SocketWrapper) thepool.removeFirst();
- try
- {
- socketWrapper.close();
- socketWrapper = null;
- }
- catch (Exception ignored)
- {
- }
- }
- }
- }
- catch (Exception ex)
- {
- // ignored
- }
- }
-
- /**
- * Close all sockets in all pools
- */
- public static void clearPools()
- {
- synchronized (connectionPools)
- {
- Iterator it = connectionPools.keySet().iterator();
- while (it.hasNext())
- {
- ServerAddress sa = (ServerAddress) it.next();
-
- if (trace) { log.trace("clearing pool for " + sa); }
-
- clearPool(sa);
- }
- }
+ throw new MarshalException(
+ "Failed to communicate. Problem during marshalling/unmarshalling.", ex);
}
protected void initPool()
{
synchronized (connectionPools)
{
- pool = (LinkedList) connectionPools.get(address);
+ pool = (LinkedList)connectionPools.get(address);
if (pool == null)
{
pool = new LinkedList();
connectionPools.put(address, pool);
+ log.debug(this + " added new pool as " + address);
}
}
}
- /**
- * Sets the number of times an invocation will retry based on getting
- * SocketException.
- *
- * @param numberOfCallRetries
- */
- public void setNumberOfCallRetries(int numberOfCallRetries)
- {
- if (numberOfCallRetries < 1)
- {
- this.numberOfCallRetries = MAX_CALL_RETRIES;
- }
- else
- {
- this.numberOfCallRetries = numberOfCallRetries;
- }
- }
-
- public int getNumberOfCallRetries()
- {
- return numberOfCallRetries;
- }
-
- /**
- * Sets the number of retries to get a socket connection.
- *
- * @param numberOfRetries Must be a number greater than 0
- */
- public void setNumberOfRetries(int numberOfRetries)
- {
- if (numberOfRetries < 1)
- {
- this.numberOfRetries = MAX_RETRIES;
- }
- else
- {
- this.numberOfRetries = numberOfRetries;
- }
- }
-
- public int getNumberOfRetries()
- {
- return numberOfRetries;
- }
-
- /**
- * used for debugging (tracing) connections leaks
- */
- static int counter = 0;
-
- protected SocketWrapper getConnection(Marshaller marshaller, UnMarshaller unmarshaller) throws Exception
+ protected SocketWrapper getConnection(Marshaller marshaller, UnMarshaller unmarshaller)
+ throws Exception
{
SocketWrapper pooled = null;
- //
- // Need to retry a few times
- // on socket connection because, at least on Windoze,
- // if too many concurrent threads try to connect
- // at same time, you get ConnectionRefused
- //
- // Retrying seems to be the most performant.
- //
- // This problem always happens with RMI and seems to
- // have nothing to do with backlog or number of threads
- // waiting in accept() on the server.
- //
+ // Need to retry a few times on socket connection because, at least on Windoze, if too many
+ // concurrent threads try to connect at same time, you get ConnectionRefused. Retrying seems
+ // to be the most performant. This problem always happens with RMI and seems to have nothing
+ // to do with backlog or number of threads waiting in accept() on the server.
+
for (int i = 0; i < numberOfRetries; i++)
{
if (bailOut)
@@ -736,24 +706,21 @@
}
else if (usedPooled < maxPoolSize)
{
- // if no connection in pool and all pooled connections
- // not in use, then need create a new connection which
- // will be latered returned to the pool (thus filling out
- // the pool, since starts out empty).
+ // If no connection in pool and all pooled connections not in use, then need to create
+ // a new connection which will later be returned to the pool (thus filling out the
+ // pool, since starts out empty).
Socket socket = null;
long timestamp = System.currentTimeMillis();
try
{
- if (trace)
- {
- log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1));
- }
+ if (trace) { log.trace(this + " creating socket " + (counter++) + ", attempt " + (i + 1)); }
socket = createSocket(address.address, address.port);
}
catch (Exception ex)
{
- log.debug(this + " got Exception " + ex + ", creation attempt took " + (System.currentTimeMillis() - timestamp) + " ms");
+ log.debug(this + " got Exception " + ex + ", creation attempt took " +
+ (System.currentTimeMillis() - timestamp) + " ms");
if (i + 1 < numberOfRetries)
{
@@ -782,14 +749,17 @@
break;
}
}
- // waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting pooled connection)
+
+ // Waiting 1 second (numberOfRetries along with 1 second wait determines timeout on getting
+ // pooled connection)
Thread.sleep(1000);
}
if (pooled == null)
{
- throw new SocketException("Can not obtain client socket connection from pool. Have waited " +
- numberOfRetries + " seconds for available connection (" + usedPooled + " in use)");
+ throw new SocketException("Can not obtain client socket connection from pool. " +
+ "Have waited " + numberOfRetries + " seconds for available connection (" + usedPooled +
+ " in use)");
}
else
{
@@ -797,7 +767,8 @@
}
}
- protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata) throws Exception
+ protected SocketWrapper createClientSocket(Socket socket, int timeout, Map metadata)
+ throws Exception
{
if (clientSocketConstructor == null)
{
@@ -812,12 +783,12 @@
}
SocketWrapper clientSocketWrapper = null;
- clientSocketWrapper = (SocketWrapper) clientSocketConstructor.newInstance(new Object[]{socket, metadata, new Integer(timeout)});
+ clientSocketWrapper = (SocketWrapper)clientSocketConstructor.
+ newInstance(new Object[]{socket, metadata, new Integer(timeout)});
return clientSocketWrapper;
}
-
protected Socket createSocket(String address, int port) throws IOException
{
return new Socket(address, port);
@@ -828,7 +799,7 @@
SocketWrapper socketWrapper = null;
while (pool.size() > 0)
{
- socketWrapper = (SocketWrapper) pool.removeFirst();
+ socketWrapper = (SocketWrapper)pool.removeFirst();
try
{
if (socketWrapper != null)
@@ -846,29 +817,81 @@
}
catch (Exception ex)
{
- if (trace)
- {
- log.trace("Couldn't reuse connection from pool");
- }
+ if (trace) { log.trace(this + " couldn't reuse connection from pool"); }
try
{
socketWrapper.close();
}
- catch (Exception ignored)
+ catch (Exception e)
{
+ log.debug("Failed to close socket wrapper", e);
}
}
}
return null;
}
+ // Private --------------------------------------------------------------------------------------
- /**
- * The name of of the server.
- */
- public String getServerHostName() throws Exception
+ private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int version)
+ throws IOException, ClassNotFoundException
{
- return address.address;
+ //TODO: -TME - is switch required?
+ switch (version)
+ {
+ case Version.VERSION_1:
+ case Version.VERSION_2:
+ {
+ if (trace) { log.trace(this + " reading response from unmarshaller"); }
+ return unmarshaller.read(inputStream, null);
+ }
+ default:
+ {
+ throw new IOException("Can not read data for version " + version + ". " +
+ "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
}
+ }
+ }
+
+ private void versionedWrite(OutputStream outputStream, Marshaller marshaller,
+ Object invocation, int version) throws IOException
+ {
+ //TODO: -TME Should I worry about checking the version here? Only one way to do it at this point
+ switch (version)
+ {
+ case Version.VERSION_1:
+ case Version.VERSION_2:
+ {
+ if (trace) { log.trace(this + " writing invocation to marshaller"); }
+ marshaller.write(invocation, outputStream);
+ if (trace) { log.trace(this + " done writing invocation to marshaller"); }
+
+ return;
+ }
+ default:
+ {
+ throw new IOException("Can not write data for version " + version + ". " +
+ "Supported versions: " + Version.VERSION_1 + ", " + Version.VERSION_2);
+ }
+ }
+ }
+
+ //TODO: -TME Exact same method in ServerThread
+ private int readVersion(InputStream inputStream) throws IOException
+ {
+ if (trace) { log.trace(this + " reading version from input stream"); }
+ int version = inputStream.read();
+ if (trace) { log.trace(this + " read version " + version + " from input stream"); }
+ return version;
+ }
+
+ //TODO: -TME Exact same method in ServerThread
+ private void writeVersion(OutputStream outputStream, int version) throws IOException
+ {
+ if (trace) { log.trace(this + " writing version " + version + " on output stream"); }
+ outputStream.write(version);
+ }
+
+ // Inner classes --------------------------------------------------------------------------------
}
1.14.6.5 +0 -2 JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerSocketWrapper.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: ServerSocketWrapper.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/ServerSocketWrapper.java,v
retrieving revision 1.14.6.4
retrieving revision 1.14.6.5
diff -u -b -r1.14.6.4 -r1.14.6.5
--- ServerSocketWrapper.java 29 Jan 2007 05:21:31 -0000 1.14.6.4
+++ ServerSocketWrapper.java 29 Jan 2007 07:10:00 -0000 1.14.6.5
@@ -36,8 +36,6 @@
{
final static private Logger log = Logger.getLogger(ServerSocketWrapper.class);
- private static boolean trace = log.isTraceEnabled();
-
public ServerSocketWrapper(Socket socket) throws Exception
{
super(socket);
1.38.2.4 +4 -3 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketClientInvoker.java,v
retrieving revision 1.38.2.3
retrieving revision 1.38.2.4
diff -u -b -r1.38.2.3 -r1.38.2.4
--- SocketClientInvoker.java 23 Jan 2007 09:39:03 -0000 1.38.2.3
+++ SocketClientInvoker.java 29 Jan 2007 07:10:00 -0000 1.38.2.4
@@ -40,7 +40,7 @@
*
* @author <a href="mailto:jhaynie at vocalocity.net">Jeff Haynie</a>
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
- * @version $Revision: 1.38.2.3 $
+ * @version $Revision: 1.38.2.4 $
*/
public class SocketClientInvoker extends MicroSocketClientInvoker
{
@@ -91,11 +91,12 @@
try
{
timeout = Integer.valueOf((String) val).intValue();;
- log.debug("Setting SocketClientInvoker::timeout to: " + timeout);
+ log.debug(this + " setting timeout to " + timeout);
}
catch (Exception e)
{
- log.warn("Could not convert " + SO_TIMEOUT_FLAG + " value of " + val + " to a int value.");
+ log.warn(this + " could not convert " + SO_TIMEOUT_FLAG + " value of " +
+ val + " to a int value.");
}
}
}
1.8.10.3 +30 -12 JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketWrapper.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: SocketWrapper.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/SocketWrapper.java,v
retrieving revision 1.8.10.2
retrieving revision 1.8.10.3
diff -u -b -r1.8.10.2 -r1.8.10.3
--- SocketWrapper.java 29 Jan 2007 05:28:29 -0000 1.8.10.2
+++ SocketWrapper.java 29 Jan 2007 07:10:00 -0000 1.8.10.3
@@ -35,28 +35,34 @@
*/
public abstract class SocketWrapper
{
+ // Constants ------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SocketWrapper.class);
+
public static final String MARSHALLER = "marshaller";
public static final String UNMARSHALLER = "unmarshaller";
- private static final Logger log = Logger.getLogger(SocketWrapper.class);
+ // Static ---------------------------------------------------------------------------------------
private static boolean trace = log.isTraceEnabled();
+ // Attributes -----------------------------------------------------------------------------------
+
private Socket socket;
private int timeout;
+ // Constructors ---------------------------------------------------------------------------------
+
protected SocketWrapper(Socket socket)
{
if (trace) { log.trace("creating SocketWrapper for " + socket); }
-
this.socket = socket;
}
protected SocketWrapper(Socket socket, Integer timeoutInt) throws SocketException
{
- if (trace) { log.trace("creating SocketWrapper for " + socket + ", using timeout " + timeoutInt); }
+ this(socket);
- this.socket = socket;
if(timeoutInt != null)
{
this.timeout = timeoutInt.intValue();
@@ -64,14 +70,11 @@
}
}
- public abstract OutputStream getOutputStream() throws IOException;
-
- public abstract InputStream getInputStream() throws IOException;
-
- public abstract void checkConnection() throws IOException;
+ // Public ---------------------------------------------------------------------------------------
public void setTimeout(int timeout) throws SocketException
{
+ if (trace) { log.trace(this + " setting timeout to " + timeout); }
this.timeout = timeout;
if(socket != null)
{
@@ -84,15 +87,25 @@
return timeout;
}
-
public void close() throws IOException
{
if(socket != null)
{
+ log.debug(this + " closing");
socket.close();
}
}
+ public abstract OutputStream getOutputStream() throws IOException;
+
+ public abstract InputStream getInputStream() throws IOException;
+
+ public abstract void checkConnection() throws IOException;
+
+ // Package protected ----------------------------------------------------------------------------
+
+ // Protected ------------------------------------------------------------------------------------
+
protected Socket getSocket()
{
return socket;
@@ -106,10 +119,15 @@
{
socket.close();
}
- catch(Exception ignored)
+ catch(Exception e)
{
+ log.debug(this + " failed to close socket", e);
}
}
}
+ // Private --------------------------------------------------------------------------------------
+
+ // Inner classes --------------------------------------------------------------------------------
+
}
\ No newline at end of file
More information about the jboss-cvs-commits
mailing list