Author: ron.sigal(a)jboss.com
Date: 2008-05-21 23:38:26 -0400 (Wed, 21 May 2008)
New Revision: 4213
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
Log:
JBREM-982: (1) Introduced variable useOnewayConnectionTimeout; (2) gets classloader,
InputStream, and OutputStream during initialization..
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
===================================================================
---
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java 2008-05-22
03:35:00 UTC (rev 4212)
+++
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java 2008-05-22
03:38:26 UTC (rev 4213)
@@ -52,8 +52,6 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -100,6 +98,8 @@
{
return idGenerator++;
}
+
+ private static ClassLoader classLoader =
SecurityUtility.getClassLoader(ServerThread.class);
// Attributes
-----------------------------------------------------------------------------------
@@ -124,6 +124,9 @@
protected Marshaller marshaller;
protected UnMarshaller unmarshaller;
+
+ protected int version;
+ protected boolean performVersioning;
// the unique identity of the thread, which won't change during the life of the
thread. The
// thread may get associated with different IP addresses though.
@@ -146,6 +149,8 @@
private boolean reuseAfterTimeout;
+ private boolean useOnewayConnectionTimeout = false;
+
// Constructors
---------------------------------------------------------------------------------
public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
@@ -166,6 +171,9 @@
if (invoker != null)
{
+ version = invoker.getVersion();
+ performVersioning = Version.performVersioning(version);
+
Map configMap = invoker.getConfiguration();
String checkValue =
(String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
if (checkValue != null && checkValue.length() > 0)
@@ -191,6 +199,21 @@
" value of " + o + " to a int value");
}
}
+
+ o = configMap.get(MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT);
+ if (o != null)
+ {
+ try
+ {
+ useOnewayConnectionTimeout = Boolean.valueOf((String)o).booleanValue();
+ log.debug(this + " setting useOnewayConnectionTimeout to " +
useOnewayConnectionTimeout);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " +
MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT +
+ " value of " + o + " to a boolean value");
+ }
+ }
}
}
@@ -454,6 +477,8 @@
protected void dorun()
{
running = true;
+ InputStream inputStream = null;
+ OutputStream outputStream = null;
// lazy initialize the socketWrapper on the worker thread itself. We do this to
avoid to have
// it done on the acceptor thread (prone to lockup)
@@ -463,7 +488,9 @@
socketWrapper =
createServerSocketWrapper(socket, timeout,
invoker.getLocator().getParameters());
-
+ inputStream = socketWrapper.getInputStream();
+ outputStream = socketWrapper.getOutputStream();
+
boolean valueSet = false;
Map configMap = invoker.getConfiguration();
Object o = configMap.get(CONTINUE_AFTER_TIMEOUT);
@@ -492,7 +519,7 @@
// Always do first one without an ACK because its not needed
if(trace) { log.trace("processing first invocation without
acknowledging"); }
- processInvocation(socketWrapper);
+ processInvocation(socketWrapper, inputStream, outputStream);
}
catch (Exception ex)
{
@@ -519,7 +546,7 @@
}
acknowledge(socketWrapper);
- processInvocation(socketWrapper);
+ processInvocation(socketWrapper, inputStream, outputStream);
}
catch (EvictionException e)
{
@@ -633,15 +660,11 @@
}
}
- protected void processInvocation(SocketWrapper socketWrapper) throws Exception
+ protected void processInvocation(SocketWrapper socketWrapper, InputStream inputStream,
OutputStream outputStream) throws Exception
{
if(trace) { log.trace("preparing to process next invocation invocation");
}
// Ok, now read invocation and invoke
-
- int version = invoker.getVersion();
- boolean performVersioning = Version.performVersioning(version);
- InputStream inputStream = socketWrapper.getInputStream();
if (performVersioning)
{
@@ -669,24 +692,16 @@
}
}
- completeInvocation(socketWrapper, inputStream, performVersioning, version);
+ completeInvocation(socketWrapper, inputStream, outputStream, performVersioning,
version);
}
protected synchronized void completeInvocation(
SocketWrapper socketWrapper,
InputStream inputStream,
- boolean performVersioning,
- int version)
+ OutputStream outputStream,
+ boolean performVersioning, int version)
throws Exception
{
- ClassLoader classLoader = (ClassLoader) AccessController.doPrivileged( new
PrivilegedAction()
- {
- public Object run()
- {
- return ServerThread.class.getClassLoader();
- }
- });
-
Object obj = versionedRead(inputStream, invoker, classLoader, version);
// setting timestamp since about to start processing
@@ -741,6 +756,14 @@
{
if(trace) { log.trace("oneway request, writing no reply on the wire");
}
}
+ else if (isOneway(req))
+ {
+ if (useOnewayConnectionTimeout && performVersioning)
+ {
+ writeVersion(outputStream, version);
+ outputStream.flush();
+ }
+ }
else
{
if(!createdInvocationRequest)
@@ -750,15 +773,11 @@
resp = new InvocationResponse(req.getSessionId(), resp, isError,
req.getReturnPayload());
}
- OutputStream outputStream = socketWrapper.getOutputStream();
if (performVersioning)
{
writeVersion(outputStream, version);
}
-
- if (isOneway(req))
- resp = null;
-
+
versionedWrite(outputStream, invoker, classLoader, resp, version);
}
@@ -939,8 +958,6 @@
private void processNewSocket()
{
InvokerLocator locator = invoker.getLocator();
-
- ClassLoader classLoader = SecurityUtility.getClassLoader(ServerThread.class);
String dataType = invoker.getDataType();
String serializationType = invoker.getSerializationType();
@@ -995,8 +1012,12 @@
private int readVersion(InputStream inputStream) throws Exception
{
- if(trace) { log.trace(this + " blocking to read version from input
stream"); }
- long start = System.currentTimeMillis();
+ long start = -1;
+ if(trace)
+ {
+ log.trace(this + " blocking to read version from input stream");
+ start = System.currentTimeMillis();
+ }
synchronized (evictionLock)
{