[jboss-remoting-commits] JBoss Remoting SVN: r4213 - remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Wed May 21 23:38:26 EDT 2008
Author: ron.sigal at jboss.com
Date: 2008-05-21 23:38:26 -0400 (Wed, 21 May 2008)
New Revision: 4213
Modified:
remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
Log:
JBREM-982: (1) Introduced variable useOnewayConnectionTimeout; (2) gets classloader, InputStream, and OutputStream during initialization..
Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java 2008-05-22 03:35:00 UTC (rev 4212)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java 2008-05-22 03:38:26 UTC (rev 4213)
@@ -52,8 +52,6 @@
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -100,6 +98,8 @@
{
return idGenerator++;
}
+
+ private static ClassLoader classLoader = SecurityUtility.getClassLoader(ServerThread.class);
// Attributes -----------------------------------------------------------------------------------
@@ -124,6 +124,9 @@
protected Marshaller marshaller;
protected UnMarshaller unmarshaller;
+
+ protected int version;
+ protected boolean performVersioning;
// the unique identity of the thread, which won't change during the life of the thread. The
// thread may get associated with different IP addresses though.
@@ -146,6 +149,8 @@
private boolean reuseAfterTimeout;
+ private boolean useOnewayConnectionTimeout = false;
+
// Constructors ---------------------------------------------------------------------------------
public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
@@ -166,6 +171,9 @@
if (invoker != null)
{
+ version = invoker.getVersion();
+ performVersioning = Version.performVersioning(version);
+
Map configMap = invoker.getConfiguration();
String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
if (checkValue != null && checkValue.length() > 0)
@@ -191,6 +199,21 @@
" value of " + o + " to a int value");
}
}
+
+ o = configMap.get(MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT);
+ if (o != null)
+ {
+ try
+ {
+ useOnewayConnectionTimeout = Boolean.valueOf((String)o).booleanValue();
+ log.debug(this + " setting useOnewayConnectionTimeout to " + useOnewayConnectionTimeout);
+ }
+ catch (Exception e)
+ {
+ log.warn(this + " could not convert " + MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT +
+ " value of " + o + " to a boolean value");
+ }
+ }
}
}
@@ -454,6 +477,8 @@
protected void dorun()
{
running = true;
+ InputStream inputStream = null;
+ OutputStream outputStream = null;
// lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
// it done on the acceptor thread (prone to lockup)
@@ -463,7 +488,9 @@
socketWrapper =
createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
-
+ inputStream = socketWrapper.getInputStream();
+ outputStream = socketWrapper.getOutputStream();
+
boolean valueSet = false;
Map configMap = invoker.getConfiguration();
Object o = configMap.get(CONTINUE_AFTER_TIMEOUT);
@@ -492,7 +519,7 @@
// Always do first one without an ACK because its not needed
if(trace) { log.trace("processing first invocation without acknowledging"); }
- processInvocation(socketWrapper);
+ processInvocation(socketWrapper, inputStream, outputStream);
}
catch (Exception ex)
{
@@ -519,7 +546,7 @@
}
acknowledge(socketWrapper);
- processInvocation(socketWrapper);
+ processInvocation(socketWrapper, inputStream, outputStream);
}
catch (EvictionException e)
{
@@ -633,15 +660,11 @@
}
}
- protected void processInvocation(SocketWrapper socketWrapper) throws Exception
+ protected void processInvocation(SocketWrapper socketWrapper, InputStream inputStream, OutputStream outputStream) throws Exception
{
if(trace) { log.trace("preparing to process next invocation invocation"); }
// Ok, now read invocation and invoke
-
- int version = invoker.getVersion();
- boolean performVersioning = Version.performVersioning(version);
- InputStream inputStream = socketWrapper.getInputStream();
if (performVersioning)
{
@@ -669,24 +692,16 @@
}
}
- completeInvocation(socketWrapper, inputStream, performVersioning, version);
+ completeInvocation(socketWrapper, inputStream, outputStream, performVersioning, version);
}
protected synchronized void completeInvocation(
SocketWrapper socketWrapper,
InputStream inputStream,
- boolean performVersioning,
- int version)
+ OutputStream outputStream,
+ boolean performVersioning, int version)
throws Exception
{
- ClassLoader classLoader = (ClassLoader) AccessController.doPrivileged( new PrivilegedAction()
- {
- public Object run()
- {
- return ServerThread.class.getClassLoader();
- }
- });
-
Object obj = versionedRead(inputStream, invoker, classLoader, version);
// setting timestamp since about to start processing
@@ -741,6 +756,14 @@
{
if(trace) { log.trace("oneway request, writing no reply on the wire"); }
}
+ else if (isOneway(req))
+ {
+ if (useOnewayConnectionTimeout && performVersioning)
+ {
+ writeVersion(outputStream, version);
+ outputStream.flush();
+ }
+ }
else
{
if(!createdInvocationRequest)
@@ -750,15 +773,11 @@
resp = new InvocationResponse(req.getSessionId(), resp, isError, req.getReturnPayload());
}
- OutputStream outputStream = socketWrapper.getOutputStream();
if (performVersioning)
{
writeVersion(outputStream, version);
}
-
- if (isOneway(req))
- resp = null;
-
+
versionedWrite(outputStream, invoker, classLoader, resp, version);
}
@@ -939,8 +958,6 @@
private void processNewSocket()
{
InvokerLocator locator = invoker.getLocator();
-
- ClassLoader classLoader = SecurityUtility.getClassLoader(ServerThread.class);
String dataType = invoker.getDataType();
String serializationType = invoker.getSerializationType();
@@ -995,8 +1012,12 @@
private int readVersion(InputStream inputStream) throws Exception
{
- if(trace) { log.trace(this + " blocking to read version from input stream"); }
- long start = System.currentTimeMillis();
+ long start = -1;
+ if(trace)
+ {
+ log.trace(this + " blocking to read version from input stream");
+ start = System.currentTimeMillis();
+ }
synchronized (evictionLock)
{
More information about the jboss-remoting-commits
mailing list