[jboss-remoting-commits] JBoss Remoting SVN: r4213 - remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed May 21 23:38:26 EDT 2008


Author: ron.sigal at jboss.com
Date: 2008-05-21 23:38:26 -0400 (Wed, 21 May 2008)
New Revision: 4213

Modified:
   remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
Log:
JBREM-982: (1) Introduced variable useOnewayConnectionTimeout; (2) now gets the classloader, InputStream, and OutputStream during initialization.

Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java	2008-05-22 03:35:00 UTC (rev 4212)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/transport/socket/ServerThread.java	2008-05-22 03:38:26 UTC (rev 4213)
@@ -52,8 +52,6 @@
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -100,6 +98,8 @@
    {
       return idGenerator++;
    }
+   
+   private static ClassLoader classLoader = SecurityUtility.getClassLoader(ServerThread.class);
 
    // Attributes -----------------------------------------------------------------------------------
 
@@ -124,6 +124,9 @@
 
    protected Marshaller marshaller;
    protected UnMarshaller unmarshaller;
+   
+   protected int version;
+   protected boolean performVersioning;
 
    // the unique identity of the thread, which won't change during the life of the thread. The
    // thread may get associated with different IP addresses though.
@@ -146,6 +149,8 @@
    
    private boolean reuseAfterTimeout;
 
+   private boolean useOnewayConnectionTimeout = false;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerThread(Socket socket, SocketServerInvoker invoker, LRUPool clientpool,
@@ -166,6 +171,9 @@
 
       if (invoker != null)
       {
+         version = invoker.getVersion();
+         performVersioning = Version.performVersioning(version);
+         
          Map configMap = invoker.getConfiguration();
          String checkValue = (String)configMap.get(SocketServerInvoker.CHECK_CONNECTION_KEY);
          if (checkValue != null && checkValue.length() > 0)
@@ -191,6 +199,21 @@
                         " value of " + o + " to a int value");
             }
          }
+         
+         o = configMap.get(MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT);
+         if (o != null)
+         {
+            try
+            {
+               useOnewayConnectionTimeout = Boolean.valueOf((String)o).booleanValue();
+               log.debug(this + " setting useOnewayConnectionTimeout to " + useOnewayConnectionTimeout);
+            }
+            catch (Exception e)
+            {
+               log.warn(this + " could not convert " + MicroSocketClientInvoker.USE_ONEWAY_CONNECTION_TIMEOUT +
+                        " value of " + o + " to a boolean value");
+            }
+         }
       }
    }
 
@@ -454,6 +477,8 @@
    protected void dorun()
    {
       running = true;
+      InputStream inputStream = null;
+      OutputStream outputStream = null;
 
       // lazy initialize the socketWrapper on the worker thread itself. We do this to avoid to have
       // it done on the acceptor thread (prone to lockup)
@@ -463,7 +488,9 @@
 
          socketWrapper =
             createServerSocketWrapper(socket, timeout, invoker.getLocator().getParameters());
-
+         inputStream = socketWrapper.getInputStream();
+         outputStream = socketWrapper.getOutputStream();
+         
          boolean valueSet = false;
          Map configMap = invoker.getConfiguration();
          Object o = configMap.get(CONTINUE_AFTER_TIMEOUT);
@@ -492,7 +519,7 @@
          
          // Always do first one without an ACK because its not needed
          if(trace) { log.trace("processing first invocation without acknowledging"); }
-         processInvocation(socketWrapper);
+         processInvocation(socketWrapper, inputStream, outputStream);
       }
       catch (Exception ex)
       {
@@ -519,7 +546,7 @@
             }
             
             acknowledge(socketWrapper);
-            processInvocation(socketWrapper);
+            processInvocation(socketWrapper, inputStream, outputStream);
          }
          catch (EvictionException e)
          {
@@ -633,15 +660,11 @@
       }
    }
 
-   protected void processInvocation(SocketWrapper socketWrapper) throws Exception
+   protected void processInvocation(SocketWrapper socketWrapper, InputStream inputStream, OutputStream outputStream) throws Exception
    {
       if(trace) { log.trace("preparing to process next invocation invocation"); }
 
       // Ok, now read invocation and invoke
-      
-      int version = invoker.getVersion();
-      boolean performVersioning = Version.performVersioning(version);
-      InputStream inputStream = socketWrapper.getInputStream();
 
       if (performVersioning)
       {
@@ -669,24 +692,16 @@
          }
       }
       
-      completeInvocation(socketWrapper, inputStream, performVersioning, version);
+      completeInvocation(socketWrapper, inputStream, outputStream, performVersioning, version);
    }
    
    protected synchronized void completeInvocation(
                                   SocketWrapper socketWrapper,
                                   InputStream inputStream,
-                                  boolean performVersioning,
-                                  int version)
+                                  OutputStream outputStream,
+                                  boolean performVersioning, int version)
    throws Exception
    {  
-      ClassLoader classLoader = (ClassLoader) AccessController.doPrivileged( new PrivilegedAction()
-      {
-         public Object run()
-         {
-            return ServerThread.class.getClassLoader();
-         }
-      });
-      
       Object obj = versionedRead(inputStream, invoker, classLoader, version);
 
       // setting timestamp since about to start processing
@@ -741,6 +756,14 @@
       {
          if(trace) { log.trace("oneway request, writing no reply on the wire"); }
       }
+      else if (isOneway(req))
+      {
+         if (useOnewayConnectionTimeout && performVersioning)
+         {
+            writeVersion(outputStream, version);
+            outputStream.flush();
+         }
+      }
       else
       {
          if(!createdInvocationRequest)
@@ -750,15 +773,11 @@
             resp = new InvocationResponse(req.getSessionId(), resp, isError, req.getReturnPayload());
          }
 
-         OutputStream outputStream = socketWrapper.getOutputStream();
          if (performVersioning)
          {
             writeVersion(outputStream, version);
          }
-
-         if (isOneway(req))
-            resp = null;
-         
+    
          versionedWrite(outputStream, invoker, classLoader, resp, version);
       }
 
@@ -939,8 +958,6 @@
    private void processNewSocket()
    {
       InvokerLocator locator = invoker.getLocator();
-      
-      ClassLoader classLoader = SecurityUtility.getClassLoader(ServerThread.class);
       String dataType = invoker.getDataType();
       String serializationType = invoker.getSerializationType();
 
@@ -995,8 +1012,12 @@
    
    private int readVersion(InputStream inputStream) throws Exception
    {
-      if(trace) { log.trace(this + " blocking to read version from input stream"); }
-      long start = System.currentTimeMillis();
+      long start = -1;
+      if(trace) 
+      { 
+         log.trace(this + " blocking to read version from input stream");
+         start = System.currentTimeMillis();
+      }
 
       synchronized (evictionLock)
       {




More information about the jboss-remoting-commits mailing list