Author: ron.sigal(a)jboss.com
Date: 2009-08-18 21:59:20 -0400 (Tue, 18 Aug 2009)
New Revision: 5365
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
Log:
JBREM-1120: Added writeTimeout facility; JBREM-1146: added generalized retriable
IOException facility.
Modified:
remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2009-08-19 01:53:51 UTC (rev 5364)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2009-08-19 01:59:20 UTC (rev 5365)
@@ -76,7 +76,10 @@
* or wrapped in a RuntimeException.
*/
public static final String WRAP_INTERRUPTED_EXCEPTION = "wrapInterruptedException";
-
+
+ /** Key for setting socket write timeout */
+ public static final String WRITE_TIMEOUT = "writeTimeout";
+
/**
* Default value for enable TCP nodelay. Value is false.
*/
@@ -215,6 +218,14 @@
public Object usedPoolLock;
protected boolean wrapInterruptedException = false;
+
+ /**
+ * If true, an IOException with message "Connection reset by peer: socket write error" will
+ * be treated like a SocketException.
+ */
+ protected boolean generalizeSocketException;
+
+ protected int writeTimeout = -1;
// Constructors ---------------------------------------------------------------------------------
@@ -280,6 +291,38 @@
reuseAddress = reuse;
}
+ public int getWriteTimeout()
+ {
+ return writeTimeout;
+ }
+
+ public void setWriteTimeout(int writeTimeout)
+ {
+ this.writeTimeout = writeTimeout;
+ }
+
+ /**
+ * Get the generalizeSocketException.
+ *
+ * @return the generalizeSocketException.
+ */
+
+ public synchronized boolean isGeneralizeSocketException()
+ {
+ return generalizeSocketException;
+ }
+
+ /**
+ * Set the generalizeSocketException.
+ *
+ * @param generalizeSocketException The generalizeSocketException to set.
+ */
+
+ public synchronized void setGeneralizeSocketException(boolean generalizeSocketException)
+ {
+ this.generalizeSocketException = generalizeSocketException;
+ }
+
public synchronized void disconnect()
{
log.debug(this + " disconnecting ...");
@@ -443,6 +486,22 @@
shouldCheckConnection = true;
log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
}
+
+ // look for writeTimeout param
+ val = params.get(WRITE_TIMEOUT);
+ if (val != null)
+ {
+ try
+ {
+ writeTimeout = Integer.valueOf((String)val).intValue();
+ log.debug(this + " setting writeTimeout to " + writeTimeout);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + WRITE_TIMEOUT + " value of " +
+ val + " to an int value");
+ }
+ }
}
protected ServerAddress createServerAddress()
@@ -616,47 +675,27 @@
}
catch (SocketException sex)
{
- log.debug(this + " got SocketException " + sex);
-
- try
- {
- semaphore.release();
- if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
- socketWrapper.close();
- }
- catch (Exception ex)
- {
- if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
- }
-
- /**
- * About to run out of retries and
- * pool may be full of timed out sockets,
- * so want to flush the pool and try with
- * fresh socket as a last effort.
- */
- if (retryCount == (numberOfCallRetries - 2))
- {
- flushConnectionPool();
- }
+ handleSocketException(sex, socketWrapper, semaphore, retryCount);
sockEx = sex;
continue;
}
- catch (Exception ex)
+ catch (IOException e)
{
- log.debug(this + " got exception " + ex);
-
- try
+ if (isGeneralizeSocketException() && e.getMessage().startsWith("Connection reset"))
{
- semaphore.release();
- if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
- socketWrapper.close();
+ handleSocketException(e, socketWrapper, semaphore, retryCount);
+ sockEx = new SocketException(e.getMessage());
+ continue;
}
- catch (Exception ignored)
+ else
{
+ return handleOtherException(e, semaphore, socketWrapper);
}
- return handleException(ex, socketWrapper);
}
+ catch (Exception ex)
+ {
+ return handleOtherException(ex, semaphore, socketWrapper);
+ }
// call worked, so no need to retry
break;
@@ -695,6 +734,51 @@
return response;
}
+
+ protected void handleSocketException(Exception sex, SocketWrapper socketWrapper, Semaphore semaphore, int retryCount)
+ {
+ log.debug(this + " got SocketException " + sex);
+
+ try
+ {
+ semaphore.release();
+ if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
+ socketWrapper.close();
+ }
+ catch (Exception ex)
+ {
+ if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
+ }
+
+ /**
+ * About to run out of retries and
+ * pool may be full of timed out sockets,
+ * so want to flush the pool and try with
+ * fresh socket as a last effort.
+ */
+ if (retryCount == (numberOfCallRetries - 2))
+ {
+ flushConnectionPool();
+ }
+ }
+
+ protected Object handleOtherException(Exception ex, Semaphore semaphore, SocketWrapper socketWrapper)
+ throws ClassNotFoundException, MarshalException
+ {
+ log.debug(this + " got exception " + ex);
+
+ try
+ {
+ semaphore.release();
+ if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
+ socketWrapper.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ return handleException(ex, socketWrapper);
+ }
+
protected Object handleException(Exception ex, SocketWrapper socketWrapper)
throws ClassNotFoundException, MarshalException
{
@@ -809,7 +893,10 @@
}
metadata.put(SocketWrapper.MARSHALLER, marshaller);
metadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
-
+ if (writeTimeout > 0)
+ {
+ metadata.put(SocketWrapper.WRITE_TIMEOUT, new Integer(writeTimeout));
+ }
if (timeAllowed > 0)
{
timeRemaining = (int) (timeAllowed - (System.currentTimeMillis() - start));