[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Sun Mar 11 16:11:16 EDT 2007


  User: rsigal  
  Date: 07/03/11 16:11:16

  Modified:    src/main/org/jboss/remoting/transport/bisocket 
                        BisocketServerInvoker.java
  Log:
  JBREM-721, JBREM-722, JBREM-723: (1) Made ControlMonitorTimerTask static; (2) added destroyControlConnection(); (3) handleInternalInvocation() looks for REMOVECLIENTLISTENER messages; (4) ControlConnectionThread gives a new control connection 5 ping cycles before declaring it nonfunctional.
  
  Revision  Changes    Path
  1.8       +99 -28    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- BisocketServerInvoker.java	23 Feb 2007 07:04:00 -0000	1.7
  +++ BisocketServerInvoker.java	11 Mar 2007 20:11:16 -0000	1.8
  @@ -57,7 +57,7 @@
   /**
    *
    * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  - * @version $Revision: 1.7 $
  + * @version $Revision: 1.8 $
    * <p>
    * Copyright Nov 23, 2006
    * </p>
  @@ -149,15 +149,16 @@
      {
         boolean firstConnection;
   
  -      // restarting connection
         if (locator == null)
         {
  +         // restarting connection
            firstConnection = false;
            BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
   
            try
            {
               locator = clientInvoker.getSecondaryLocator();
  +            listenerIdToInvokerLocatorMap.put(listenerId, locator);
            }
            catch (Throwable t)
            {
  @@ -165,9 +166,9 @@
               throw new IOException("unable to get secondary locator: " + t.getMessage());
            }
         }
  -      // first connection
         else
         {
  +         // first connection
            firstConnection = true;
            listenerIdToInvokerLocatorMap.put(listenerId, locator);
         }
  @@ -208,12 +209,22 @@
            {
               timer = new Timer(true);
            }
  -         controlMonitorTimerTask = new ControlMonitorTimerTask();
  +         controlMonitorTimerTask = new ControlMonitorTimerTask(this);
            timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
         }
      }
   
   
  +   public void destroyControlConnection(String listenerId) throws IOException
  +   {
  +      listenerIdToInvokerLocatorMap.remove(listenerId);
  +      Thread t = (Thread) controlConnectionThreadMap.remove(listenerId);
  +      ((ControlConnectionThread)t).shutdown();
  +      controlMonitorTimerTask.shutdown();
  +      controlMonitorTimerTask = null;
  +   }
  +   
  +   
      public int getPingFrequency()
      {
         return pingFrequency;
  @@ -432,6 +443,18 @@
               }
            }
         }
  +      else if(InternalInvocation.REMOVECLIENTLISTENER.equals(ii.getMethodName()))
  +      {
  +         Map metadata = ir.getRequestPayload();
  +         if(metadata != null)
  +         {
  +            String listenerId = (String) metadata.get(Client.LISTENER_ID_KEY);
  +            if (listenerId != null)
  +            {
  +               listenerIdToServerInvokerMap.remove(listenerId);
  +            }
  +         }
  +      }
   
         return response;
      }
  @@ -445,6 +468,7 @@
         private boolean running;
         private int errorCount;
         private long lastPing = -1;
  +      private int initialAttempts;
   
         ControlConnectionThread(Socket socket, String listenerId) throws IOException
         {
  @@ -473,10 +497,15 @@
   
         boolean checkConnection()
         {
  -         if (lastPing < 0)
  +         if (lastPing < 0 && initialAttempts++ < MAX_INITIAL_ATTEMPTS)
            {
               return true;
            }
  +         else if (lastPing < 0)
  +         {
  +            return false;
  +         }
  +         
            long currentTime = System.currentTimeMillis();
   
            if (log.isTraceEnabled())
  @@ -507,10 +536,34 @@
                  {
                     case Bisocket.CREATE_ORDINARY_SOCKET:
                        InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
  +                     
  +                     for (int i = 0; i < 10; i++)
  +                     {
  +                        try
  +                        {
                        if (socketFactory != null)
                           socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
                        else
                           socket = new Socket(locator.getHost(), locator.getPort());
  +                        }
  +                        catch (Exception e)
  +                        {
  +                           log.debug("Error creating a socket", e);
  +                        }
  +                        
  +                        if (socket != null)
  +                           break;
  +                        
  +                        try
  +                        {
  +                           Thread.sleep(1000);
  +                        }
  +                        catch (InterruptedException e)
  +                        {
  +                           log.debug("unexpected interruption");
  +                        }
  +                     }
  +                     
                        DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
                        dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
                        dos.writeUTF(listenerId);
  @@ -539,8 +592,7 @@
                        shutdown();
                        return;
                     }
  -                  log.error("Unable to read from control connection: " + e.getMessage());
  -                  e.printStackTrace();
  +                  log.error("Unable to process control connection: " + e.getMessage(), e);
                     if (++errorCount > 5)
                     {
                        shutdown();
  @@ -656,18 +708,34 @@
      }
   
   
  -   class ControlMonitorTimerTask extends TimerTask
  +   static class ControlMonitorTimerTask extends TimerTask
      {
         private boolean running = true;
  +      private BisocketServerInvoker invoker;
  +      private Map listenerIdToInvokerLocatorMap;
  +      private Map controlConnectionThreadMap;
   
  -      void shutdown()
  +      ControlMonitorTimerTask(BisocketServerInvoker invoker)
  +      {
  +         this.invoker = invoker;
  +         listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
  +         controlConnectionThreadMap = invoker.controlConnectionThreadMap;
  +      }
  +
  +      synchronized void shutdown()
         {
            running = false;
  +            invoker = null;
  +            listenerIdToInvokerLocatorMap = null;
  +            controlConnectionThreadMap = null;
            cancel();
         }
   
  -      public void run()
  +      public synchronized void run()
         {
  +         if (!running)
  +            return;
  +         
            if (log.isTraceEnabled())
               log.trace("checking connections");
   
  @@ -678,10 +746,10 @@
            }
   
            if (controlConnectionThreads.isEmpty())
  -            cancel();
  +            shutdown();
   
            Iterator it = controlConnectionThreads.iterator();
  -         while (it.hasNext() & running)
  +         while (it.hasNext())
            {
               final ControlConnectionThread t = (ControlConnectionThread) it.next();
               if (!t.checkConnection())
  @@ -694,16 +762,18 @@
                     controlConnectionThreadMap.remove(t.getListenerId());
                  }
   
  -               if (!running)
  -                  return;
  -
                  new Thread()
                  {
                     public void run()
                     {
  +                     synchronized (ControlMonitorTimerTask.this)
  +                     {
  +                        if (!running)
  +                           return;
  +                        
                        try
                        {
  -                        createControlConnection(t.getListenerId(), null);
  +                           invoker.createControlConnection(t.getListenerId(), null);
                        }
                        catch (IOException e)
                        {
  @@ -711,6 +781,7 @@
                           log.error("Unable to recreate control connection: " + locator, e);
                        }
                     }
  +                  }
                  }.start();
   
               }
  
  
  



More information about the jboss-cvs-commits mailing list