[jboss-remoting-commits] JBoss Remoting SVN: r5329 - remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Fri Aug 14 16:52:58 EDT 2009


Author: ron.sigal at jboss.com
Date: 2009-08-14 16:52:58 -0400 (Fri, 14 Aug 2009)
New Revision: 5329

Modified:
   remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
Log:
JBREM-1147: createSocket() periodically checks for updated controlOuputStream; JBREM-1140: removed changes.

Modified: remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
===================================================================
--- remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java	2009-08-13 00:37:00 UTC (rev 5328)
+++ remoting2/branches/2.2/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java	2009-08-14 20:52:58 UTC (rev 5329)
@@ -24,7 +24,6 @@
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.InputStream;
 import java.lang.reflect.Method;
 import java.net.Socket;
 import java.util.Collections;
@@ -82,16 +81,17 @@
    protected String listenerId;
 
    private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
+   private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
+   private int pingWindow = pingWindowFactor * pingFrequency;
    private int maxRetries = Bisocket.MAX_RETRIES_DEFAULT;
    private Socket controlSocket;
    private OutputStream controlOutputStream;
-   private InputStream controlInputStream;
    private Object controlLock = new Object();
    private PingTimerTask pingTimerTask;
    protected boolean isCallbackInvoker;
    protected BooleanHolder pingFailed = new BooleanHolder(false);
-   private boolean enablePingReplies = false;
 
+
    /**
     * @param listenerId
     * @return
@@ -188,6 +188,26 @@
             }
          }
          
+         val = configuration.get(Bisocket.PING_WINDOW_FACTOR);
+         if (val != null && val instanceof String && ((String) val).length() > 0)
+         {
+            try
+            {
+               pingWindowFactor = Integer.valueOf(((String) val)).intValue();
+               log.debug(this + " setting pingWindowFactor to " + pingWindowFactor);
+            }
+            catch (NumberFormatException e)
+            {
+               log.warn("Invalid format for " + "\"" + Bisocket.PING_WINDOW_FACTOR + "\": " + val);
+            }
+         }
+         else if (val != null)
+         {
+            log.warn("\"" + Bisocket.PING_WINDOW_FACTOR + "\" must be specified as a String");
+         }
+         
+         pingWindow = pingWindowFactor * pingFrequency;
+         
          val = configuration.get(Bisocket.MAX_RETRIES);
          if (val != null)
          {
@@ -203,14 +223,6 @@
                      " value of " + val + " to an int value.");
             }
          }
-
-         val = configuration.get(Bisocket.ENABLE_PING_REPLIES);
-         if (val != null)
-         {
-            // Boolean.valueOf doesn't throw exceptions :-)
-            boolean bVal = Boolean.valueOf((String) val).booleanValue();
-            enablePingReplies = bVal;
-         }
       }
    }
 
@@ -237,7 +249,20 @@
       this.pingFrequency = pingFrequency;
    }
 
-
+   
+   public int getPingWindowFactor()
+   {
+      return pingWindowFactor;
+   }
+   
+   
+   public void setPingWindowFactor(int pingWindowFactor)
+   {
+      this.pingWindowFactor = pingWindowFactor;
+      pingWindow = pingWindowFactor * pingFrequency;
+   }
+   
+   
    protected void handleConnect() throws ConnectionFailedException
    {
       // Callback client on server side.
@@ -287,7 +312,6 @@
             try
             {
                controlOutputStream = controlSocket.getOutputStream();
-               controlInputStream = controlSocket.getInputStream();
             }
             catch (IOException e1)
             {
@@ -461,35 +485,47 @@
 
       synchronized (controlLock)
       {
-         controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
-      }
-
-      synchronized (sockets)
-      {
-         if (!sockets.isEmpty())
+         if (log.isTraceEnabled()) log.trace(this + " writing Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+         try
          {
-            Iterator it = sockets.iterator();
-            Socket socket = (Socket) it.next();
-            it.remove();
-            log.debug(this + " found socket (" + listenerId + "): " + socket);
-            return socket;
+            controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+            if (log.isTraceEnabled()) log.trace(this + " wrote Bisocket.CREATE_ORDINARY_SOCKET");
+            
+            synchronized (sockets)
+            {
+               if (!sockets.isEmpty())
+               {
+                  Iterator it = sockets.iterator();
+                  Socket socket = (Socket) it.next();
+                  it.remove();
+                  log.debug(this + " found socket (" + listenerId + "): " + socket);
+                  return socket;
+               }
+            }
          }
+         catch (IOException e)
+         {
+            log.debug(this + " unable to write Bisocket.CREATE_ORDINARY_SOCKET", e);
+         }
       }
       
       long timeRemaining = timeout; 
-      long start = System.currentTimeMillis(); 
+      long pingFailedWindow = 2 * pingWindow;
+      long pingFailedTimeRemaining = pingFailedWindow;
+      long start = System.currentTimeMillis();
+      OutputStream savedControlOutputStream = controlOutputStream;
 
-      while (isConnected() && !pingFailed.flag && (timeout == 0 || timeRemaining > 0))
+      while (isConnected() && (!pingFailed.flag || pingFailedTimeRemaining > 0) && (timeout == 0 || timeRemaining > 0))
       {
          synchronized (sockets)
-         {
+         {  
             try
             {
                sockets.wait(1000);
             }
             catch (InterruptedException e)
             {
-               log.debug("unexpected interrupt");
+               log.debug(this + " unexpected interrupt");
             }
             
             if (!sockets.isEmpty())
@@ -501,9 +537,26 @@
                return socket;
             }
          }
-
+         
+         if (savedControlOutputStream != controlOutputStream)
+         {
+            savedControlOutputStream = controlOutputStream;
+            log.debug(this + " rewriting Bisocket.CREATE_ORDINARY_SOCKET on " + controlOutputStream);
+            try
+            {
+               controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
+               log.debug(this + " rewrote Bisocket.CREATE_ORDINARY_SOCKET");
+            }
+            catch (IOException e)
+            {
+               log.debug(this + " unable to rewrite Bisocket.CREATE_ORDINARY_SOCKET" + e.getMessage());
+            }
+         }
+         
+         long elapsed = System.currentTimeMillis() - start;
          if (timeout > 0)
-            timeRemaining = timeout - (System.currentTimeMillis() - start);
+            timeRemaining = timeout - elapsed;
+         pingFailedTimeRemaining = pingFailedWindow - elapsed; 
       }
 
       if (!isConnected())
@@ -533,7 +586,7 @@
          controlSocket = socket;
          log.debug(this + " control socket replaced by: " + socket);
          controlOutputStream = controlSocket.getOutputStream();
-         controlInputStream = controlSocket.getInputStream();
+         log.debug("controlOutputStream replaced by: " + controlOutputStream);
       }
 
       if (pingTimerTask != null)
@@ -616,16 +669,18 @@
    {
       private Object controlLock;
       private OutputStream controlOutputStream;
-      private InputStream controlInputStream;
+      private int maxRetries;
+      private Exception savedException;
+      private boolean pingSent;
       private BooleanHolder pingFailed;
-      private boolean enablePingReplies;
-
+      
       PingTimerTask(BisocketClientInvoker invoker)
       {
          controlLock = invoker.controlLock;
          controlOutputStream = invoker.controlOutputStream;
-         controlInputStream = invoker.controlInputStream;
-         enablePingReplies = invoker.enablePingReplies;
+         maxRetries = invoker.getMaxRetries();
+         pingFailed = invoker.pingFailed;
+         pingFailed.flag = false;
       }
       
       public void shutDown()
@@ -633,7 +688,6 @@
          synchronized (controlLock)
          {
             controlOutputStream = null;
-            controlInputStream = null;
          }
          cancel();
          try
@@ -648,54 +702,42 @@
       }
 
       public void run()
-      {
-         boolean ok = false;
-         try
+      {     
+         pingSent = false;
+         
+         for (int i = 0; i < maxRetries; i++)
          {
-            synchronized (controlLock)
+            try
             {
-               if (controlOutputStream == null || controlInputStream == null)
-                  return;
-
-               controlOutputStream.write(Bisocket.PING);
-               if (enablePingReplies)
+               synchronized (controlLock)
                {
-                  int rep = controlInputStream.read();
-                  if (rep == -1)
-                  {
-                     // socket was closed!
-                     shutDown();
+                  if (controlOutputStream == null)
                      return;
-                  }
-                  if (rep != Bisocket.PING)
-                  {
-                     // um - protocol error
-                     log.warn("Protocol error: received unexpected reply to ping on control socket (are ping replies enabled on the server?); shutting down PingTimerTask");
-                     shutDown();
-                     return;
-                  }
+                  
+                  controlOutputStream.write(Bisocket.PING);
                }
-               ok = true;
+               pingSent = true;
+               break;
             }
+            catch (Exception e)
+            {
+               savedException = e;
+               log.debug("Unable to send ping: trying again");
+            }
          }
-         catch (Exception e)
+         
+         if (!pingSent)
          {
-            log.warn("Unable to send ping: shutting down PingTimerTask", e);
+            log.warn("Unable to send ping: shutting down PingTimerTask", savedException);
+            pingFailed.flag = true;
+            shutDown();  
          }
-         finally
-         {
-            if (! ok)
-            {
-               pingFailed.flag = true;
-               shutDown();
-            }
-         }
       }
    }
    
    static class BooleanHolder
    {
-      public volatile boolean flag;
+      public boolean flag;
       
       public BooleanHolder(boolean flag)
       {



More information about the jboss-remoting-commits mailing list