[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/socket ...

Ron Sigal ron_sigal at yahoo.com
Sat Jun 30 03:31:09 EDT 2007


  User: rsigal  
  Date: 07/06/30 03:31:09

  Modified:    src/main/org/jboss/remoting/transport/socket  Tag:
                        remoting_2_x MicroSocketClientInvoker.java
  Log:
  JBREM-641:  Added management of connections used in client side oneway invocations.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.16.2.28 +247 -13   JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: MicroSocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java,v
  retrieving revision 1.16.2.27
  retrieving revision 1.16.2.28
  diff -u -b -r1.16.2.27 -r1.16.2.28
  --- MicroSocketClientInvoker.java	19 May 2007 02:41:08 -0000	1.16.2.27
  +++ MicroSocketClientInvoker.java	30 Jun 2007 07:31:09 -0000	1.16.2.28
  @@ -3,11 +3,15 @@
   import org.jboss.logging.Logger;
   import org.jboss.remoting.CannotConnectException;
   import org.jboss.remoting.ConnectionFailedException;
  +import org.jboss.remoting.InvocationRequest;
   import org.jboss.remoting.InvokerLocator;
   import org.jboss.remoting.RemoteClientInvoker;
   import org.jboss.remoting.ServerInvoker;
   import org.jboss.remoting.Version;
   import org.jboss.remoting.serialization.ClassLoaderUtility;
  +import org.jboss.remoting.callback.CallbackPoller;
  +import org.jboss.remoting.invocation.InternalInvocation;
  +import org.jboss.remoting.invocation.OnewayInvocation;
   import org.jboss.remoting.marshal.Marshaller;
   import org.jboss.remoting.marshal.UnMarshaller;
   import org.jboss.remoting.marshal.VersionedMarshaller;
  @@ -23,12 +27,17 @@
   import java.net.Socket;
   import java.net.SocketException;
   import java.net.InetSocketAddress;
  +import java.net.SocketTimeoutException;
   import java.rmi.MarshalException;
   import java.util.HashMap;
  +import java.util.HashSet;
   import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Map;
   import java.util.Properties;
  +import java.util.Set;
  +import java.util.Timer;
  +import java.util.TimerTask;
   
   /**
    * SocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker, which must be
  @@ -38,7 +47,7 @@
    * @author <a href="mailto:telrod at e2technologies.net">Tom Elrod</a>
    * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
    *
  - * @version $Revision: 1.16.2.27 $
  + * @version $Revision: 1.16.2.28 $
    */
   public class MicroSocketClientInvoker extends RemoteClientInvoker
   {
  @@ -68,6 +77,12 @@
       */
      public static final String CLIENT_SOCKET_CLASS_FLAG = "clientSocketClass";
   
  +   /** Key for setting timeout used by OnewayConnectionTask */
  +   public static final String ONEWAY_CONNECTION_DELAY = "onewayConnectionDelay";
  +   
  +   /** Key for setting timeout used by OnewayConnectionTask */
  +   public static final String ONEWAY_CONNECTION_TIMEOUT = "onewayConnectionTimeout";
  +
      /**
       * Default value for enable TCP nodelay. Value is false.
       */
  @@ -91,6 +106,11 @@
       */
      public static final int MAX_POOL_SIZE = 50;
   
  +   /** Default delay value used by OnewayConnectionTask.  Value is 5 seconds. */
  +   public static final int ONEWAY_CONNECTION_DELAY_DEFAULT = 5000;
  +
  +   /** Default timeout value used by OnewayConnectionTask.  Value is 1 seconds. */
  +   public static final int ONEWAY_CONNECTION_TIMEOUT_DEFAULT = 1000;
   
      // Static ---------------------------------------------------------------------------------------
   
  @@ -103,6 +123,8 @@
   
      protected static final Map connectionPools = new HashMap();
   
  +   protected Timer onewayConnectionTimer;
  +
      // Performance measurements
      public static long getSocketTime = 0;
      public static long readTime = 0;
  @@ -192,6 +214,8 @@
      protected int numberOfRetries;
      protected int numberOfCallRetries;
      protected int maxPoolSize;
  +   protected int onewayConnectionDelay;
  +   protected int onewayConnectionTimeout;
   
      /**
       * Pool for this invoker. This is shared between all instances of proxies attached to a specific
  @@ -228,6 +252,8 @@
         numberOfCallRetries = MAX_CALL_RETRIES;
         pool = null;
         maxPoolSize = MAX_POOL_SIZE;
  +      onewayConnectionDelay = ONEWAY_CONNECTION_DELAY_DEFAULT;
  +      onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
         usedPooled = 0;
         usedPoolLock = new Object();
   
  @@ -431,6 +457,38 @@
            shouldCheckConnection = true;
            log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
         }
  +      
  +      // look for onewayConnectionDelay param
  +      val = params.get(ONEWAY_CONNECTION_DELAY);
  +      if (val != null)
  +      {
  +         try
  +         {
  +            onewayConnectionDelay = Integer.valueOf((String)val).intValue();
  +            log.debug(this + " setting onewayConnectionDelay to " + onewayConnectionDelay);
  +         }
  +         catch (Exception e)
  +         {
  +            log.warn(this + " could not convert " + ONEWAY_CONNECTION_DELAY + " value of " +
  +                     val + " to an int value");
  +         }
  +      }
  +      
  +      // look for onewayConnectionTimeout param
  +      val = params.get(ONEWAY_CONNECTION_TIMEOUT);
  +      if (val != null)
  +      {
  +         try
  +         {
  +            onewayConnectionTimeout = Integer.valueOf((String)val).intValue();
  +            log.debug(this + " setting onewayConnectionTimeout to " + onewayConnectionTimeout);
  +         }
  +         catch (Exception e)
  +         {
  +            log.warn(this + " could not convert " + ONEWAY_CONNECTION_TIMEOUT + " value of " +
  +                     val + " to an int value");
  +         }
  +      }
      }
   
      protected ServerAddress createServerAddress()
  @@ -653,6 +711,33 @@
            handleException(sockEx, socketWrapper);
         }
   
  +      boolean clientSideOneway = true;
  +      if (invocation instanceof InvocationRequest)
  +      {
  +         InvocationRequest ir = (InvocationRequest) invocation;
  +         if (ir.getParameter() instanceof OnewayInvocation)
  +            clientSideOneway = false;
  +      }
  +
  +      if (clientSideOneway)
  +      {
  +         // Hand over to oneway connection task to test if connection may be reused.
  +         synchronized (OnewayConnectionTask.class)
  +         {
  +            if (onewayConnectionTimer == null)
  +            {
  +               onewayConnectionTimer = new Timer(true);
  +               log.debug(this + " created oneway connection timer: " + onewayConnectionTimer);
  +            }
  +         }
  +         
  +         TimerTask onewayConnectionTask
  +            = new OnewayConnectionTask(this, socketWrapper, marshaller, unmarshaller);
  +         onewayConnectionTimer.schedule(onewayConnectionTask, onewayConnectionDelay);
  +         if (trace) log.trace(this + " scheduled OnewayConnectionTask for: " + socketWrapper);
  +      }
  +      else
  +      {
         // Put socket back in pool for reuse
         synchronized (pool)
         {
  @@ -677,6 +762,7 @@
               }
            }
         }
  +      }
   
         if (trace && !oneway) { log.trace(this + " received response " + response);  }
         return response;
  @@ -996,4 +1082,152 @@
   
      // Inner classes --------------------------------------------------------------------------------
   
  +   /**
  +    * OnewayConnectionTask takes sockets used for client side oneway invocations and,
  +    * once the oneway invocation has concluded, returns the socket to the socket pool.
  +    */
  +   protected static class OnewayConnectionTask extends TimerTask
  +   {
  +      private static InvocationRequest invocation;
  +      
  +      static
  +      {
  +         Object[] params = new Object[1];
  +         params[0] = "onewayEcho";
  +         String echoMethod = InternalInvocation.ECHO;
  +         InternalInvocation ii = new InternalInvocation(echoMethod, params); 
  +         invocation = new InvocationRequest(null, null, ii, null, null, null);
  +      }
  +      
  +      private MicroSocketClientInvoker invoker;
  +      private SocketWrapper wrapper;
  +      private Marshaller marshaller;
  +      private UnMarshaller unmarshaller;
  +      private int onewayConnectionTimeout;
  +
  +      
  +      public OnewayConnectionTask(MicroSocketClientInvoker invoker,
  +                                         SocketWrapper wrapper,
  +                                         Marshaller marshaller,
  +                                         UnMarshaller unmarshaller)
  +      {
  +         this.invoker = invoker;
  +         this.wrapper = wrapper;
  +         this.marshaller = marshaller;
  +         this.unmarshaller = unmarshaller;
  +         this.onewayConnectionTimeout = invoker.onewayConnectionTimeout;
  +      }
  +      
  +      public void run()
  +      {
  +         try
  +         {
  +            int originalTimeout = wrapper.getTimeout();
  +            wrapper.setTimeout(onewayConnectionTimeout);
  +            OutputStream outputStream = wrapper.getOutputStream();
  +            InputStream inputStream = wrapper.getInputStream();
  +            int version = Version.getDefaultVersion();
  +            boolean performVersioning = Version.performVersioning();
  +            
  +            if (performVersioning)
  +            {
  +               invoker.writeVersion(outputStream, version);
  +            }
  +
  +            invoker.versionedWrite(outputStream, marshaller, invocation, version);
  +
  +            if (performVersioning)
  +            {
  +               int readVersion = invoker.readVersion(inputStream);
  +
  +               if (readVersion == -1)
  +               {
  +                  if (trace) log.trace(invoker + "end of file: closing connection");
  +                  destroyConnection(wrapper, true);
  +               }
  +               if (readVersion == SocketWrapper.CLOSING)
  +               {
  +                  log.info(invoker + " Received version 254: treating as end of file");
  +                  destroyConnection(wrapper, true);
  +               }
  +            }
  +
  +            invoker.versionedRead(inputStream, unmarshaller, version);
  +            returnConnection(wrapper, originalTimeout);
  +         }
  +         catch (Exception e)
  +         {
  +            if (trace) log.trace(invoker + " oneway connection task error: ", e);
  +            destroyConnection(wrapper, true);
  +         }
  +      }
  +      
  +      private void returnConnection(SocketWrapper wrapper, int originalTimeout)
  +      {
  +         if (trace) log.trace(invoker + " returning connection to pool: " + wrapper);
  +         
  +         // Put socket back in pool for reuse
  +         try
  +         {
  +            synchronized (invoker.pool)
  +            {
  +               if (invoker.pool.size() < invoker.maxPoolSize)
  +               {
  +                  wrapper.setTimeout(originalTimeout);
  +                  invoker.pool.add(wrapper);
  +                  synchronized(invoker.usedPoolLock)
  +                  {
  +                     invoker.usedPooled--;
  +                  }
  +                  if (trace) { log.trace(invoker + " returned " + wrapper + " to pool"); }
  +               }
  +               else
  +               {
  +                  if (trace) { log.trace(invoker + "'pool is full, will close connection: " + wrapper); }
  +                  destroyConnection(wrapper, false);
  +               }
  +            }
  +         }
  +         catch (Exception e)
  +         {
  +            if (trace)
  +            {
  +               log.trace(invoker + " error: " + e.getMessage(), e);
  +               log.trace(invoker + " closing connection: " + wrapper);
  +            }
  +            try
  +            {
  +               destroyConnection(wrapper, true);
  +               synchronized(invoker.usedPoolLock)
  +               {
  +                  invoker.usedPooled--;
  +               }
  +            }
  +            catch (Exception ignored)
  +            {
  +            } 
  +         }
  +      }
  +      
  +      private void destroyConnection(SocketWrapper wrapper, boolean decrement)
  +      {
  +         if (trace) log.trace(invoker + " destroying connection: " + wrapper);
  +         
  +         try
  +         {
  +            wrapper.close();
  +         }
  +         catch (Exception e)
  +         {
  +         }
  +         
  +         if (decrement)
  +         {
  +            synchronized(invoker.usedPoolLock)
  +            {
  +               invoker.usedPooled--;
  +            }
  +         }
  +      }
  +   }
   }
  
  
  



More information about the jboss-cvs-commits mailing list