[jboss-cvs] JBossRemoting/src/main/org/jboss/remoting/transport/bisocket ...

Ron Sigal ron_sigal at yahoo.com
Wed Aug 29 03:16:01 EDT 2007


  User: rsigal  
  Date: 07/08/29 03:16:01

  Modified:    src/main/org/jboss/remoting/transport/bisocket  Tag:
                        remoting_2_x BisocketClientInvoker.java
  Log:
  JBREM-797: Added "running" variable to PingTimerTask. Also brought in sync with branch remoting_2_2_0_GA: JBREM-774: replaceControlSocket() and handleDisconnect() now close the control socket; JBREM-785: Declared a stack variable listenerId in transport(); JBREM-767: Network I/O moved from constructor to handleConnect(); JBREM-784: Added listenerIdToControlSocketsMap; JBREM-788: Synchronized access to static maps in handleDisconnect(); JBREM-767: handleDisconnect() wakes up any threads blocked in createSocket(); JBREM-766: Put wait() in a loop in handleConnect() and createSocket().
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.2.20  +155 -62   JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java
  
  (In the diff below, changes in quantity of whitespace are not shown.)
  
  Index: BisocketClientInvoker.java
  ===================================================================
  RCS file: /cvsroot/jboss/JBossRemoting/src/main/org/jboss/remoting/transport/bisocket/BisocketClientInvoker.java,v
  retrieving revision 1.1.2.19
  retrieving revision 1.1.2.20
  diff -u -b -r1.1.2.19 -r1.1.2.20
  --- BisocketClientInvoker.java	2 Jun 2007 05:39:35 -0000	1.1.2.19
  +++ BisocketClientInvoker.java	29 Aug 2007 07:16:01 -0000	1.1.2.20
  @@ -72,6 +72,7 @@
      private static Map listenerIdToClientInvokerMap = Collections.synchronizedMap(new HashMap());
      private static Map listenerIdToCallbackClientInvokerMap = Collections.synchronizedMap(new HashMap());
      private static Map listenerIdToSocketsMap = new HashMap();
  +   private static Map listenerIdToControlSocketsMap = new HashMap();
      private static Timer timer;
      private static Object timerLock = new Object();
   
  @@ -108,10 +109,24 @@
      }
   
   
  -   static void transferSocket(String listenerId, Socket socket)
  +   static void transferSocket(String listenerId, Socket socket, boolean isControlSocket)
      {
         Set sockets = null;
   
  +      if (isControlSocket)
  +      {
  +         synchronized (listenerIdToControlSocketsMap)
  +         {
  +            sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
  +            if (sockets == null)
  +            {
  +               sockets = new HashSet();
  +               listenerIdToControlSocketsMap.put(listenerId, sockets);
  +            }
  +         }
  +      }
  +      else
  +      {
         synchronized (listenerIdToSocketsMap)
         {
            sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  @@ -121,6 +136,7 @@
               listenerIdToSocketsMap.put(listenerId, sockets);
            }
         }
  +      }
   
         synchronized (sockets)
         {
  @@ -147,7 +163,7 @@
            {
               isCallbackInvoker = true;
               listenerIdToCallbackClientInvokerMap.put(listenerId, this);
  -            log.debug("registered " + listenerId + " -> " + this);
  +            log.debug(this + " :registered " + listenerId + " -> " + this);
            }
   
            // look for pingFrequency param
  @@ -183,18 +199,46 @@
               }
            }
         }
  +   }
  +
  +   public int getMaxRetries()
  +   {
  +      return maxRetries;
  +   }
  +
  +
  +   public void setMaxRetries(int maxRetries)
  +   {
  +      this.maxRetries = maxRetries;
  +   }
  +   
  +
  +   public int getPingFrequency()
  +   {
  +      return pingFrequency;
  +   }
  +
         
  +   public void setPingFrequency(int pingFrequency)
  +   {
  +      this.pingFrequency = pingFrequency;
  +   }
  +
  +
  +   protected void handleConnect() throws ConnectionFailedException
  +   {
  +      // Callback client on server side.
         if (isCallbackInvoker)
         {
            Set sockets = null;
   
  -         synchronized (listenerIdToSocketsMap)
  +         synchronized (listenerIdToControlSocketsMap)
            {
  -            sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  +            sockets = (Set) listenerIdToControlSocketsMap.get(listenerId);
               if (sockets == null)
               {
                  sockets = new HashSet();
  -               listenerIdToSocketsMap.put(listenerId, sockets);
  +               listenerIdToControlSocketsMap.put(listenerId, sockets);
               }
            }
            
  @@ -202,25 +246,40 @@
            {
               if (sockets.isEmpty())
               {
  +               long wait = timeout; 
  +               long start = System.currentTimeMillis(); 
  +               
  +               while (timeout == 0 || wait > 0)
  +               {
                  try
                  {
  -                  sockets.wait(timeout);
  +                     sockets.wait(wait);
  +                     break;
                  }
  -               catch (InterruptedException ignored)
  +                  catch (InterruptedException e)
                  {
  -                  log.warn("unexpected interrupt");
  -                  throw new InterruptedIOException("Attempt to create control socket interrupted");
  +                     log.debug("unexpected interrupt");
  +                     if (timeout > 0)
  +                        wait = timeout - (System.currentTimeMillis() - start);
  +                  }
                  }
               }
               
               if (sockets.isEmpty())
  -               throw new IOException("Timed out trying to create control socket");
  +               throw new ConnectionFailedException("Timed out trying to create control socket");
               
               Iterator it = sockets.iterator();
               controlSocket = (Socket) it.next();
               it.remove();
  +            try
  +            {
               controlOutputStream = controlSocket.getOutputStream();
  -            log.debug("got control socket: " + controlSocket);
  +            }
  +            catch (IOException e1)
  +            {
  +               throw new ConnectionFailedException("Unable to get control socket output stream");
  +            }
  +            log.debug("got control socket( " + listenerId + "): " + controlSocket);
               pingTimerTask = new PingTimerTask(this);
   
               synchronized (timerLock)
  @@ -241,38 +300,7 @@
                  }
               }
            }
  -      }
  -   }
   
  -   public int getMaxRetries()
  -   {
  -      return maxRetries;
  -   }
  -
  -
  -   public void setMaxRetries(int maxRetries)
  -   {
  -      this.maxRetries = maxRetries;
  -   }
  -   
  -
  -   public int getPingFrequency()
  -   {
  -      return pingFrequency;
  -   }
  -
  -
  -   public void setPingFrequency(int pingFrequency)
  -   {
  -      this.pingFrequency = pingFrequency;
  -   }
  -
  -
  -   protected void handleConnect() throws ConnectionFailedException
  -   {
  -      // Callback client on server side.
  -      if (isCallbackInvoker)
  -      {
            // Bisocket callback client invoker doesn't share socket pools because of the danger
            // that two distinct callback servers could have the same "artifical" port.
            pool = new LinkedList();
  @@ -290,6 +318,18 @@
         {
            if (isCallbackInvoker)
            {
  +            if (controlSocket != null)
  +            {
  +               try
  +               {
  +                  controlSocket.close();
  +               }
  +               catch (IOException e)
  +               {
  +                  log.debug("unable to close control socket: " + controlSocket);
  +               }
  +            }
  +
               listenerIdToCallbackClientInvokerMap.remove(listenerId);
               for (Iterator it = pool.iterator(); it.hasNext();)
               {
  @@ -309,7 +349,26 @@
               super.handleDisconnect();
            }
   
  -         listenerIdToSocketsMap.remove(listenerId);
  +         synchronized (listenerIdToControlSocketsMap)
  +         {
  +            listenerIdToControlSocketsMap.remove(listenerId);
  +         }
  +         
  +         Set sockets = null;
  +         synchronized (listenerIdToSocketsMap)
  +         {
  +            sockets = (Set) listenerIdToSocketsMap.remove(listenerId);
  +         }
  +         
  +         // Wake up any threads blocked in createSocket().
  +         if (sockets != null)
  +         {
  +            synchronized (sockets)
  +            {
  +               sockets.notifyAll();
  +            }
  +         }
  +         
            if (pingTimerTask != null)
               pingTimerTask.shutDown();
         }
  @@ -324,6 +383,7 @@
                                 Marshaller marshaller, UnMarshaller unmarshaller)
      throws IOException, ConnectionFailedException, ClassNotFoundException
      {
  +      String listenerId = null;
         if (invocation instanceof InvocationRequest)
         {
            InvocationRequest ir = (InvocationRequest) invocation;
  @@ -371,6 +431,12 @@
         synchronized (listenerIdToSocketsMap)
         {
            sockets = (Set) listenerIdToSocketsMap.get(listenerId);
  +
  +         if (sockets == null)
  +         {
  +            sockets = new HashSet();
  +            listenerIdToSocketsMap.put(listenerId, sockets);
  +         }
         }
   
         synchronized (controlLock)
  @@ -382,24 +448,37 @@
         {
            if (sockets.isEmpty())
            {
  +            long wait = timeout; 
  +            long start = System.currentTimeMillis(); 
  +            
  +            while (timeout == 0 || wait > 0)
  +            {
                  try
                  {
  -                  sockets.wait(timeout);
  +                  sockets.wait(wait);
  +                  break;
                  }
                  catch (InterruptedException e)
                  {
  -                  log.warn("unexpected interrupt");
  -                  throw new InterruptedIOException("Attempt to create callback socket interrupted");
  +                  log.debug("unexpected interrupt");
  +                  if (timeout > 0)
  +                     wait = timeout - (System.currentTimeMillis() - start);
  +               }
                  }
            }
   
            if (sockets.isEmpty())
  +         {
  +            if (!isConnected())
  +               throw new IOException("Unable to create socket: invoker is disconnected");
  +            else
               throw new IOException("Timed out trying to create socket");
  +         }
   
            Iterator it = sockets.iterator();
            Socket socket = (Socket) it.next();
            it.remove();
  -         log.debug("found socket: " + socket);
  +         log.debug(this + " found socket (" + listenerId + "): " + socket);
            return socket;
         }
      }
  @@ -409,9 +488,15 @@
      {
         synchronized (controlLock)
         {
  +         if (controlSocket != null)
  +         {
  +            controlSocket.close();
  +         }
  +         
  +         log.debug(this + " replacing control socket: " + controlSocket);
            controlSocket = socket;
  +         log.debug(this + " control socket replaced by: " + socket);
            controlOutputStream = controlSocket.getOutputStream();
  -         log.debug("replaced control socket");
         }
   
         if (pingTimerTask != null)
  @@ -494,6 +579,7 @@
         private int maxRetries;
         private Exception savedException;
         private boolean pingSent;
  +      private boolean running = true;
         
         PingTimerTask(BisocketClientInvoker invoker)
         {
  @@ -506,6 +592,7 @@
         {
            synchronized (controlLock)
            {
  +            running = false;
               controlOutputStream = null;
            }
            cancel();
  @@ -521,6 +608,9 @@
               {
                  synchronized (controlLock)
                  {
  +                  if (!running)
  +                     return;
  +                     
                     controlOutputStream.write(Bisocket.PING);
                  }
                  pingSent = true;
  @@ -533,6 +623,9 @@
               }
            }
            
  +         if (!running)
  +            return;
  +         
            if (!pingSent)
            {
               log.warn("Unable to send ping: shutting down PingTimerTask", savedException);
  
  
  



More information about the jboss-cvs-commits mailing list