[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Sat Dec 30 02:57:15 EST 2006


  User: rsigal  
  Date: 06/12/30 02:57:15

  Modified:    src/main/org/jboss/remoting/transport/bisocket  Tag:
                        remoting_2_x BisocketServerInvoker.java
  Log:
  JBREM-650: (1) Replaced TimerUtil with separate Timer; (2) get new secondary locator on control connection restart; (3) control connection monitor doesn't fail if callback client never registered; (4) control socket can be created before client invoker is created; (5) control connection is restarted in new thread.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.2.4   +86 -92    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
  retrieving revision 1.1.2.3
  retrieving revision 1.1.2.4
  diff -u -b -r1.1.2.3 -r1.1.2.4
  --- BisocketServerInvoker.java	20 Dec 2006 08:21:01 -0000	1.1.2.3
  +++ BisocketServerInvoker.java	30 Dec 2006 07:57:15 -0000	1.1.2.4
  @@ -35,6 +35,7 @@
   import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Map;
  +import java.util.Timer;
   import java.util.TimerTask;
   
   import org.jboss.remoting.Client;
  @@ -46,13 +47,12 @@
   import org.jboss.remoting.transport.socket.LRUPool;
   import org.jboss.remoting.transport.socket.ServerThread;
   import org.jboss.remoting.transport.socket.SocketServerInvoker;
  -import org.jboss.remoting.util.TimerUtil;
   
   
   /**
    *  
    * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  - * @version $Revision: 1.1.2.3 $
  + * @version $Revision: 1.1.2.4 $
    * <p>
    * Copyright Nov 23, 2006
    * </p>
  @@ -60,6 +60,7 @@
   public class BisocketServerInvoker extends SocketServerInvoker
   {  
      private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
  +   private static Timer timer;
   
      private Map listenerIdToInvokerLocatorMap = new HashMap();
      private ServerSocket secondaryServerSocket;
  @@ -78,12 +79,6 @@
      }
      
      
  -//   public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
  -//   {
  -//      listenerIdToServerInvokerMap.put(listenerId, invoker);
  -//   }
  -   
  -   
      public BisocketServerInvoker(InvokerLocator locator)
      {
         super(locator);
  @@ -128,10 +123,8 @@
            secondaryServerSocketThread.setName("secondaryServerSocketThread");
            secondaryServerSocketThread.setDaemon(true);
            secondaryServerSocketThread.start();
  -         log.info("started secondary port: " + host + ":" + freePort);
  +         log.debug("started secondary port: " + host + ":" + freePort);
         }
  -//    ServerInvocationHandler handler = new InternalInvocationHandler(secondaryServerSocket);
  -//    addInvocationHandler(Bisocket.BISOCKET_INTERNAL_SUBSYSTEM, handler);
      }
      
      
  @@ -146,20 +139,30 @@
      {
         boolean firstConnection;
         
  -      // first connection
  +      // restarting connection
         if (locator == null)
         {
            firstConnection = false;
  -         locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
  +         BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
  +
  +         try
  +         {
  +            locator = clientInvoker.getSecondaryLocator();
         }
  -      // restarted connection
  +         catch (Throwable t)
  +         {
  +            log.error("unable to get secondary locator");
  +            throw new IOException("unable to get secondary locator: " + t.getMessage());
  +         }
  +      }
  +      // first connection
         else
         {
            firstConnection = true;
            listenerIdToInvokerLocatorMap.put(listenerId, locator);
         }
         
  -      log.info("creating control connection: " + locator);
  +      log.debug("creating control connection: " + locator);
         
         Socket socket = null;
         if (socketFactory != null)
  @@ -187,14 +190,16 @@
         }
         
         thread.start();
  -      log.info("created control connection: " + locator);
  +      log.debug("created control connection: " + socket.toString());
         
         if (controlMonitorTimerTask == null)
         {
  +         if (timer == null)
  +         {
  +            timer = new Timer();
  +         }
            controlMonitorTimerTask = new ControlMonitorTimerTask();
  -         TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
  -         log.info(this + ": scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
  -         log.info("pingFrequency: " + pingFrequency);
  +         timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
         }
      }
      
  @@ -209,7 +214,6 @@
      {
         this.pingFrequency = pingFrequency;
         pingWindow = 2 * pingFrequency;
  -      log.info("set ping frequency: " + pingFrequency);
      }
      
      
  @@ -235,13 +239,10 @@
      protected void cleanup()
      {
         super.cleanup();
  -      log.info(this + ": $$$entering cleanup()");
  -      log.info("controlMonitorTimerTask: " + this.controlMonitorTimerTask);
         
         if (controlMonitorTimerTask != null)
  -      {
            controlMonitorTimerTask.shutdown();
  -      }
  +      
         Iterator it = controlConnectionThreadMap.values().iterator();
         while (it.hasNext())
         {
  @@ -249,19 +250,15 @@
            t.shutdown();
            it.remove();
         }
  -      log.info("secondaryServerSocketThread: " + secondaryServerSocketThread);
  +
         if (secondaryServerSocketThread != null)
  -      {
            secondaryServerSocketThread.shutdown();
  -      }
  +
         if (secondaryServerSocket != null)
         {
            try
            {
               secondaryServerSocket.close();
  -            log.info("closed secondary port: " +
  -                     secondaryServerSocket.getInetAddress() + ":" +
  -                     secondaryServerSocket.getLocalPort());
            }
            catch (IOException e)
            {
  @@ -289,7 +286,6 @@
      {
         if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
         {
  -         log.info("returning secondaryLocator: " + secondaryLocator); 
            return secondaryLocator;
         }
         
  @@ -304,7 +300,6 @@
               if (listenerId != null)
               {
                  listenerIdToServerInvokerMap.put(listenerId, this);
  -               log.info("registered " + listenerId + " -> " + this);
               }
            }
         }
  @@ -320,7 +315,7 @@
         private DataInputStream dis;
         private boolean running;
         private int errorCount;
  -      private long lastPing = System.currentTimeMillis();
  +      private long lastPing = -1;
         
         ControlConnectionThread(Socket socket, String listenerId) throws IOException
         {
  @@ -349,9 +344,16 @@
         
         boolean checkConnection()
         {
  +         if (lastPing < 0)
  +         {
  +            return true;
  +         }
            long currentTime = System.currentTimeMillis();
  -         log.info("elapsed: " + (currentTime - lastPing));
  -         log.info("returning: " + ((currentTime - lastPing > pingWindow) ? false : true)); 
  +         
  +         if (log.isTraceEnabled())
  +         {
  +            log.trace("elapsed: " + (currentTime - lastPing));
  +         }
            return (currentTime - lastPing <= pingWindow);
         }
         
  @@ -362,7 +364,6 @@
         
         public void run()
         {
  -         log.info("starting ControlConnectionThread");
            running = true;
            while (running)
            {
  @@ -376,8 +377,6 @@
                  switch (action)
                  {
                     case Bisocket.CREATE_ORDINARY_SOCKET:
  -//                     String listenerId = dis.readUTF();
  -//                     dis.readUTF();
                        InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
                        if (socketFactory != null)
                           socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
  @@ -389,8 +388,6 @@
                        break;
                        
                     case Bisocket.PING:
  -                     log.info("got ping");
  -//                     log.info("lastPing: " + lastPing);
                        continue;
                        
                     case -1:
  @@ -407,12 +404,13 @@
                  if (running)
                  {
                     if ("socket closed".equals(e.getMessage()) || 
  -                      "Socket is closed".equals(e.getMessage()))
  +                      "Socket is closed".equals(e.getMessage()) ||
  +                      "Connection reset".equals(e.getMessage()))
                     {
                        shutdown();
                        return;
                     }
  -                  log.error("Unable to create new Socket: " + e.getMessage());
  +                  log.error("Unable to read from control connection: " + e.getMessage());
                     e.printStackTrace();
                     if (++errorCount > 5)
                     {
  @@ -436,7 +434,9 @@
                                                  clientpool, threadpool,
                                                  getTimeout(), serverSocketClass);
                        thread.start();
  -                     log.info("created: " + thread);
  +                     
  +                     if (log.isDebugEnabled())
  +                        log.debug("created: " + thread);
                     }
                     catch (Exception e)
                     {
  @@ -455,12 +455,6 @@
      }
      
      
  -//   protected ServerThread createServerThread(Socket socket) throws Exception
  -//   {
  -//      return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
  -//   }
  -   
  -   
      class SecondaryServerSocketThread extends Thread
      {
         private ServerSocket secondaryServerSocket;
  @@ -475,7 +469,6 @@
         {
            running = false;
            interrupt();
  -         log.info(this + ": shut down");
         }
         
         public void run()
  @@ -485,6 +478,7 @@
               try
               {
                  Socket socket = secondaryServerSocket.accept();
  +               log.debug("accepted: " + socket);
                  DataInputStream dis = new DataInputStream(socket.getInputStream());
                  int action = dis.read();
                  String listenerId = dis.readUTF();
  @@ -493,14 +487,22 @@
                  {
                     case Bisocket.CREATE_CONTROL_SOCKET:
                        BisocketClientInvoker invoker;
  -                     invoker =  BisocketClientInvoker.getBisocketClientInvoker(listenerId);
  +                     invoker =  BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
  +                     if (invoker == null)
  +                     {
  +                        log.debug("SecondaryServerSocketThread: transferring socket: " + listenerId);
  +                        BisocketClientInvoker.transferSocket(listenerId, socket);
  +                     }
  +                     else
  +                     {
                        invoker.replaceControlSocket(socket);
  -                     log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
  +                        log.debug("SecondaryServerSocketThread: created secondary socket: " + listenerId);
  +                     }
                        break;
                        
                     case Bisocket.CREATE_ORDINARY_SOCKET: 
                        BisocketClientInvoker.transferSocket(listenerId, socket);
  -                     log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
  +                     log.debug("SecondaryServerSocketThread: transferred socket: " + listenerId);
                        break;
                        
                     default:
  @@ -533,30 +535,43 @@
         {
            running = false;
            cancel();
  -         log.info("shutting down " + this);
         }
         
         public void run()
         {
  +         if (log.isTraceEnabled())
  +            log.trace("checking connections");
  +         
            Collection controlConnectionThreads = null;
            synchronized (controlConnectionThreadMap)
            {
               controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
            }
            
  +         if (controlConnectionThreads.isEmpty())
  +            cancel();
  +         
            Iterator it = controlConnectionThreads.iterator();
            while (it.hasNext() & running)
            {
  -            ControlConnectionThread t = (ControlConnectionThread) it.next();
  +            final ControlConnectionThread t = (ControlConnectionThread) it.next();
               if (!t.checkConnection())
               {
  -               log.info("detected failure on control connection: requesting new control connection");
  +               log.warn(this + ": detected failure on control connection " + t + ": requesting new control connection");
                  t.shutdown();
  -               it.remove();
  +               
  +               synchronized (controlConnectionThreadMap)
  +               {
  +                  controlConnectionThreadMap.remove(t.getListenerId());
  +               }
                  
                  if (!running)
                     return;
                  
  +               new Thread()
  +               {
  +                  public void run()
  +                  {
                  try
                  {
                     createControlConnection(t.getListenerId(), null);
  @@ -564,34 +579,13 @@
                  catch (IOException e)
                  {
                     InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
  -                  log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
  -                  e.printStackTrace();
  +                        log.error("Unable to recreate control connection: " + locator, e);
  +                     }  
                  }
  +               }.start();
  +
               }
            }
         }
      }
  -   
  -   
  -//   class InternalInvocationHandler implements ServerInvocationHandler
  -//   {
  -//      InvokerLocator locator;
  -//      
  -//      InternalInvocationHandler(ServerSocket ss)
  -//      {
  -//         String host = ss.getInetAddress().getHostAddress();
  -//         int port = ss.getLocalPort();
  -//         locator = new InvokerLocator(null, host, port, null, null);   
  -//      }
  -//      
  -//      public Object invoke(InvocationRequest invocation) throws Throwable
  -//      {
  -//         return locator;
  -//      }
  -//
  -//      public void setMBeanServer(MBeanServer server) {}
  -//      public void setInvoker(ServerInvoker invoker) {}
  -//      public void addListener(InvokerCallbackHandler callbackHandler) {}
  -//      public void removeListener(InvokerCallbackHandler callbackHandler) {}
  -//   }
   }
  \ No newline at end of file
  
  
  



More information about the jboss-cvs-commits mailing list