[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Tue Jan 16 03:16:43 EST 2007


  User: rsigal  
  Date: 07/01/16 03:16:43

  Modified:    src/main/org/jboss/remoting/transport/bisocket  
                        BisocketServerInvoker.java
                        BisocketClientInvoker.java
  Log:
  JBREM-650:  Updated to conform to new version on remoting_2_x branch.
  
  Revision  Changes    Path
  1.3       +155 -182  JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketServerInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketServerInvoker.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- BisocketServerInvoker.java	27 Dec 2006 05:58:10 -0000	1.2
  +++ BisocketServerInvoker.java	16 Jan 2007 08:16:43 -0000	1.3
  @@ -35,6 +35,7 @@
   import java.util.Iterator;
   import java.util.LinkedList;
   import java.util.Map;
  +import java.util.Timer;
   import java.util.TimerTask;
   
   import org.jboss.remoting.Client;
  @@ -46,20 +47,23 @@
   import org.jboss.remoting.transport.socket.LRUPool;
   import org.jboss.remoting.transport.socket.ServerThread;
   import org.jboss.remoting.transport.socket.SocketServerInvoker;
  -import org.jboss.remoting.util.TimerUtil;
  +import org.jboss.logging.Logger;
   
   
   /**
    *  
    * @author <a href="ron.sigal at jboss.com">Ron Sigal</a>
  - * @version $Revision: 1.2 $
  + * @version $Revision: 1.3 $
    * <p>
    * Copyright Nov 23, 2006
    * </p>
    */
   public class BisocketServerInvoker extends SocketServerInvoker
   {  
  +   private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
  +
      private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
  +   private static Timer timer;
   
      private Map listenerIdToInvokerLocatorMap = new HashMap();
      private ServerSocket secondaryServerSocket;
  @@ -78,12 +82,6 @@
      }
      
      
  -//   public static void addBisocketServerInvoker(String listenerId, BisocketServerInvoker invoker)
  -//   {
  -//      listenerIdToServerInvokerMap.put(listenerId, invoker);
  -//   }
  -   
  -   
      public BisocketServerInvoker(InvokerLocator locator)
      {
         super(locator);
  @@ -128,11 +126,8 @@
            secondaryServerSocketThread.setName("secondaryServerSocketThread");
            secondaryServerSocketThread.setDaemon(true);
            secondaryServerSocketThread.start();
  -         log.info("started secondary port: " + host + ":" + freePort);
  +         log.debug("started secondary port: " + host + ":" + freePort);
         }
  -      log.info("started: " + this);
  -//    ServerInvocationHandler handler = new InternalInvocationHandler(secondaryServerSocket);
  -//    addInvocationHandler(Bisocket.BISOCKET_INTERNAL_SUBSYSTEM, handler);
      }
      
      
  @@ -147,44 +142,47 @@
      {
         boolean firstConnection;
         
  -      // first connection
  +      // restarting connection
         if (locator == null)
         {
            firstConnection = false;
  -         locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
  +         BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
  +
  +         try
  +         {
  +            locator = clientInvoker.getSecondaryLocator();
  +         }
  +         catch (Throwable t)
  +         {
  +            log.error("unable to get secondary locator");
  +            throw new IOException("unable to get secondary locator: " + t.getMessage());
         }
  -      // restarted connection
  +      }
  +      // first connection
         else
         {
            firstConnection = true;
            listenerIdToInvokerLocatorMap.put(listenerId, locator);
         }
         
  -      log.info(this + ": creating control connection: " + locator);
  +      log.debug("creating control connection: " + locator);
         
         Socket socket = null;
         if (socketFactory != null)
            socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
         else
            socket = new Socket(locator.getHost(), locator.getPort());
  -      log.info(this + ": created socket: " + socket);
         
         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
  -      log.info("got DataOutputStream: " + dos);
         if (firstConnection)
         {
  -         log.info("writing: " + Bisocket.CREATE_ORDINARY_SOCKET);
            dos.write(Bisocket.CREATE_ORDINARY_SOCKET);
  -         log.info("wrote: " + Bisocket.CREATE_ORDINARY_SOCKET);
         }
         else
         {
  -         log.info("writing: " + Bisocket.CREATE_CONTROL_SOCKET);
            dos.write(Bisocket.CREATE_CONTROL_SOCKET);
  -         log.info("wrote: " + Bisocket.CREATE_CONTROL_SOCKET);
         }
         dos.writeUTF(listenerId);
  -      log.info(this + ": wrote listenerId: " + listenerId);
         Thread thread = new ControlConnectionThread(socket, listenerId);
         thread.setName("control: " + socket.toString());
         thread.setDaemon(true);
  @@ -195,14 +193,16 @@
         }
         
         thread.start();
  -      log.info("created control connection: " + locator);
  +      log.debug("created control connection: " + socket.toString());
         
         if (controlMonitorTimerTask == null)
         {
  +         if (timer == null)
  +         {
  +            timer = new Timer();
  +         }
            controlMonitorTimerTask = new ControlMonitorTimerTask();
  -         TimerUtil.schedule(controlMonitorTimerTask, pingFrequency);
  -         log.info(this + ": scheduled ControlMonitorTimerTask: " + controlMonitorTimerTask);
  -         log.info("pingFrequency: " + pingFrequency);
  +         timer.schedule(controlMonitorTimerTask, pingFrequency, pingFrequency);
         }
      }
      
  @@ -217,7 +217,6 @@
      {
         this.pingFrequency = pingFrequency;
         pingWindow = 2 * pingFrequency;
  -      log.info("set ping frequency: " + pingFrequency);
      }
      
      
  @@ -243,13 +242,10 @@
      protected void cleanup()
      {
         super.cleanup();
  -      log.info(this + ": $$$entering cleanup()");
  -      log.info("controlMonitorTimerTask: " + this.controlMonitorTimerTask);
         
         if (controlMonitorTimerTask != null)
  -      {
            controlMonitorTimerTask.shutdown();
  -      }
  +
         Iterator it = controlConnectionThreadMap.values().iterator();
         while (it.hasNext())
         {
  @@ -257,19 +253,15 @@
            t.shutdown();
            it.remove();
         }
  -      log.info("secondaryServerSocketThread: " + secondaryServerSocketThread);
  +
         if (secondaryServerSocketThread != null)
  -      {
            secondaryServerSocketThread.shutdown();
  -      }
  +
         if (secondaryServerSocket != null)
         {
            try
            {
               secondaryServerSocket.close();
  -            log.info("closed secondary port: " +
  -                     secondaryServerSocket.getInetAddress() + ":" +
  -                     secondaryServerSocket.getLocalPort());
            }
            catch (IOException e)
            {
  @@ -297,7 +289,6 @@
      {
         if(Bisocket.GET_SECONDARY_INVOKER_LOCATOR.equals(ii.getMethodName()))
         {
  -         log.info("returning secondaryLocator: " + secondaryLocator); 
            return secondaryLocator;
         }
         
  @@ -312,8 +303,6 @@
               if (listenerId != null)
               {
                  listenerIdToServerInvokerMap.put(listenerId, this);
  -               log.info("registered " + listenerId + " -> " + this);
  -//               new Exception().printStackTrace();
               }
            }
         }
  @@ -360,12 +349,14 @@
         {
            if (lastPing < 0)
            {
  -            log.info("returning true");
               return true;
            }
            long currentTime = System.currentTimeMillis();
  -         log.info(this + ": elapsed: " + (currentTime - lastPing));
  -         log.info("returning: " + ((currentTime - lastPing > pingWindow) ? false : true)); 
  +
  +         if (log.isTraceEnabled())
  +         {
  +            log.trace("elapsed: " + (currentTime - lastPing));
  +         }
            return (currentTime - lastPing <= pingWindow);
         }
         
  @@ -376,7 +367,6 @@
         
         public void run()
         {
  -         log.info(this + ": starting ControlConnectionThread");
            running = true;
            while (running)
            {
  @@ -390,8 +380,6 @@
                  switch (action)
                  {
                     case Bisocket.CREATE_ORDINARY_SOCKET:
  -//                     String listenerId = dis.readUTF();
  -//                     dis.readUTF();
                        InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(listenerId);
                        if (socketFactory != null)
                           socket = socketFactory.createSocket(locator.getHost(), locator.getPort());
  @@ -403,8 +391,6 @@
                        break;
                        
                     case Bisocket.PING:
  -                     log.info(this + ": got ping");
  -//                     log.info("lastPing: " + lastPing);
                        continue;
                        
                     case -1:
  @@ -421,12 +407,13 @@
                  if (running)
                  {
                     if ("socket closed".equals(e.getMessage()) || 
  -                      "Socket is closed".equals(e.getMessage()))
  +                      "Socket is closed".equals(e.getMessage()) ||
  +                      "Connection reset".equals(e.getMessage()))
                     {
                        shutdown();
                        return;
                     }
  -                  log.error("Unable to create new Socket: " + e.getMessage());
  +                  log.error("Unable to read from control connection: " + e.getMessage());
                     e.printStackTrace();
                     if (++errorCount > 5)
                     {
  @@ -450,7 +437,9 @@
                                                  clientpool, threadpool,
                                                  getTimeout(), serverSocketClass);
                        thread.start();
  -                     log.info("created: " + thread);
  +
  +                     if (log.isDebugEnabled())
  +                        log.debug("created: " + thread);
                     }
                     catch (Exception e)
                     {
  @@ -469,12 +458,6 @@
      }
      
      
  -//   protected ServerThread createServerThread(Socket socket) throws Exception
  -//   {
  -//      return new BisocketServerThread(socket, this, clientpool, threadpool, getTimeout(), serverSocketClass);
  -//   }
  -   
  -   
      class SecondaryServerSocketThread extends Thread
      {
         private ServerSocket secondaryServerSocket;
  @@ -489,7 +472,6 @@
         {
            running = false;
            interrupt();
  -         log.info(this + ": shut down");
         }
         
         public void run()
  @@ -499,32 +481,31 @@
               try
               {
                  Socket socket = secondaryServerSocket.accept();
  -               log.info(this + ": accepted: " + socket);
  +               log.debug("accepted: " + socket);
                  DataInputStream dis = new DataInputStream(socket.getInputStream());
                  int action = dis.read();
  -               log.info(this + ": action: " + action);
                  String listenerId = dis.readUTF();
  -               log.info(this + ": listenerId: " + listenerId);
                  
                  switch (action)
                  {
                     case Bisocket.CREATE_CONTROL_SOCKET:
                        BisocketClientInvoker invoker;
  -                     invoker =  BisocketClientInvoker.getBisocketClientInvoker(listenerId);
  -                     log.info("calling replaceControlSocket()");
  +                     invoker =  BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
                        if (invoker == null)
                        {
  -                        log.error("unrecognized listenerId: " + listenerId);
  -                        log.error("unable to create control socket");
  -                        continue;
  +                        log.debug("SecondaryServerSocketThread: transferring socket: " + listenerId);
  +                        BisocketClientInvoker.transferSocket(listenerId, socket);
                        }
  +                     else
  +                     {
                        invoker.replaceControlSocket(socket);
  -                     log.info("SecondaryServerSocketThread: created secondary socket: " + listenerId);
  +                        log.debug("SecondaryServerSocketThread: created secondary socket: " + listenerId);
  +                     }
                        break;
                        
                     case Bisocket.CREATE_ORDINARY_SOCKET: 
                        BisocketClientInvoker.transferSocket(listenerId, socket);
  -                     log.info("SecondaryServerSocketThread: transferred socket: " + listenerId);
  +                     log.debug("SecondaryServerSocketThread: transferred socket: " + listenerId);
                        break;
                        
                     default:
  @@ -557,30 +538,43 @@
         {
            running = false;
            cancel();
  -         log.info("shutting down " + this);
         }
         
         public void run()
         {
  +         if (log.isTraceEnabled())
  +            log.trace("checking connections");
  +
            Collection controlConnectionThreads = null;
            synchronized (controlConnectionThreadMap)
            {
               controlConnectionThreads = new HashSet(controlConnectionThreadMap.values());
            }
            
  +         if (controlConnectionThreads.isEmpty())
  +            cancel();
  +
            Iterator it = controlConnectionThreads.iterator();
            while (it.hasNext() & running)
            {
  -            ControlConnectionThread t = (ControlConnectionThread) it.next();
  +            final ControlConnectionThread t = (ControlConnectionThread) it.next();
               if (!t.checkConnection())
               {
  -               log.info(this + ": detected failure on control connection: requesting new control connection");
  +               log.warn(this + ": detected failure on control connection " + t + ": requesting new control connection");
                  t.shutdown();
  -               it.remove();
  +
  +               synchronized (controlConnectionThreadMap)
  +               {
  +                  controlConnectionThreadMap.remove(t.getListenerId());
  +               }
                  
                  if (!running)
                     return;
                  
  +               new Thread()
  +               {
  +                  public void run()
  +                  {
                  try
                  {
                     createControlConnection(t.getListenerId(), null);
  @@ -588,34 +582,13 @@
                  catch (IOException e)
                  {
                     InvokerLocator locator = (InvokerLocator) listenerIdToInvokerLocatorMap.get(t.getListenerId());
  -                  log.error(this + ": " + "Unable to recreate control connection: " + locator, e);
  -                  e.printStackTrace();
  +                        log.error("Unable to recreate control connection: " + locator, e);
  +                     }
                  }
  +               }.start();
  +
               }
            }
         }
      }
  -   
  -   
  -//   class InternalInvocationHandler implements ServerInvocationHandler
  -//   {
  -//      InvokerLocator locator;
  -//      
  -//      InternalInvocationHandler(ServerSocket ss)
  -//      {
  -//         String host = ss.getInetAddress().getHostAddress();
  -//         int port = ss.getLocalPort();
  -//         locator = new InvokerLocator(null, host, port, null, null);   
  -//      }
  -//      
  -//      public Object invoke(InvocationRequest invocation) throws Throwable
  -//      {
  -//         return locator;
  -//      }
  -//
  -//      public void setMBeanServer(MBeanServer server) {}
  -//      public void setInvoker(ServerInvoker invoker) {}
  -//      public void addListener(InvokerCallbackHandler callbackHandler) {}
  -//      public void removeListener(InvokerCallbackHandler callbackHandler) {}
  -//   }
   }
  \ No newline at end of file
  
  
  
  1.3       +71 -44    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -b -r1.2 -r1.3
  --- BisocketClientInvoker.java	27 Dec 2006 05:58:10 -0000	1.2
  +++ BisocketClientInvoker.java	16 Jan 2007 08:16:43 -0000	1.3
  @@ -31,6 +31,7 @@
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
  +import java.util.Timer;
   import java.util.TimerTask;
   
   import org.jboss.logging.Logger;
  @@ -43,11 +44,20 @@
   import org.jboss.remoting.marshal.UnMarshaller;
   import org.jboss.remoting.transport.BidirectionalClientInvoker;
   import org.jboss.remoting.transport.socket.SocketClientInvoker;
  -import org.jboss.remoting.util.TimerUtil;
   
   /**
  - * BisocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker,
  - * which must be a BisocketServerInvoker.
  + * The bisocket transport, an extension of the socket transport, is designed to allow
  + * a callback server to function behind a firewall.  All connections are created by
  + * a Socket constructor or factory on the client side connecting to a ServerSocket on
  + * the server side.  When a callback client invoker on the server side needs to
  + * open a connection to the callback server, it requests a connection by sending a
  + * request message over a control connection to the client side.
  + * 
  + * Because all connections are created in one direction, the bisocket transport is
  + * asymmetric, in the sense that client invokers and server invokers behave differently
  + * on the client side and on the server side.
  + * 
  + * 
    * 
    * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
    */
  @@ -57,15 +67,19 @@
   {  
      private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
      private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
  +   private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
      private static Map listenerIdToSocketsMap = new HashMap();
  +   private static Timer timer;
    
  -   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
      protected String listenerId;
  +   
  +   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
      private InvokerLocator secondaryLocator;
      private Socket controlSocket;
      private OutputStream controlOutputStream;
      private Object controlLock = new Object();
      private PingTimerTask pingTimerTask;
  +   private boolean isCallbackInvoker;
   
      
      static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
  @@ -74,6 +88,12 @@
      }
      
      
  +   static BisocketClientInvoker getBisocketCallbackClientInvoker(String listenerId)
  +   {
  +      return (BisocketClientInvoker) listenerIdToCallbackClientInvokerMap.get(listenerId);
  +   }
  +   
  +   
      static void transferSocket(String listenerId, Socket socket)
      {
         Set sockets = null;
  @@ -111,7 +131,8 @@
            listenerId = (String) config.get(Client.LISTENER_ID_KEY);
            if (listenerId != null)
            {
  -            listenerIdToClientInvokerMap.put(listenerId, this);
  +            isCallbackInvoker = true;
  +            listenerIdToCallbackClientInvokerMap.put(listenerId, this);
               
               synchronized (listenerIdToSocketsMap)
               {
  @@ -119,7 +140,7 @@
                     listenerIdToSocketsMap.put(listenerId, new HashSet());
               }
               
  -            log.info("registered " + listenerId + " -> " + this);
  +            log.debug("registered " + listenerId + " -> " + this);
            }
   
               // look for socketTimeout param
  @@ -151,7 +172,6 @@
      public void setPingFrequency(int pingFrequency)
      {
         this.pingFrequency = pingFrequency;
  -      log.info("set ping frequency: " + pingFrequency);
      }
      
      
  @@ -162,22 +182,13 @@
         // Callback client on server side.
         if (listenerId != null)
         {
  -//         pingTimerTask = new PingTimerTask();
  -//         TimerUtil.schedule(pingTimerTask, pingFrequency);
  -//         log.info("scheduled PingTimerTask");
            return;
         }
         
         // Client on client side.
         try
         {
  -         InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
  -         InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
  -//         secondaryLocator = (InvokerLocator) invoke(r);
  -         Object o = invoke(r);
  -         log.info("secondary locator: " + o);
  -         secondaryLocator = (InvokerLocator) o;
  -         log.info("got secondary locator: " + secondaryLocator);
  +         secondaryLocator = getSecondaryLocator();
         }
         catch (Throwable e)
         {
  @@ -192,7 +203,11 @@
         super.handleDisconnect();
         if (listenerId != null)
         {
  +         if (isCallbackInvoker)
  +            listenerIdToCallbackClientInvokerMap.remove(listenerId);
  +         else
            listenerIdToClientInvokerMap.remove(listenerId);
  +
            listenerIdToSocketsMap.remove(listenerId);
            if (pingTimerTask != null)
               pingTimerTask.shutDown();
  @@ -215,6 +230,7 @@
               {
                  Map requestPayload = ir.getRequestPayload();
                  String listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
  +               listenerIdToClientInvokerMap.put(listenerId, this);
                  BisocketServerInvoker callbackServerInvoker;
                  callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
                  callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
  @@ -270,18 +286,18 @@
               controlSocket = (Socket) it.next();
               it.remove();
               controlOutputStream = controlSocket.getOutputStream();
  -            log.info("got control socket");
  +            log.debug("got control socket");
               pingTimerTask = new PingTimerTask();
  -            TimerUtil.schedule(pingTimerTask, pingFrequency);
  -            log.info("scheduled PingTimerTask: " + pingTimerTask);
  +            if (timer == null)
  +            {
  +               timer = new Timer();
  +            }
  +            timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
            }
         }
  -  
  -      log.info("requesting socket");
         
         synchronized (controlLock)
         {
  -         log.info("writing CREATE_ORDINARY_SOCKET");
            controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
         }
   
  @@ -309,7 +325,7 @@
            Iterator it = sockets.iterator();
            Socket socket = (Socket) it.next();
            it.remove();
  -         log.info("socket found");
  +         log.debug("socket found");
            return socket;
         }
      }
  @@ -317,23 +333,35 @@
      
      void replaceControlSocket(Socket socket) throws IOException
      {
  -      log.info("entering replaceControlSocket");
         synchronized (controlLock)
         {
            controlSocket = socket;
            controlOutputStream = controlSocket.getOutputStream();
  -         log.info("replaced control socket");
  +         log.debug("replaced control socket");
         }
         
         if (pingTimerTask != null)
            pingTimerTask.cancel();
         
         pingTimerTask = new PingTimerTask();
  -      TimerUtil.schedule(pingTimerTask, pingFrequency);
  -      log.info("replaced PingTimerTask: " + pingTimerTask);
  +      if (timer == null)
  +      {
  +         timer = new Timer();
  +      }
  +      timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
      }
      
      
  +   InvokerLocator getSecondaryLocator() throws Throwable
  +   {
  +      InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
  +      InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
  +      Object o = invoke(r);
  +      log.debug("secondary locator: " + o);
  +      secondaryLocator = (InvokerLocator) o;
  +      return secondaryLocator;
  +   }
  +   
      protected Object checkType(Object o, Class c) throws IOException
      {
         if (c.isInstance(o))
  @@ -386,7 +414,6 @@
            {
               try
               {
  -               log.info(this + ": sending ping");
                  controlOutputStream.write(Bisocket.PING);
               }
               catch (IOException e)
  
  
  



More information about the jboss-cvs-commits mailing list