[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...
Ron Sigal
ron_sigal at yahoo.com
Fri Jul 6 02:04:43 EDT 2007
User: rsigal
Date: 07/07/06 02:04:43
Modified: src/main/org/jboss/remoting/transport/socket Tag:
remoting_2_x MicroSocketClientInvoker.java
Log:
JBREM-706: Check usability of socket used for client side oneway invocation on same thread as invocation.
Revision Changes Path
No revision
No revision
1.16.2.32 +43 -223 JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
(In the diff below, changes in quantity of whitespace are not shown.)
Index: MicroSocketClientInvoker.java
===================================================================
RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
retrieving revision 1.16.2.31
retrieving revision 1.16.2.32
diff -u -b -r1.16.2.31 -r1.16.2.32
--- MicroSocketClientInvoker.java 4 Jul 2007 03:46:39 -0000 1.16.2.31
+++ MicroSocketClientInvoker.java 6 Jul 2007 06:04:43 -0000 1.16.2.32
@@ -9,7 +9,6 @@
import org.jboss.remoting.ServerInvoker;
import org.jboss.remoting.Version;
import org.jboss.remoting.serialization.ClassLoaderUtility;
-import org.jboss.remoting.invocation.InternalInvocation;
import org.jboss.remoting.invocation.OnewayInvocation;
import org.jboss.remoting.marshal.Marshaller;
import org.jboss.remoting.marshal.UnMarshaller;
@@ -32,8 +31,6 @@
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
-import java.util.Timer;
-import java.util.TimerTask;
/**
* SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which must be
@@ -43,7 +40,7 @@
* @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
*
- * @version $Revision: 1.16.2.31 $
+ * @version $Revision: 1.16.2.32 $
*/
public class MicroSocketClientInvoker extends RemoteClientInvoker
{
@@ -74,9 +71,6 @@
public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
/** Key for setting timeout used by OnewayConnectionTask */
- public static final String ONEWAY_CONNECTION_DELAY = "onewayConnectionDelay";
-
- /** Key for setting timeout used by OnewayConnectionTask */
public static final String ONEWAY_CONNECTION_TIMEOUT = "onewayConnectionTimeout";
/**
@@ -102,11 +96,8 @@
*/
public static final int MAX_POOL_SIZE = 50;
- /** Default delay value used by OnewayConnectionTask. Value is 5 seconds. */
- public static final int ONEWAY_CONNECTION_DELAY_DEFAULT = 5000;
-
- /** Default timeout value used by OnewayConnectionTask. Value is 1 seconds. */
- public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 1000;
+ /** Default timeout value used by OnewayConnectionTask. Value is 2 seconds. */
+ public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 2000;
// Static ---------------------------------------------------------------------------------------
@@ -119,8 +110,6 @@
protected static final Map connectionPools = new HashMap();
- protected Timer onewayConnectionTimer;
-
// Performance measurements
public static long getSocketTime = 0;
public static long readTime = 0;
@@ -210,7 +199,6 @@
protected int numberOfRetries;
protected int numberOfCallRetries;
protected int maxPoolSize;
- protected int onewayConnectionDelay;
protected int onewayConnectionTimeout;
/**
@@ -248,7 +236,6 @@
numberOfCallRetries = MAX_CALL_RETRIES;
pool = null;
maxPoolSize = MAX_POOL_SIZE;
- onewayConnectionDelay = ONEWAY_CONNECTION_DELAY_DEFAULT;
onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
usedPooled = 0;
usedPoolLock = new Object();
@@ -454,22 +441,6 @@
log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
}
- // look for onewayConnectionDelay param
- val = params.get(ONEWAY_CONNECTION_DELAY);
- if (val != null)
- {
- try
- {
- onewayConnectionDelay = Integer.valueOf((String)val).intValue();
- log.debug(this + " setting onewayConnectionDelay to " + onewayConnectionDelay);
- }
- catch (Exception e)
- {
- log.warn(this + " could not convert " + ONEWAY_CONNECTION_DELAY + " value of " +
- val + " to an int value");
- }
- }
-
// look for onewayConnectionTimeout param
val = params.get(ONEWAY_CONNECTION_TIMEOUT);
if (val != null)
@@ -558,6 +529,14 @@
}
}
+ boolean serverSideOneway = false;
+ if (oneway && invocation instanceof InvocationRequest)
+ {
+ InvocationRequest ir = (InvocationRequest) invocation;
+ if (ir.getParameter() instanceof OnewayInvocation)
+ serverSideOneway = true;
+ }
+
int retryCount = 0;
SocketException sockEx = null;
@@ -613,12 +592,19 @@
writeTime += end;
start = System.currentTimeMillis();
- if (oneway)
+ if (serverSideOneway)
{
if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
}
else
{
+ int onewaySavedTimeout = -1;
+ if (oneway)
+ {
+ onewaySavedTimeout = socketWrapper.getTimeout();
+ socketWrapper.setTimeout(onewayConnectionTimeout);
+ }
+
InputStream inputStream = socketWrapper.getInputStream();
if (performVersioning)
{
@@ -635,6 +621,13 @@
}
response = versionedRead(inputStream, unmarshaller, version);
+
+ // Note that if an exception is thrown, the socket is thrown away,
+ // so there's no need to reset the timeout value.
+ if (oneway)
+ {
+ socketWrapper.setTimeout(onewaySavedTimeout);
+ }
}
end = System.currentTimeMillis() - start;
@@ -694,6 +687,10 @@
catch (Exception ignored)
{
}
+
+ if (oneway)
+ return null;
+ else
return handleException(ex, socketWrapper);
}
@@ -707,33 +704,6 @@
handleException(sockEx, socketWrapper);
}
- boolean clientSideOneway = oneway;
- if (invocation instanceof InvocationRequest)
- {
- InvocationRequest ir = (InvocationRequest) invocation;
- if (ir.getParameter() instanceof OnewayInvocation)
- clientSideOneway = false;
- }
-
- if (clientSideOneway)
- {
- // Hand over to oneway connection task to test if connection may be reused.
- synchronized (OnewayConnectionTask.class)
- {
- if (onewayConnectionTimer == null)
- {
- onewayConnectionTimer = new Timer(true);
- log.debug(this + " created oneway connection timer: " + onewayConnectionTimer);
- }
- }
-
- TimerTask onewayConnectionTask
- = new OnewayConnectionTask(this, socketWrapper, marshaller, unmarshaller);
- onewayConnectionTimer.schedule(onewayConnectionTask, onewayConnectionDelay);
- if (trace) log.trace(this + " scheduled OnewayConnectionTask for: " + socketWrapper);
- }
- else
- {
// Put socket back in pool for reuse
synchronized (pool)
{
@@ -758,7 +728,6 @@
}
}
}
- }
if (trace && !oneway) { log.trace(this + " received response " + response); }
return response;
@@ -1077,153 +1046,4 @@
}
// Inner classes --------------------------------------------------------------------------------
-
- /**
- * OnewayConnectionTask takes sockets used for client side oneway invocations and,
- * once the oneway invocation has concluded, returns the socket to the socket pool.
- */
- protected static class OnewayConnectionTask extends TimerTask
- {
- private static InvocationRequest invocation;
-
- static
- {
- Object[] params = new Object[1];
- params[0] = "onewayEcho";
- String echoMethod = InternalInvocation.ECHO;
- InternalInvocation ii = new InternalInvocation(echoMethod, params);
- invocation = new InvocationRequest(null, null, ii, null, null, null);
- }
-
- private MicroSocketClientInvoker invoker;
- private SocketWrapper wrapper;
- private Marshaller marshaller;
- private UnMarshaller unmarshaller;
- private int onewayConnectionTimeout;
-
-
- public OnewayConnectionTask(MicroSocketClientInvoker invoker,
- SocketWrapper wrapper,
- Marshaller marshaller,
- UnMarshaller unmarshaller)
- {
- this.invoker = invoker;
- this.wrapper = wrapper;
- this.marshaller = marshaller;
- this.unmarshaller = unmarshaller;
- this.onewayConnectionTimeout = invoker.onewayConnectionTimeout;
- }
-
- public void run()
- {
- try
- {
- int originalTimeout = wrapper.getTimeout();
- wrapper.setTimeout(onewayConnectionTimeout);
- OutputStream outputStream = wrapper.getOutputStream();
- InputStream inputStream = wrapper.getInputStream();
- int version = Version.getDefaultVersion();
- boolean performVersioning = Version.performVersioning();
-
- if (performVersioning)
- {
- invoker.writeVersion(outputStream, version);
- }
-
- invoker.versionedWrite(outputStream, marshaller, invocation, version);
-
- if (performVersioning)
- {
- int readVersion = invoker.readVersion(inputStream);
-
- if (readVersion == -1)
- {
- if (trace) log.trace(invoker + "end of file: closing connection");
- destroyConnection(wrapper, true);
- }
- if (readVersion == SocketWrapper.CLOSING)
- {
- log.info(invoker + " Received version 254: treating as end of file");
- destroyConnection(wrapper, true);
- }
- }
-
- invoker.versionedRead(inputStream, unmarshaller, version);
- returnConnection(wrapper, originalTimeout);
- }
- catch (Exception e)
- {
- if (trace) log.trace(invoker + " oneway connection task error: ", e);
- destroyConnection(wrapper, true);
- }
- }
-
- private void returnConnection(SocketWrapper wrapper, int originalTimeout)
- {
- if (trace) log.trace(invoker + " returning connection to pool: " + wrapper);
-
- // Put socket back in pool for reuse
- try
- {
- synchronized (invoker.pool)
- {
- if (invoker.pool.size() < invoker.maxPoolSize)
- {
- wrapper.setTimeout(originalTimeout);
- invoker.pool.add(wrapper);
- synchronized(invoker.usedPoolLock)
- {
- invoker.usedPooled--;
- }
- if (trace) { log.trace(invoker + " returned " + wrapper + " to pool"); }
- }
- else
- {
- if (trace) { log.trace(invoker + "'pool is full, will close connection: " + wrapper); }
- destroyConnection(wrapper, false);
- }
- }
- }
- catch (Exception e)
- {
- if (trace)
- {
- log.trace(invoker + " error: " + e.getMessage(), e);
- log.trace(invoker + " closing connection: " + wrapper);
- }
- try
- {
- destroyConnection(wrapper, true);
- synchronized(invoker.usedPoolLock)
- {
- invoker.usedPooled--;
- }
- }
- catch (Exception ignored)
- {
- }
- }
- }
-
- private void destroyConnection(SocketWrapper wrapper, boolean decrement)
- {
- if (trace) log.trace(invoker + " destroying connection: " + wrapper);
-
- try
- {
- wrapper.close();
- }
- catch (Exception e)
- {
- }
-
- if (decrement)
- {
- synchronized(invoker.usedPoolLock)
- {
- invoker.usedPooled--;
- }
- }
- }
- }
}
More information about the jboss-cvs-commits
mailing list