[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Sat Dec 30 02:47:51 EST 2006


  User: rsigal  
  Date: 06/12/30 02:47:51

  Modified:    src/main/org/jboss/remoting/transport/bisocket  Tag:
                        remoting_2_x BisocketClientInvoker.java
  Log:
  JBREM-650: (1) Distinguised behavior on client and server; (2) reorganized createSocket(); (3) added controlLock.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.2.3   +129 -99   JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
  retrieving revision 1.1.2.2
  retrieving revision 1.1.2.3
  diff -u -b -r1.1.2.2 -r1.1.2.3
  --- BisocketClientInvoker.java	19 Dec 2006 06:06:06 -0000	1.1.2.2
  +++ BisocketClientInvoker.java	30 Dec 2006 07:47:51 -0000	1.1.2.3
  @@ -22,7 +22,6 @@
   
   package org.jboss.remoting.transport.bisocket;
   
  -import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.OutputStream;
   import java.net.Socket;
  @@ -32,6 +31,7 @@
   import java.util.Iterator;
   import java.util.Map;
   import java.util.Set;
  +import java.util.Timer;
   import java.util.TimerTask;
   
   import org.jboss.logging.Logger;
  @@ -44,11 +44,20 @@
   import org.jboss.remoting.marshal.UnMarshaller;
   import org.jboss.remoting.transport.BidirectionalClientInvoker;
   import org.jboss.remoting.transport.socket.SocketClientInvoker;
  -import org.jboss.remoting.util.TimerUtil;
   
   /**
  - * BisocketClientInvoker uses Sockets to remotely connect to the a remote ServerInvoker,
  - * which must be a BisocketServerInvoker.
  + * The bisocket transport, an extension of the socket transport, is designed to allow
  + * a callback server to function behind a firewall.  All connections are created by
  + * a Socket constructor or factory on the client side connecting to a ServerSocket on
  + * the server side.  When a callback client invoker on the server side needs to
  + * open a connection to the callback server, it requests a connection by sending a
  + * request message over a control connection to the client side.
  + * 
  + * Because all connections are created in one direction, the bisocket transport is
  + * asymmetric, in the sense that client invokers and server invokers behave differently
  + * on the client side and on the server side.
  + * 
  + * 
    * 
    * @author <a href="mailto:ron.sigal at jboss.com">Ron Sigal</a>
    */
  @@ -58,22 +67,40 @@
   {  
      private static final Logger log = Logger.getLogger(BisocketClientInvoker.class);
      private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
  +   private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
      private static Map listenerIdToSocketsMap = new HashMap();
  +   private static Timer timer;
    
  -   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
      protected String listenerId;
  +   
  +   private int pingFrequency = Bisocket.PING_FREQUENCY_DEFAULT;;
      private InvokerLocator secondaryLocator;
      private Socket controlSocket;
      private OutputStream controlOutputStream;
  +   private Object controlLock = new Object();
      private PingTimerTask pingTimerTask;
  +   private boolean isCallbackInvoker;
   
      
  +   /**
  +    * 
  +    * FIXME Comment this
  +    * 
  +    * @param listenerId
  +    * @return
  +    */
      static BisocketClientInvoker getBisocketClientInvoker(String listenerId)
      {
         return (BisocketClientInvoker) listenerIdToClientInvokerMap.get(listenerId);
      }
      
      
  +   static BisocketClientInvoker getBisocketCallbackClientInvoker(String listenerId)
  +   {
  +      return (BisocketClientInvoker) listenerIdToCallbackClientInvokerMap.get(listenerId);
  +   }
  +   
  +   
      static void transferSocket(String listenerId, Socket socket)
      {
         Set sockets = null;
  @@ -111,7 +138,8 @@
            listenerId = (String) config.get(Client.LISTENER_ID_KEY);
            if (listenerId != null)
            {
  -            listenerIdToClientInvokerMap.put(listenerId, this);
  +            isCallbackInvoker = true;
  +            listenerIdToCallbackClientInvokerMap.put(listenerId, this);
               
               synchronized (listenerIdToSocketsMap)
               {
  @@ -119,7 +147,7 @@
                     listenerIdToSocketsMap.put(listenerId, new HashSet());
               }
               
  -            log.info("registered " + listenerId + " -> " + this);
  +            log.debug("registered " + listenerId + " -> " + this);
            }
   
               // look for socketTimeout param
  @@ -151,7 +179,6 @@
      public void setPingFrequency(int pingFrequency)
      {
         this.pingFrequency = pingFrequency;
  -      log.info("set ping frequency: " + pingFrequency);
      }
      
      
  @@ -162,22 +189,13 @@
         // Callback client on server side.
         if (listenerId != null)
         {
  -//         pingTimerTask = new PingTimerTask();
  -//         TimerUtil.schedule(pingTimerTask, pingFrequency);
  -//         log.info("scheduled PingTimerTask");
            return;
         }
         
         // Client on client side.
         try
         {
  -         InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
  -         InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
  -//         secondaryLocator = (InvokerLocator) invoke(r);
  -         Object o = invoke(r);
  -         log.info("secondary locator: " + o);
  -         secondaryLocator = (InvokerLocator) o;
  -         log.info("got secondary locator: " + secondaryLocator);
  +         secondaryLocator = getSecondaryLocator();
         }
         catch (Throwable e)
         {
  @@ -192,7 +210,11 @@
         super.handleDisconnect();
         if (listenerId != null)
         {
  +         if (isCallbackInvoker)
  +            listenerIdToCallbackClientInvokerMap.remove(listenerId);
  +         else
            listenerIdToClientInvokerMap.remove(listenerId);
  +
            listenerIdToSocketsMap.remove(listenerId);
            if (pingTimerTask != null)
               pingTimerTask.shutDown();
  @@ -215,6 +237,7 @@
               {
                  Map requestPayload = ir.getRequestPayload();
                  String listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
  +               listenerIdToClientInvokerMap.put(listenerId, this);
                  BisocketServerInvoker callbackServerInvoker;
                  callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
                  callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
  @@ -231,91 +254,85 @@
         if (listenerId == null)
            return super.createSocket(address, port);
         
  -      Socket socket = null;
         Set sockets = null;
         
         synchronized (listenerIdToSocketsMap)
         {
            sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  -      }
  -      
  -      synchronized (sockets)
  -      {
  -         Iterator it = sockets.iterator();
  -         if (it.hasNext())
  -         {
  -            socket = (Socket) it.next();
  -            it.remove();
  -            
  -            if (controlSocket != null)
  -               return socket;
  -            else
  -            {
  -               controlSocket = socket;
  -               controlOutputStream = controlSocket.getOutputStream();
  -               log.info("got control socket");
  -               pingTimerTask = new PingTimerTask();
  -               TimerUtil.schedule(pingTimerTask, pingFrequency);
  -               log.info("scheduled PingTimerTask");
  -               
  -               if (it.hasNext())
  +         if (sockets == null)
                  {
  -                  socket = (Socket) it.next();
  -                  it.remove();
  -                  return socket;
  -               }
  +            sockets = new HashSet();
  +            listenerIdToSocketsMap.put(listenerId, sockets);
               }
            }
            
            if (controlSocket == null)
            {
  +         synchronized (sockets)
  +         {
  +            if (sockets.isEmpty())
  +            {
               while (true)
               {
                  try
                  {
  -                  sockets.wait(getTimeout());
  -                  if (sockets.isEmpty())
  -                     throw new IOException("Timed out trying to create control socket");
  +                     sockets.wait(1000);
                     break;
                  }
  -               catch (InterruptedException ignored) {}
  +                  catch (InterruptedException ignored)
  +                  {
  +                     log.warn("unexpected interrupt");
  +                  }
  +               }
               }
               
  -            it = sockets.iterator();
  +            if (sockets.isEmpty())
  +               throw new IOException("Timed out trying to create control socket");
  +            
  +            Iterator it = sockets.iterator();
               controlSocket = (Socket) it.next();
  -            controlOutputStream = controlSocket.getOutputStream();
               it.remove(); 
  -            log.info("got control socket");
  +            controlOutputStream = controlSocket.getOutputStream();
  +            log.debug("got control socket");
               pingTimerTask = new PingTimerTask();
  -            TimerUtil.schedule(pingTimerTask, pingFrequency);
  -            log.info("scheduled PingTimerTask");
  +            if (timer == null)
  +            {
  +               timer = new Timer();
  +            }
  +            timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
  +         }
            }
            
  -         log.info("requesting socket");
  -         
  -         synchronized (controlOutputStream)
  +      synchronized (controlLock)
            {
               controlOutputStream.write(Bisocket.CREATE_ORDINARY_SOCKET);
            }
  -//         controlOutputStream.writeUTF(listenerId);
     
  +      synchronized (sockets)
  +      {
  +         if (sockets.isEmpty())
  +         {
            while (true)
            {
               try
               {
  -//               sockets.wait(getTimeout());
                  sockets.wait(1000);
  -               if (sockets.isEmpty())
  -                  throw new IOException("Timed out trying to create socket");
                  break;
               }
  -            catch (InterruptedException ignored) {}
  +               catch (InterruptedException ignored)
  +               {
  +                  log.warn("unexpected interrupt");
  +               }
  +            }
            }
            
  -         it = sockets.iterator();
  -         socket = (Socket) it.next();
  +         if (sockets.isEmpty())
  +            throw new IOException("Timed out trying to create socket");
  +         
  +         Iterator it = sockets.iterator();
  +         Socket socket = (Socket) it.next();
            it.remove();
  -         log.info("got socket: " + socket);
  +         log.debug("socket found");
            return socket;
         }
      }
  @@ -323,21 +340,35 @@
      
      void replaceControlSocket(Socket socket) throws IOException
      {
  -      synchronized (controlSocket)
  +      synchronized (controlLock)
         {
            controlSocket = socket;
  -      }
         controlOutputStream = controlSocket.getOutputStream();
  +         log.debug("replaced control socket");
  +      }
         
         if (pingTimerTask != null)
            pingTimerTask.cancel();
         
         pingTimerTask = new PingTimerTask();
  -      TimerUtil.schedule(pingTimerTask, pingFrequency);
  -      log.info("replaced PingTimerTask");
  +      if (timer == null)
  +      {
  +         timer = new Timer();
  +      }
  +      timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
      }
      
      
  +   InvokerLocator getSecondaryLocator() throws Throwable
  +   {
  +      InternalInvocation ii = new InternalInvocation(Bisocket.GET_SECONDARY_INVOKER_LOCATOR, null);
  +      InvocationRequest r = new InvocationRequest(null, null, ii, null, null, null);
  +      Object o = invoke(r);
  +      log.debug("secondary locator: " + o);
  +      secondaryLocator = (InvokerLocator) o;
  +      return secondaryLocator;
  +   }
  +   
      protected Object checkType(Object o, Class c) throws IOException
      {
         if (c.isInstance(o))
  @@ -386,11 +417,10 @@
         
         public void run()
         {
  -         synchronized (controlOutputStream)
  +         synchronized (controlLock)
            {
               try
               {
  -               log.info("sending ping");
                  controlOutputStream.write(Bisocket.PING);
               }
               catch (IOException e)
  
  
  



More information about the jboss-cvs-commits mailing list