[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Wed Mar 14 01:06:25 EDT 2007


  User: rsigal  
  Date: 07/03/14 01:06:25

  Modified:    src/main/org/jboss/remoting/transport/bisocket  Tag:
                        remoting_2_x BisocketServerInvoker.java
  Log:
  JBREM-725, JBREM-726: (1) created control connection socket in a retry loop; (2) fixed NPE in createControlConnection(); (3) added control connection teardown to handleInternalInvocation(); (4) added some synchronization.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.2.15  +127 -20   JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
  retrieving revision 1.1.2.14
  retrieving revision 1.1.2.15
  diff -u -b -r1.1.2.14 -r1.1.2.15
  --- BisocketServerInvoker.java	12 Mar 2007 18:59:26 -0000	1.1.2.14
  +++ BisocketServerInvoker.java	14 Mar 2007 05:06:25 -0000	1.1.2.15
  @@ -25,6 +25,7 @@
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
  +import java.io.InterruptedIOException;
   import java.net.InetAddress;
   import java.net.ServerSocket;
   import java.net.Socket;
  @@ -57,7 +58,7 @@
   /**
    *
    * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  - * @version $Revision: 1.1.2.14 $
  + * @version $Revision: 1.1.2.15 $
    * <p>
    * Copyright Nov 23, 2006
    * </p>
  @@ -69,7 +70,7 @@
      private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
      private static Timer timer;
   
  -   private Map listenerIdToInvokerLocatorMap = new HashMap();
  +   private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
      private ServerSocket secondaryServerSocket;
      private InvokerLocator secondaryLocator;
      private SecondaryServerSocketThread secondaryServerSocketThread;
  @@ -77,6 +78,7 @@
      private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;
      private int pingWindowFactor = Bisocket.PING_WINDOW_FACTOR_DEFAULT;
      private int pingWindow = pingWindowFactor * pingFrequency;
  +   private int socketCreationRetries = Bisocket.MAX_RETRIES_DEFAULT;
      private ControlMonitorTimerTask controlMonitorTimerTask;
      protected boolean isCallbackServer = false;
   
  @@ -103,6 +105,22 @@
      {
         if (isCallbackServer)
         {
  +         Object val = configuration.get(Bisocket.MAX_RETRIES);
  +         if (val != null)
  +         {
  +            try
  +            {
  +               int nVal = Integer.valueOf((String) val).intValue();
  +               socketCreationRetries = nVal;
  +               log.debug("Setting socket creation retry limit: " + socketCreationRetries);
  +            }
  +            catch (Exception e)
  +            {
  +               log.warn("Could not convert " + Bisocket.MAX_RETRIES +
  +                     " value of " + val + " to an int value.");
  +            }
  +         }
  +         
            if(maxPoolSize <= 0)
            {
               maxPoolSize = MAX_POOL_SIZE_DEFAULT;
  @@ -155,6 +173,12 @@
            firstConnection = false;
            BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
   
  +         if (clientInvoker == null)
  +         {
  +            log.warn("Unable to retrieve client invoker: must have disconnected");
  +            throw new ClientUnavailableException();
  +         }
  +         
            try
            {
               locator = clientInvoker.getSecondaryLocator();
  @@ -162,7 +186,7 @@
            }
            catch (Throwable t)
            {
  -            log.error("unable to get secondary locator");
  +            log.error("unable to get secondary locator", t);
               throw new IOException("unable to get secondary locator: " + t.getMessage());
            }
         }
  @@ -176,10 +200,43 @@
         log.debug("creating control connection: " + locator);
   
         Socket socket = null;
  +      IOException savedException = null;
  +      
  +      for (int i = 0; i < socketCreationRetries; i++)
  +      {
  +         try
  +         {
         if (socketFactory != null)
            socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
         else
            socket = new Socket(locator.getHost(), locator.getPort());
  +         }
  +         catch (IOException e)
  +         {
  +            log.debug("Error creating a control socket", e);
  +            savedException = e;
  +         }
  +         
  +         if (socket != null)
  +            break;
  +         
  +         try
  +         {
  +            Thread.sleep(1000);
  +         }
  +         catch (InterruptedException e)
  +         {
  +            log.debug("received interrupt");
  +            throw new InterruptedIOException("interrupt in createControlConnection()");
  +         }
  +      }
  +
  +      if (socket == null)
  +      {
  +         log.error("unable to create control connection after "
  +                   + socketCreationRetries + " retries", savedException);
  +         throw savedException;
  +      }
   
         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
         if (firstConnection)
  @@ -218,10 +275,21 @@
      public void destroyControlConnection(String listenerId) throws IOException
      {
         listenerIdToInvokerLocatorMap.remove(listenerId);
  -      Thread t = (Thread) controlConnectionThreadMap.remove(listenerId);
  +      Thread t = null;
  +      
  +      synchronized (controlConnectionThreadMap)
  +      {
  +         t = (Thread) controlConnectionThreadMap.remove(listenerId);
  +      }
  +      
  +      if (t != null)
  +      {
         ((ControlConnectionThread)t).shutdown();
  -      controlMonitorTimerTask.shutdown();
  -      controlMonitorTimerTask = null;
  +      }
  +      else
  +      {
  +         log.warn("unrecognized listener ID: " + listenerId);
  +      }
      }
      
      
  @@ -312,12 +380,17 @@
         if (controlMonitorTimerTask != null)
            controlMonitorTimerTask.shutdown();
   
  -      Iterator it = controlConnectionThreadMap.values().iterator();
  +      Iterator it = null;
  +      synchronized (controlConnectionThreadMap)
  +      {
  +         it = controlConnectionThreadMap.values().iterator();
  +      }
  +      
         while (it.hasNext())
         {
            ControlConnectionThread t = (ControlConnectionThread) it.next();
  -         t.shutdown();
            it.remove();
  +         t.shutdown();
         }
   
         if (secondaryServerSocketThread != null)
  @@ -452,6 +525,8 @@
               if (listenerId != null)
               {
                  listenerIdToServerInvokerMap.remove(listenerId);
  +               BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
  +               destroyControlConnection(listenerId);
               }
            }
         }
  @@ -538,7 +613,9 @@
                     case Bisocket.CREATE_ORDINARY_SOCKET:
                        InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
                        
  -                     for (int i = 0; i < 10; i++)
  +                     IOException savedException = null;
  +                     
  +                     for (int i = 0; i < socketCreationRetries; i++)
                        {
                           try
                           {
  @@ -547,9 +624,10 @@
                              else
                                 socket = new Socket(locator.getHost(), locator.getPort());
                           }
  -                        catch (Exception e)
  +                        catch (IOException e)
                           {
                              log.debug("Error creating a socket", e);
  +                           savedException = e;
                           }
                           
                           if (socket != null)
  @@ -561,9 +639,24 @@
                           }
                           catch (InterruptedException e)
                           {
  -                           log.debug("unexpected interruption");
  +                           if (running)
  +                           {
  +                              log.warn("received unexpected interrupt");
  +                              continue;
  +                           }
  +                           else
  +                           {
  +                              return;
                           }
                        }
  +                     }
  +                     
  +                     if (socket == null)
  +                     {
  +                        log.error("Unable to create socket after " + socketCreationRetries 
  +                                  + " retries", savedException);
  +                        continue;
  +                     }
                        
                        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
                        dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
  @@ -747,7 +840,10 @@
            }
   
            if (controlConnectionThreads.isEmpty())
  +         {
               shutdown();
  +            return;
  +         }
   
            Iterator it = controlConnectionThreads.iterator();
            while (it.hasNext())
  @@ -763,7 +859,7 @@
                     controlConnectionThreadMap.remove(t.getListenerId());
                  }
   
  -               new Thread()
  +               Thread t2 = new Thread()
                  {
                     public void run()
                     {
  @@ -776,17 +872,28 @@
                           {
                              invoker.createControlConnection(t.getListenerId(), null);
                           }
  +                        catch (ClientUnavailableException e)
  +                        {
  +                           Object locator = listenerIdToInvokerLocatorMap.get(t.getListenerId());
  +                           log.warn("Unable to recreate control connection: " + locator);
  +                        }
                           catch (IOException e)
                           {
  -                           InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
  +                           Object locator = listenerIdToInvokerLocatorMap.get(t.getListenerId());
                              log.error("Unable to recreate control connection: " + locator, e);
                           }
                        }
                     }
  -               }.start();
  +               };
  +               t2.setName("controlConnectionRecreate:" + t.getName());
  +               t2.start();
   
               }
            }
         }
      }
  +   
  +   static class ClientUnavailableException extends IOException
  +   {
  +   }
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list