[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Fri Jul 6 02:04:43 EDT 2007


  User: rsigal  
  Date: 07/07/06 02:04:43

  Modified:    src/main/org/jboss/remoting/transport/socket  Tag:
                        remoting_2_x MicroSocketClientInvoker.java
  Log:
  JBREM-706:  Check usability of socket used for client side oneway invocation on same thread as invocation.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.16.2.32 +43 -223   JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroSocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
  retrieving revision 1.16.2.31
  retrieving revision 1.16.2.32
  diff -u -b -r1.16.2.31 -r1.16.2.32
  --- MicroSocketClientInvoker.java	4 Jul 2007 03:46:39 -0000	1.16.2.31
  +++ MicroSocketClientInvoker.java	6 Jul 2007 06:04:43 -0000	1.16.2.32
  @@ -9,7 +9,6 @@
   import org.jboss.remoting.ServerInvoker;
   import org.jboss.remoting.Version;
   import org.jboss.remoting.serialization.ClassLoaderUtility;
  -import org.jboss.remoting.invocation.InternalInvocation;
   import org.jboss.remoting.invocation.OnewayInvocation;
   import org.jboss.remoting.marshal.Marshaller;
   import org.jboss.remoting.marshal.UnMarshaller;
  @@ -32,8 +31,6 @@
   import java.util.LinkedList;
   import java.util.Map;
   import java.util.Properties;
  -import java.util.Timer;
  -import java.util.TimerTask;
   
   /**
    * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which must be
  @@ -43,7 +40,7 @@
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.16.2.31 $
  + * @version $Revision: 1.16.2.32 $
    */
   public class MicroSocketClientInvoker extends RemoteClientInvoker
   {
  @@ -74,9 +71,6 @@
      public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
      
      /** Key for setting timeout used by OnewayConnectionTask */
  -   public static final String ONEWAY_CONNECTION_DELAY = "onewayConnectionDelay";
  -   
  -   /** Key for setting timeout used by OnewayConnectionTask */
      public static final String ONEWAY_CONNECTION_TIMEOUT = "onewayConnectionTimeout";
   
      /**
  @@ -102,11 +96,8 @@
       */
      public static final int MAX_POOL_SIZE = 50;
      
  -   /** Default delay value used by OnewayConnectionTask.  Value is 5 seconds. */
  -   public static final int ONEWAY_CONNECTION_DELAY_DEFAULT = 5000;
  -
  -   /** Default timeout value used by OnewayConnectionTask.  Value is 1 seconds. */
  -   public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 1000;
  +   /** Default timeout value used by OnewayConnectionTask.  Value is 2 seconds. */
  +   public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 2000;
   
      // Static ---------------------------------------------------------------------------------------
   
  @@ -119,8 +110,6 @@
   
      protected static final Map connectionPools = new HashMap();
      
  -   protected Timer onewayConnectionTimer;
  -
      // Performance measurements
      public static long getSocketTime = 0;
      public static long readTime = 0;
  @@ -210,7 +199,6 @@
      protected int numberOfRetries;
      protected int numberOfCallRetries;
      protected int maxPoolSize;
  -   protected int onewayConnectionDelay;
      protected int onewayConnectionTimeout;
   
      /**
  @@ -248,7 +236,6 @@
         numberOfCallRetries = MAX_CALL_RETRIES;
         pool = null;
         maxPoolSize = MAX_POOL_SIZE;
  -      onewayConnectionDelay = ONEWAY_CONNECTION_DELAY_DEFAULT;
         onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
         usedPooled = 0;
         usedPoolLock = new Object();
  @@ -454,22 +441,6 @@
            log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
         }
         
  -      // look for onewayConnectionDelay param
  -      val = params.get(ONEWAY_CONNECTION_DELAY);
  -      if (val != null)
  -      {
  -         try
  -         {
  -            onewayConnectionDelay = Integer.valueOf((String)val).intValue();
  -            log.debug(this + " setting onewayConnectionDelay to " + onewayConnectionDelay);
  -         }
  -         catch (Exception e)
  -         {
  -            log.warn(this + " could not convert " + ONEWAY_CONNECTION_DELAY + " value of " +
  -                     val + " to an int value");
  -         }
  -      }
  -      
         // look for onewayConnectionTimeout param
         val = params.get(ONEWAY_CONNECTION_TIMEOUT);
         if (val != null)
  @@ -558,6 +529,14 @@
            }
         }
   
  +      boolean serverSideOneway = false;
  +      if (oneway && invocation instanceof InvocationRequest)
  +      {
  +         InvocationRequest ir = (InvocationRequest) invocation;
  +         if (ir.getParameter() instanceof OnewayInvocation)
  +            serverSideOneway = true;
  +      }
  +      
         int retryCount = 0;
         SocketException sockEx = null;
   
  @@ -613,12 +592,19 @@
               writeTime += end;
               start = System.currentTimeMillis();
   
  -            if (oneway)
  +            if (serverSideOneway)
               {
                  if(trace) { log.trace(this + " sent oneway invocation, so not waiting for response, returning null"); }
               }
               else
               {
  +               int onewaySavedTimeout = -1;
  +               if (oneway)
  +               {
  +                  onewaySavedTimeout = socketWrapper.getTimeout();
  +                  socketWrapper.setTimeout(onewayConnectionTimeout);
  +               }
  +               
                  InputStream inputStream = socketWrapper.getInputStream();
                  if (performVersioning)
                  {
  @@ -635,6 +621,13 @@
                  }
   
                  response = versionedRead(inputStream, unmarshaller, version);
  +               
  +               // Note that if an exception is thrown, the socket is thrown away,
  +               // so there's no need to reset the timeout value.
  +               if (oneway)
  +               {
  +                  socketWrapper.setTimeout(onewaySavedTimeout);
  +               }
               }
   
               end = System.currentTimeMillis() - start;
  @@ -694,6 +687,10 @@
               catch (Exception ignored)
               {
               }
  +            
  +            if (oneway)
  +               return null;
  +            else
               return handleException(ex, socketWrapper);
            }
   
  @@ -707,33 +704,6 @@
            handleException(sockEx, socketWrapper);
         }
         
  -      boolean clientSideOneway = oneway;
  -      if (invocation instanceof InvocationRequest)
  -      {
  -         InvocationRequest ir = (InvocationRequest) invocation;
  -         if (ir.getParameter() instanceof OnewayInvocation)
  -            clientSideOneway = false;
  -      }
  -
  -      if (clientSideOneway)
  -      {
  -         // Hand over to oneway connection task to test if connection may be reused.
  -         synchronized (OnewayConnectionTask.class)
  -         {
  -            if (onewayConnectionTimer == null)
  -            {
  -               onewayConnectionTimer = new Timer(true);
  -               log.debug(this + " created oneway connection timer: " + onewayConnectionTimer);
  -            }
  -         }
  -         
  -         TimerTask onewayConnectionTask
  -            = new OnewayConnectionTask(this, socketWrapper, marshaller, unmarshaller);
  -         onewayConnectionTimer.schedule(onewayConnectionTask, onewayConnectionDelay);
  -         if (trace) log.trace(this + " scheduled OnewayConnectionTask for: " + socketWrapper);
  -      }
  -      else
  -      {
            // Put socket back in pool for reuse
            synchronized (pool)
            {
  @@ -758,7 +728,6 @@
                  }
               }
            }
  -      }
   
         if (trace && !oneway) { log.trace(this + " received response " + response);  }
         return response;
  @@ -1077,153 +1046,4 @@
      }
   
      // Inner classes --------------------------------------------------------------------------------
  -
  -   /**
  -    * OnewayConnectionTask takes sockets used for client side oneway invocations and,
  -    * once the oneway invocation has concluded, returns the socket to the socket pool.
  -    */
  -   protected static class OnewayConnectionTask extends TimerTask 
  -   {
  -      private static InvocationRequest invocation;
  -      
  -      static
  -      {
  -         Object[] params = new Object[1];
  -         params[0] = "onewayEcho";
  -         String echoMethod = InternalInvocation.ECHO;
  -         InternalInvocation ii = new InternalInvocation(echoMethod, params); 
  -         invocation = new InvocationRequest(null, null, ii, null, null, null);
  -      }
  -      
  -      private MicroSocketClientInvoker invoker;
  -      private SocketWrapper wrapper;
  -      private Marshaller marshaller;
  -      private UnMarshaller unmarshaller;
  -      private int onewayConnectionTimeout;
  -
  -      
  -      public OnewayConnectionTask(MicroSocketClientInvoker invoker,
  -                                         SocketWrapper wrapper,
  -                                         Marshaller marshaller,
  -                                         UnMarshaller unmarshaller)
  -      {
  -         this.invoker = invoker;
  -         this.wrapper = wrapper;
  -         this.marshaller = marshaller;
  -         this.unmarshaller = unmarshaller;
  -         this.onewayConnectionTimeout = invoker.onewayConnectionTimeout;
  -      }
  -      
  -      public void run()
  -      {
  -         try
  -         {
  -            int originalTimeout = wrapper.getTimeout();
  -            wrapper.setTimeout(onewayConnectionTimeout);
  -            OutputStream outputStream = wrapper.getOutputStream();
  -            InputStream inputStream = wrapper.getInputStream();
  -            int version = Version.getDefaultVersion();
  -            boolean performVersioning = Version.performVersioning();
  -            
  -            if (performVersioning)
  -            {
  -               invoker.writeVersion(outputStream, version);
  -            }
  -
  -            invoker.versionedWrite(outputStream, marshaller, invocation, version);
  -
  -            if (performVersioning)
  -            {
  -               int readVersion = invoker.readVersion(inputStream);
  -
  -               if (readVersion == -1)
  -               {
  -                  if (trace) log.trace(invoker + "end of file: closing connection");
  -                  destroyConnection(wrapper, true);
  -               }
  -               if (readVersion == SocketWrapper.CLOSING)
  -               {
  -                  log.info(invoker + " Received version 254: treating as end of file");
  -                  destroyConnection(wrapper, true);
  -               }
  -            }
  -
  -            invoker.versionedRead(inputStream, unmarshaller, version);
  -            returnConnection(wrapper, originalTimeout);
  -         }
  -         catch (Exception e)
  -         {
  -            if (trace) log.trace(invoker + " oneway connection task error: ", e);
  -            destroyConnection(wrapper, true);
  -         }
  -      }
  -      
  -      private void returnConnection(SocketWrapper wrapper, int originalTimeout)
  -      {
  -         if (trace) log.trace(invoker + " returning connection to pool: " + wrapper);
  -         
  -         // Put socket back in pool for reuse
  -         try
  -         {
  -            synchronized (invoker.pool)
  -            {
  -               if (invoker.pool.size() < invoker.maxPoolSize)
  -               {
  -                  wrapper.setTimeout(originalTimeout);
  -                  invoker.pool.add(wrapper);
  -                  synchronized(invoker.usedPoolLock)
  -                  {
  -                     invoker.usedPooled--;
  -                  }
  -                  if (trace) { log.trace(invoker + " returned " + wrapper + " to pool"); }
  -               }
  -               else
  -               {
  -                  if (trace) { log.trace(invoker + "'pool is full, will close connection: " + wrapper); }
  -                  destroyConnection(wrapper, false);
  -               }
  -            }
  -         }
  -         catch (Exception e)
  -         {
  -            if (trace)
  -            {
  -               log.trace(invoker + " error: " + e.getMessage(), e);
  -               log.trace(invoker + " closing connection: " + wrapper);
  -            }
  -            try
  -            {
  -               destroyConnection(wrapper, true);
  -               synchronized(invoker.usedPoolLock)
  -               {
  -                  invoker.usedPooled--;
  -               }
  -            }
  -            catch (Exception ignored)
  -            {
  -            } 
  -         }
  -      }
  -      
  -      private void destroyConnection(SocketWrapper wrapper, boolean decrement)
  -      {
  -         if (trace) log.trace(invoker + " destroying connection: " + wrapper);
  -         
  -         try
  -         {
  -            wrapper.close();
  -         }
  -         catch (Exception e)
  -         {
  -         }
  -         
  -         if (decrement)
  -         {
  -            synchronized(invoker.usedPoolLock)
  -            {
  -               invoker.usedPooled--;
  -            }
  -         }
  -      }
  -   }
   }
  
  
  



More information about the jboss-cvs-commits mailing list