[jboss-remoting-commits] JBoss Remoting SVN: r5365 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Tue Aug 18 21:59:20 EDT 2009


Author: ron.sigal at jboss.com
Date: 2009-08-18 21:59:20 -0400 (Tue, 18 Aug 2009)
New Revision: 5365

Modified:
   remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
Log:
JBREM-1120: Added writeTimeout facility; JBREM-1146: added generalized retriable IOException facility.

Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java	2009-08-19 01:53:51 UTC (rev 5364)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java	2009-08-19 01:59:20 UTC (rev 5365)
@@ -76,7 +76,10 @@
     * or wrapped in a RuntimeException.
     */
    public static final String WRAP_INTERRUPTED_EXCEPTION = "wrapInterruptedException";
-
+  
+   /** Key for setting socket write timeout */
+   public static final String WRITE_TIMEOUT = "writeTimeout";
+   
    /**
     * Default value for enable TCP nodelay. Value is false.
     */
@@ -215,6 +218,14 @@
    public Object usedPoolLock;
    
    protected boolean wrapInterruptedException = false;
+   
+   /**
+    * If true, an IOException whose message starts with "Connection reset" (for example,
+    * "Connection reset by peer: socket write error") will be treated like a SocketException.
+    */
+   protected boolean generalizeSocketException;
+   
+   protected int writeTimeout = -1;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -280,6 +291,38 @@
       reuseAddress = reuse;
    }
 
+   public int getWriteTimeout() // plain accessor for the writeTimeout field
+   {
+      return writeTimeout; // field starts at -1; NOTE(review): units are not visible here (presumably ms) - confirm
+   }
+
+   public void setWriteTimeout(int writeTimeout) // plain setter; performs no range validation
+   {
+      this.writeTimeout = writeTimeout; // only values > 0 are later forwarded as SocketWrapper.WRITE_TIMEOUT metadata
+   }
+   
+   /**
+    * Tells whether an IOException whose message starts with "Connection reset" is handled like a SocketException.
+    * 
+    * @return the current value of the generalizeSocketException flag.
+    */
+   
+   public synchronized boolean isGeneralizeSocketException()
+   {
+      return generalizeSocketException;
+   }
+
+   /**
+    * Enables or disables handling an IOException whose message starts with "Connection reset" like a SocketException.
+    * 
+    * @param generalizeSocketException true to route such IOExceptions through handleSocketException() and retry.
+    */
+   
+   public synchronized void setGeneralizeSocketException(boolean generalizeSocketException)
+   {
+      this.generalizeSocketException = generalizeSocketException;
+   }
+
    public synchronized void disconnect()
    {
       log.debug(this + " disconnecting ...");
@@ -443,6 +486,22 @@
          shouldCheckConnection = true;
          log.debug(this + " setting shouldCheckConnection to " + shouldCheckConnection);
       }
+      
+      // look for writeTimeout param
+      val = params.get(WRITE_TIMEOUT);
+      if (val != null)
+      {
+         try
+         {
+            writeTimeout = Integer.valueOf((String)val).intValue();
+            log.debug(this + " setting writeTimeout to " + writeTimeout);
+         }
+         catch (Exception e)
+         {
+            log.warn(this + " could not convert " + WRITE_TIMEOUT + " value of " +
+                     val + " to an int value");
+         }
+      }
    }
 
    protected ServerAddress createServerAddress()
@@ -616,47 +675,27 @@
          }
          catch (SocketException sex)
          {
-            log.debug(this + " got SocketException " + sex);
-
-            try
-            {
-               semaphore.release();
-               if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
-               socketWrapper.close();            
-            }
-            catch (Exception ex)
-            {
-               if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); }
-            }
-
-            /**
-             * About to run out of retries and
-             * pool may be full of timed out sockets,
-             * so want to flush the pool and try with
-             * fresh socket as a last effort.
-             */
-            if (retryCount == (numberOfCallRetries - 2))
-            {
-               flushConnectionPool();
-            }
+            handleSocketException(sex, socketWrapper, semaphore, retryCount);
             sockEx = sex;
             continue;
          }
-         catch (Exception ex)
+         catch (IOException e)
          {
-            log.debug(this + " got exception " + ex);
-
-            try
+            if (isGeneralizeSocketException() && e.getMessage().startsWith("Connection reset"))
             {
-               semaphore.release();
-               if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
-               socketWrapper.close();
+               handleSocketException(e, socketWrapper, semaphore, retryCount);
+               sockEx = new SocketException(e.getMessage());
+               continue;
             }
-            catch (Exception ignored)
+            else
             {
+               return handleOtherException(e, semaphore, socketWrapper);
             }
-            return handleException(ex, socketWrapper);
          }
+         catch (Exception ex)
+         {
+            return handleOtherException(ex, semaphore, socketWrapper);
+         }
 
          // call worked, so no need to retry
          break;
@@ -695,6 +734,51 @@
       return response;
    }
 
+
+   protected void handleSocketException(Exception sex, SocketWrapper socketWrapper, Semaphore semaphore, int retryCount)
+   { // Cleanup for failures the caller will retry: log, release the permit, close the socket, maybe flush the pool.
+      log.debug(this + " got SocketException " + sex); // sex is typed Exception: "Connection reset" IOExceptions are routed here too
+      // The permit is released before the close; if release() or the trace call throws, close() is skipped.
+      try
+      {
+         semaphore.release();
+         if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
+         socketWrapper.close();            
+      }
+      catch (Exception ex)
+      {
+         if (trace) { log.trace(this + " couldn't successfully close its socketWrapper", ex); } // also reached if release() threw
+      }
+
+      /**
+       * About to run out of retries (retryCount == numberOfCallRetries - 2)
+       * and the pool may be full of timed out sockets,
+       * so flush the pool and try with
+       * a fresh socket as a last effort.
+       */
+      if (retryCount == (numberOfCallRetries - 2))
+      {
+         flushConnectionPool();
+      }
+   }
+   
+   protected Object handleOtherException(Exception ex, Semaphore semaphore, SocketWrapper socketWrapper)
+   throws ClassNotFoundException, MarshalException
+   { // Path for failures that are not retried: log, release the permit, close the socket, delegate to handleException().
+      log.debug(this + " got exception " + ex);
+      // Best-effort cleanup; the permit is released before the socket is closed.
+      try
+      {
+         semaphore.release();
+         if (trace) log.trace(this + " released semaphore: " + semaphore.permits());
+         socketWrapper.close();
+      }
+      catch (Exception ignored)
+      { // any failure during cleanup is swallowed silently (not logged, unlike handleSocketException)
+      }
+      return handleException(ex, socketWrapper); // result or thrown exception comes from handleException(); cleanup above has already run
+   }
+   
    protected Object handleException(Exception ex, SocketWrapper socketWrapper)
       throws ClassNotFoundException, MarshalException
    {
@@ -809,7 +893,10 @@
          }
          metadata.put(SocketWrapper.MARSHALLER, marshaller);
          metadata.put(SocketWrapper.UNMARSHALLER, unmarshaller);
-
+         if (writeTimeout > 0)
+         {
+            metadata.put(SocketWrapper.WRITE_TIMEOUT, new Integer(writeTimeout));
+         }
          if (timeAllowed > 0)
          {
             timeRemaining = (int) (timeAllowed - (System.currentTimeMillis() - start));



More information about the jboss-remoting-commits mailing list