[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Sun Mar 11 16:13:42 EDT 2007


  User: rsigal  
  Date: 07/03/11 16:13:42

  Modified:    src/main/org/jboss/remoting/transport/bisocket 
                        BisocketClientInvoker.java
  Log:
  JBREM-721, JBREM-722, JBREM-723: (1) transport() looks for REMOVELISTENER; (2) made PingTimerTask a static class; (3) creates control connection in constructor; (4) transport() checks for pull callback connections.
  
  Revision  Changes    Path
  1.8       +86 -61    JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -b -r1.7 -r1.8
  --- BisocketClientInvoker.java	23 Feb 2007 07:04:41 -0000	1.7
  +++ BisocketClientInvoker.java	11 Mar 2007 20:13:42 -0000	1.8
  @@ -139,17 +139,10 @@
            {
               isCallbackInvoker = true;
               listenerIdToCallbackClientInvokerMap.put(listenerId, this);
  -
  -            synchronized (listenerIdToSocketsMap)
  -            {
  -               if (listenerIdToSocketsMap.get(listenerId) == null)
  -                  listenerIdToSocketsMap.put(listenerId, new HashSet());
  -            }
  -
               log.debug("registered " + listenerId + " -> " + this);
            }
   
  -         // look for socketTimeout param
  +         // look for pingFrequency param
            Object val = config.get(Bisocket.PING_FREQUENCY);
            if (val != null)
            {
  @@ -166,6 +159,52 @@
               }
            }
         }
  +      
  +      if (isCallbackInvoker)
  +      {
  +         Set sockets = null;
  +
  +         synchronized (listenerIdToSocketsMap)
  +         {
  +            sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  +            if (sockets == null)
  +            {
  +               sockets = new HashSet();
  +               listenerIdToSocketsMap.put(listenerId, sockets);
  +            }
  +         }
  +         
  +         synchronized (sockets)
  +         {
  +            if (sockets.isEmpty())
  +            {
  +               try
  +               {
  +                  sockets.wait(timeout);
  +               }
  +               catch (InterruptedException ignored)
  +               {
  +                  log.warn("unexpected interrupt");
  +                  throw new InterruptedIOException("Attempt to create control socket interrupted");
  +               }
  +            }
  +            
  +            if (sockets.isEmpty())
  +               throw new IOException("Timed out trying to create control socket");
  +            
  +            Iterator it = sockets.iterator();
  +            controlSocket = (Socket) it.next();
  +            it.remove();
  +            controlOutputStream = controlSocket.getOutputStream();
  +            log.debug("got control socket: " + controlSocket);
  +            pingTimerTask = new PingTimerTask(this);
  +            if (timer == null)
  +            {
  +               timer = new Timer(true);
  +            }
  +            timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
  +         }
  +      }
      }
   
   
  @@ -254,7 +293,8 @@
            if (o instanceof InternalInvocation)
            {
               InternalInvocation ii = (InternalInvocation) o;
  -            if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName()))
  +            if (InternalInvocation.ADDLISTENER.equals(ii.getMethodName())
  +                && ir.getLocator() != null) // getLocator() == null for pull callbacks
               {
                  Map requestPayload = ir.getRequestPayload();
                  listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
  @@ -263,6 +303,17 @@
                  callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
                  callbackServerInvoker.createControlConnection(listenerId, secondaryLocator);
               }
  +            else if (InternalInvocation.REMOVELISTENER.equals(ii.getMethodName()))
  +            {
  +               Map requestPayload = ir.getRequestPayload();
  +               listenerId = (String) requestPayload.get(Client.LISTENER_ID_KEY);
  +               listenerIdToClientInvokerMap.remove(listenerId);
  +               BisocketServerInvoker callbackServerInvoker;
  +               callbackServerInvoker = BisocketServerInvoker.getBisocketServerInvoker(listenerId);
  +               
  +               if (callbackServerInvoker != null)
  +                  callbackServerInvoker.destroyControlConnection(listenerId);
  +            }
            }
         }
   
  @@ -275,53 +326,18 @@
         if (!isCallbackInvoker)
            return super.createSocket(address, port, timeout);
   
  -      Set sockets = null;
  -
  -      synchronized (listenerIdToSocketsMap)
  -      {
  -         sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  -         if (sockets == null)
  -         {
  -            sockets = new HashSet();
  -            listenerIdToSocketsMap.put(listenerId, sockets);
  -         }
  -      }
  -
  -      if (controlSocket == null)
  +      if (timeout < 0)
         {
  -         synchronized (sockets)
  -         {
  -            if (sockets.isEmpty())
  -            {
  -               while (true)
  -               {
  -                  try
  -                  {
  -                     sockets.wait(1000);
  -                     break;
  -                  }
  -                  catch (InterruptedException ignored)
  -                  {
  -                     log.warn("unexpected interrupt");
  -                  }
  -               }
  +         timeout = getTimeout();
  +         if (timeout < 0)
  +            timeout = 0;
               }
   
  -            if (sockets.isEmpty())
  -               throw new IOException("Timed out trying to create control socket");
  +      Set sockets = null;
   
  -            Iterator it = sockets.iterator();
  -            controlSocket = (Socket) it.next();
  -            it.remove();
  -            controlOutputStream = controlSocket.getOutputStream();
  -            log.debug("got control socket: " + controlSocket);
  -            pingTimerTask = new PingTimerTask();
  -            if (timer == null)
  +      synchronized (listenerIdToSocketsMap)
               {
  -               timer = new Timer(true);
  -            }
  -            timer.schedule(pingTimerTask, pingFrequency, pingFrequency);
  -         }
  +         sockets = (Set) listenerIdToSocketsMap.get(listenerId);
         }
   
         synchronized (controlLock)
  @@ -333,17 +349,14 @@
         {
            if (sockets.isEmpty())
            {
  -            while (true)
  -            {
                  try
                  {
  -                  sockets.wait(1000);
  -                  break;
  +                  sockets.wait(timeout);
                  }
  -               catch (InterruptedException ignored)
  +               catch (InterruptedException e)
                  {
                     log.warn("unexpected interrupt");
  -               }
  +                  throw new InterruptedIOException("Attempt to create callback socket interrupted");
               }
            }
   
  @@ -371,7 +384,7 @@
         if (pingTimerTask != null)
            pingTimerTask.cancel();
   
  -      pingTimerTask = new PingTimerTask();
  +      pingTimerTask = new PingTimerTask(this);
         if (timer == null)
         {
            timer = new Timer(true);
  @@ -427,10 +440,23 @@
      }
   
   
  -   class PingTimerTask extends TimerTask
  +   static class PingTimerTask extends TimerTask
  +   {
  +      private Object controlLock;
  +      private OutputStream controlOutputStream;
  +      
  +      PingTimerTask(BisocketClientInvoker invoker)
      {
  +         controlLock = invoker.controlLock;
  +         controlOutputStream = invoker.controlOutputStream;
  +      }
  +      
         public void shutDown()
         {
  +         synchronized (controlLock)
  +         {
  +            controlOutputStream = null;
  +         }
            cancel();
         }
   
  @@ -445,8 +471,7 @@
               catch (IOException e)
               {
                  log.warn("Unable to send ping: shutting down PingTimerTask");
  -               pingTimerTask = null;
  -               cancel();
  +               shutDown();
               }
            }
         }
  
  
  



More information about the jboss-cvs-commits mailing list