[jboss-remoting-commits] JBoss Remoting SVN: r5216 - remoting2/branches/2.x/src/main/org/jboss/remoting.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu May 14 21:58:55 EDT 2009


Author: ron.sigal at jboss.com
Date: 2009-05-14 21:58:55 -0400 (Thu, 14 May 2009)
New Revision: 5216

Modified:
   remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java
Log:
JBREM-1132: Introduced "stopping" and "useClientConnectionIdentity".

Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java	2009-05-15 01:40:05 UTC (rev 5215)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/ConnectionValidator.java	2009-05-15 01:58:55 UTC (rev 5216)
@@ -22,11 +22,11 @@
 
 package org.jboss.remoting;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -117,7 +117,7 @@
       }
       catch (Throwable throwable)
       {
-         log.debug("ConnectionValidator to connect to server " +
+         log.debug("ConnectionValidator unable to connect to server " +
             innerClientInvoker.getLocator().getProtocol() + "://" +
             innerClientInvoker.getLocator().getHost() + ":" +
             innerClientInvoker.getLocator().getPort(), throwable);
@@ -227,12 +227,13 @@
    private Map metadata;
    private InvokerLocator locator;
    private Map configMap;
-   private List listeners;
+   private Map listeners;
    private ClientInvoker clientInvoker;
    private Object lock = new Object();
    private Object notificationLock = new Object();
    private boolean started;
    private volatile boolean stopped;
+   private volatile boolean stopping;
    private String invokerSessionId;
    private boolean tieToLease = true;
    private boolean stopLeaseOnFailure = true;
@@ -240,6 +241,9 @@
    private int failureDisconnectTimeout = -1;
    private boolean isValid;
    private Timer timer;
+   private MicroRemoteClientInvoker sharedInvoker;
+   private LeasePinger leasePinger;
+   private boolean useClientConnectionIdentity;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -251,9 +255,10 @@
    public ConnectionValidator(Client client, long pingPeriod)
    {
       this.client = client;
+      this.locator = client.getInvoker().getLocator();
       this.pingPeriod = pingPeriod;
       pingTimeout = DEFAULT_PING_TIMEOUT_INT;
-      listeners = new ArrayList();
+      listeners = new HashMap();
       stopped = false;
       getParameters(client, new HashMap());
       log.debug(this + " created");
@@ -262,9 +267,10 @@
    public ConnectionValidator(Client client, Map metadata)
    {
       this.client = client;
+      this.locator = client.getInvoker().getLocator();
       pingPeriod = DEFAULT_PING_PERIOD;
       pingTimeout = DEFAULT_PING_TIMEOUT_INT;
-      listeners = new ArrayList();
+      listeners = new HashMap();
       stopped = false;
       this.metadata = new HashMap(metadata);
       getParameters(client, metadata);
@@ -300,7 +306,7 @@
                   ".addConnectionListener() instead.");
          }
          
-         if (stopped)
+         if (stopping)
          {
             return;
          }
@@ -321,7 +327,7 @@
 
       try
       {
-         if(!stopped)
+         if(!stopping)
          {
             isValid = false;
 
@@ -366,41 +372,84 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   public void addConnectionListener(ConnectionListener listener)
+   public boolean addConnectionListener(Client client, ConnectionListener listener)
    {
+      boolean doStart = false;
       if (listener != null)
       {
          synchronized (lock)
          {
+            if (stopping)
+            {
+               if (trace) log.trace(this + " is stopped. Cannot add ConnectionListener: " + listener + " for " + client);
+               return false;
+            }
             if (listeners.size() == 0)
             {
-               start();
+               doStart = true;
             }
-            listeners.add(listener);
+            Set s = (Set) listeners.get(listener);
+            if (s == null)
+            {
+               s = new HashSet();
+            }
+            s.add(client);
+            listeners.put(listener, s);
+            log.debug(this + " added ConnectionListener: " + listener + " for " + client);
          }
+         if (doStart)
+         {
+            start();
+         }
       }
+      
+      return true;
    }
 
-   public boolean removeConnectionListener(ConnectionListener listener)
+   public boolean removeConnectionListener(Client client, ConnectionListener listener)
    {
-      boolean isRemoved = false;
-      if (listener != null)
+      if (listener == null)
       {
-         synchronized (lock)
+         if (trace) log.trace(this + " ConnectionListener is null");
+         return false;
+      }
+      synchronized (lock)
+      {
+         if (stopping)
          {
-            isRemoved = listeners.remove(listener);
-            if (listeners.size() == 0)
-            {
-               stop();
-            }
+            if (trace) log.trace(this + " is stopped. It's too late to remove " + listener);
+            return false;
          }
+         Set s = (Set) listeners.get(listener);
+         if (s == null)
+         {
+            log.debug(this + ": " + listener + " is not registered");
+            return false;
+         }
+         if (s.remove(client))
+         {
+            log.debug(this + " removed ConnectionListener: " + listener + " for " + client);
+         }
+         else
+         {
+            log.debug(this + ": " + listener + " is not registered for " + client);
+            return false;
+         }
+         if (s.size() == 0)
+         {
+            listeners.remove(listener);
+         }
+         if (listeners.size() == 0)
+         {
+            stop();
+         }
       }
-      return isRemoved;
+      return true;
    }
 
    public long getPingPeriod()
    {
-      if (stopped)
+      if (stopping)
       {
          return -1;
       }
@@ -412,9 +461,52 @@
    {
       return "ConnectionValidator[" + Integer.toHexString(System.identityHashCode(this)) + ":" + clientInvoker + ", pingPeriod=" + pingPeriod + " ms]";
    }
+   
+   public boolean isStopped()
+   {
+      return stopped;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 
+   void notifyListeners(Throwable thr)
+   {
+      final Throwable t = thr;
+      synchronized (lock)
+      {
+         if (stopping)
+         {
+            return;
+         }
+         stopping = true;
+         if (trace) log.trace(this + " is stopped.  No more listeners will be accepted.");
+         
+         Iterator itr = listeners.keySet().iterator();
+         while (itr.hasNext())
+         {
+            final ConnectionListener listener = (ConnectionListener) itr.next();
+            Set clients = (Set) listeners.get(listener);
+            Iterator itr2 = clients.iterator();
+            while (itr2.hasNext())
+            {
+               final Client client  = (Client) itr2.next();
+               new Thread()
+               {
+                  public void run()
+                  {
+                     log.debug(ConnectionValidator.this + " calling " + listener + ".handleConnectionException() for " + client);
+                     listener.handleConnectionException(t, client);
+                  }
+               }.start();
+            }
+         }
+         
+         listeners.clear();
+      }
+      
+      stop();
+   }
+   
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
@@ -431,12 +523,21 @@
       ClientInvoker clientInvoker = client.getInvoker();
       if (clientInvoker instanceof MicroRemoteClientInvoker)
       {
-         invokerSessionId = ((MicroRemoteClientInvoker) clientInvoker).getSessionId();
+         sharedInvoker = (MicroRemoteClientInvoker) clientInvoker;
+         invokerSessionId = sharedInvoker.getSessionId();
       }
       else
       {
          throw new RuntimeException("creating a ConnectionValidator on a local connection");
       }
+      if (stopLeaseOnFailure)
+      {
+         if (sharedInvoker != null)
+         {
+            leasePinger = sharedInvoker.getLeasePinger();
+         }
+      }
+      if (trace) log.trace(this + ": sharedInvoker = " + sharedInvoker + ", leasePinger = " + leasePinger);
    }
    
    private boolean checkUseParametersFromLocator(Client client, Map metadata)
@@ -580,6 +681,7 @@
          }
          
          o = config.get(FAILURE_DISCONNECT_TIMEOUT);
+         if (trace) log.trace(this + " \"failureDisconnectTimeout\" set to " + o);
          if (o != null)
          {
             if (o instanceof String)
@@ -587,6 +689,7 @@
                try
                {
                   failureDisconnectTimeout = Integer.valueOf(((String) o)).intValue();
+                  if (trace) log.trace(this + " setting failureDisconnectTimeout to " + failureDisconnectTimeout);
                }
                catch (Exception e)
                {
@@ -600,6 +703,27 @@
                " to an int: must be a String");
             }
          }
+         o = config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+         if (o != null)
+         {
+            if (o instanceof String)
+            {
+               try
+               {
+                  useClientConnectionIdentity = Boolean.valueOf(((String) o)).booleanValue();
+               }
+               catch (Exception e)
+               {
+                  log.warn(this + " could not convert " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " value" +
+                           " to a boolean: " + o);
+               }
+            }
+            else
+            {
+               log.warn(this + " could not convert " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " value" +
+                        " to a boolean: must be a String");
+            }
+         }
       }
    }
    
@@ -610,7 +734,6 @@
       log.debug(this + ": pingPeriod:  " + this.pingPeriod);
       log.debug(this + ": pingTimeout: " + this.pingTimeout);
       log.debug(this + ": ping retries: " + configMap.get("NumberOfCallRetries"));
-      locator = client.getInvoker().getLocator();
 
       try
       {
@@ -628,7 +751,14 @@
          clientInvoker.connect();
       }
 
-      TimerUtil.schedule(this, pingPeriod);
+      try
+      {
+          TimerUtil.schedule(this, pingPeriod);
+      }
+      catch (Exception e)
+      {
+         log.error(this + " unable to schedule on TimerUtil", e);
+      }
       started = true;
       timer = new Timer(true);
       log.debug(this + " started");
@@ -696,6 +826,7 @@
 
    private boolean doStop()
    {
+      if (trace) log.trace("entering doStop()");
       synchronized(lock)
       {
          if (stopped)
@@ -707,6 +838,7 @@
          {
             listeners.clear();
          }
+         stopping = true;
          stopped = true;
          timer = null;
       }
@@ -722,34 +854,7 @@
       log.debug(this + " stopped, returning " + result);
       return result;
    }
-
-   private void notifyListeners(Throwable thr)
-   {
-      final Throwable t = thr;
-      synchronized (lock)
-      {
-         if (stopped)
-         {
-            return;
-         }
-         ListIterator itr = listeners.listIterator();
-         while (itr.hasNext())
-         {
-            final ConnectionListener listener = (ConnectionListener) itr.next();
-            new Thread()
-            {
-               public void run()
-               {
-                  log.debug(this + " calling " + listener + ".handleConnectionException()");
-                  listener.handleConnectionException(t, client);
-               }
-            }.start();
-         }
-      }
-      stop();
-      listeners.clear();
-   }
-
+   
    // Inner classes --------------------------------------------------------------------------------
 
    private class WaitOnConnectionCheckTimerTask extends TimerTask
@@ -783,27 +888,30 @@
          if (!isValid)
          {
             log.debug(ConnectionValidator.this + "'s connection is invalid");
-
-            notifyListeners(new Exception("Could not connect to server!"));
+            ConnectionValidator.super.cancel();
             
             if (stopLeaseOnFailure)
             {
-               log.debug(this + " detected connection failure: stopping LeasePinger");
-               MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker) client.getInvoker();
-               
-               if (invoker != null)
+               if (trace) log.trace(ConnectionValidator.this + " detected connection failure: stopping LeasePinger");
+               if (leasePinger != null)
                {
+                  log.debug(ConnectionValidator.this + " shutting down lease pinger: " + leasePinger);
                   int disconnectTimeout = (failureDisconnectTimeout == -1) ? client.getDisconnectTimeout() : failureDisconnectTimeout;
-                  invoker.terminateLease(null, disconnectTimeout);
-                  log.debug(ConnectionValidator.this + " shut down lease pinger");
+                  if (trace) log.trace(ConnectionValidator.this + " disconnectTimeout: " + disconnectTimeout);
+                  sharedInvoker.terminateLease(null, disconnectTimeout, leasePinger);
                }
                else
                {
-                  log.debug(ConnectionValidator.this + " unable to shut down lease pinger: client must have shut down");
+                  if (trace) log.trace(ConnectionValidator.this + ": lease pinger == null: perhaps leasing is not enabled for this connection");
+                  notifyListeners(new Exception("Could not connect to server!"));
                }
                
                cancel();
             }
+            if (!useClientConnectionIdentity)
+            {
+                notifyListeners(new Exception("Could not connect to server!"));
+            }
          }
       }
    }




More information about the jboss-remoting-commits mailing list