[jboss-remoting-commits] JBoss Remoting SVN: r5094 - remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Apr 30 17:32:26 EDT 2009


Author: ron.sigal at jboss.com
Date: 2009-04-30 17:32:26 -0400 (Thu, 30 Apr 2009)
New Revision: 5094

Modified:
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java
   remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java
Log:
JBREM-1112 (and others to be named): LeasePingerID, single ConnectionValidator per client invoker.

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Client.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -49,6 +49,7 @@
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.StreamCorruptedException;
+import java.lang.ref.WeakReference;
 import java.net.InetAddress;
 import java.net.SocketTimeoutException;
 import java.rmi.MarshalException;
@@ -152,6 +153,9 @@
    public static final int DEFAULT_DISCONNECT_TIMEOUT = -1;
    
    public static final String THROW_CALLBACK_EXCEPTION = "throwCallbackException";
+   
+   private static Map connectionValidators = new HashMap();
+   private static Object connectionValidatorLock = new Object();
 
    private static final Logger log = Logger.getLogger(Client.class);
 
@@ -175,6 +179,7 @@
    private InvokerLocator locator;
 
    private ConnectionValidator connectionValidator = null;
+   private ConnectionValidatorKey connectionValidatorKey;
    private Map configuration = new HashMap();
 
    private Map callbackConnectors = new HashMap();
@@ -187,6 +192,8 @@
    private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
 
    private boolean connected = false;
+   
+   private Set connectionListeners = new HashSet();
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -384,11 +391,54 @@
          }
       }
 
-      if (connectionValidator == null)
+      synchronized (connectionValidatorLock)
       {
-         connectionValidator = new ConnectionValidator(this, metadata);
+         if (connectionValidator == null)
+         {
+            Map map = new HashMap(configuration);
+            map.putAll(metadata);
+            connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
+            WeakReference ref = (WeakReference) connectionValidators.get(connectionValidatorKey);
+            if (ref == null)
+            {
+               connectionValidator = new ConnectionValidator(this, metadata);
+               connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+               connectionValidator.addConnectionListener(this, listener);
+               log.debug(this + ": created " + connectionValidator);
+            }
+            else
+            {
+               connectionValidator = (ConnectionValidator) ref.get();
+               if (connectionValidator.addConnectionListener(this, listener))
+               {
+                  log.debug(this + ": reusing from static table:  " + connectionValidator);                  
+               }
+               else
+               {
+                  connectionValidator = new ConnectionValidator(this, metadata);
+                  connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+                  connectionValidator.addConnectionListener(this, listener);
+                  log.debug(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+               }
+            }
+         }
+         else
+         {
+            if (connectionValidator.addConnectionListener(this, listener))
+            {
+               log.debug(this + ": reusing from local reference: " + connectionValidator);                  
+            }
+            else
+            {
+               connectionValidator = new ConnectionValidator(this, metadata);
+               connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+               connectionValidator.addConnectionListener(this, listener);
+               log.debug(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+            }
+         }
+         
+         connectionListeners.add(listener);
       }
-      connectionValidator.addConnectionListener(listener);
    }
 
    /**
@@ -397,11 +447,29 @@
     */
    public boolean removeConnectionListener(ConnectionListener listener)
    {
-      if (connectionValidator == null)
+      boolean isRemoved = false;
+      synchronized (connectionValidatorLock)
       {
-         return false;
+         if (connectionValidator == null)
+         {
+            return false;
+         }
+         isRemoved = connectionValidator.removeConnectionListener(this, listener);
+         if (connectionValidator.isStopped())
+         {
+            connectionValidators.remove(connectionValidatorKey);
+            log.debug(this + " removed from static map: " + connectionValidator);
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
+         connectionListeners.remove(listener);
+         if (connectionListeners.isEmpty())
+         {
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
       }
-      return connectionValidator.removeConnectionListener(listener);
+      return isRemoved;
    }
 
    /**
@@ -483,10 +551,25 @@
          // this is a noop if no lease is active
          invoker.terminateLease(sessionId, disconnectTimeout);
 
-         if (connectionValidator != null)
+         synchronized (connectionValidatorLock)
          {
-            connectionValidator.stop();
+            if (connectionValidator != null)
+            {
+               Iterator it = connectionListeners.iterator();
+               while (it.hasNext())
+               {
+                  ConnectionListener listener = (ConnectionListener) it.next();
+                  connectionValidator.removeConnectionListener(this, listener);
+               }
+               if (connectionValidator.isStopped())
+               {
+                  connectionValidators.remove(connectionValidatorKey);
+                  log.debug(this + " removed from static map: " + connectionValidator);
+               }
+            }
+            //            connectionValidator.stop();
             connectionValidator = null;
+            connectionValidatorKey = null;
          }
 
          // Need to remove myself from registry so will not keep reference to me since I am of no
@@ -1537,6 +1620,7 @@
             e.initCause(throwable);
             throw e;
          }
+         log.debug(this + " connected to " + locator);
       }
       else
       {
@@ -1726,4 +1810,32 @@
 
 
    // Inner classes --------------------------------------------------------------------------------
+   
+   static class ConnectionValidatorKey
+   {
+      private ClientInvoker invoker;
+      private Map metadata;
+      
+      ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
+      {
+         this.invoker = invoker;
+         this.metadata = metadata;
+      }
+      
+      public boolean equals(Object o)
+      {
+         if (o == null)
+            return false;
+         if (! (o instanceof ConnectionValidatorKey))
+            return false;
+         ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
+         boolean metadataEquals = (metadata == null && holder.metadata == null) || metadata.equals(holder.metadata); 
+         return invoker == holder.invoker && metadataEquals;
+      }
+      
+      public int hashCode()
+      {
+         return invoker.hashCode() * metadata.hashCode();
+      }
+   }
 }

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionNotifier.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -43,7 +43,7 @@
    {
       try
       {
-         log.debug("Server connection lost to client (session id = " + clientSessionId);
+         log.debug(this + " Server connection lost to client (session id = " + clientSessionId);
          Client client = new Client(new InvokerLocator(locatorurl), requestPayload);
          client.setSessionId(clientSessionId);
          
@@ -52,7 +52,9 @@
             Iterator it = listeners.iterator();
             while (it.hasNext())
             {
-               ((ConnectionListener) it.next()).handleConnectionException(null, client);
+               ConnectionListener listener = (ConnectionListener) it.next();
+               listener.handleConnectionException(null, client);
+               log.debug("notified " + listener + " of connection lost to: " + clientSessionId);
             }
          }
       }
@@ -66,9 +68,9 @@
    {
       try
       {
-         if(log.isTraceEnabled())
+//         if(log.isTraceEnabled())
          {
-            log.trace("Client disconnected (session id = " + clientSessionId);
+            log.debug(this + " Client disconnected (session id = " + clientSessionId);
          }
          Client client = new Client(new InvokerLocator(locatorURL), requestPayload);
          client.setSessionId(clientSessionId);

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ConnectionValidator.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -22,11 +22,11 @@
 
 package org.jboss.remoting;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -230,7 +230,7 @@
    private Map metadata;
    private InvokerLocator locator;
    private Map configMap;
-   private List listeners;
+   private Map listeners;
    private ClientInvoker clientInvoker;
    private Object lock = new Object();
    private Object notificationLock = new Object();
@@ -243,6 +243,8 @@
    private int failureDisconnectTimeout = -1;
    private boolean isValid;
    private Timer timer;
+   private MicroRemoteClientInvoker sharedInvoker;
+   private LeasePinger leasePinger;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -254,8 +256,9 @@
    public ConnectionValidator(Client client, long pingPeriod)
    {
       this.client = client;
+      this.locator = client.getInvoker().getLocator();
       this.pingPeriod = pingPeriod;
-      listeners = new ArrayList();
+      listeners = new HashMap();
       stopped = false;
       getParameters(client, new HashMap());
       log.debug(this + " created");
@@ -264,8 +267,9 @@
    public ConnectionValidator(Client client, Map metadata)
    {
       this.client = client;
+      this.locator = client.getInvoker().getLocator();
       pingPeriod = DEFAULT_PING_PERIOD;
-      listeners = new ArrayList();
+      listeners = new HashMap();
       stopped = false;
       this.metadata = new HashMap(metadata);
       getParameters(client, metadata);
@@ -367,36 +371,79 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   public void addConnectionListener(ConnectionListener listener)
+   public boolean addConnectionListener(Client client, ConnectionListener listener)
    {
+      boolean doStart = false;
       if (listener != null)
       {
          synchronized (lock)
          {
+            if (stopped)
+            {
+               log.debug(this + " is stopped. Cannot add ConnectionListener: " + listener + " for " + client);
+               return false;
+            }
             if (listeners.size() == 0)
             {
-               start();
+               doStart = true;
             }
-            listeners.add(listener);
+            Set s = (Set) listeners.get(listener);
+            if (s == null)
+            {
+               s = new HashSet();
+            }
+            s.add(client);
+            listeners.put(listener, s);
+            log.debug(this + " added ConnectionListener: " + listener + " for " + client);
          }
+         if (doStart)
+         {
+            start();
+         }
       }
+      
+      return true;
    }
 
-   public boolean removeConnectionListener(ConnectionListener listener)
+   public boolean removeConnectionListener(Client client, ConnectionListener listener)
    {
-      boolean isRemoved = false;
-      if (listener != null)
+      if (listener == null)
       {
-         synchronized (lock)
+         log.debug(this + " ConnectionListener is null");
+         return false;
+      }
+      synchronized (lock)
+      {
+         if (stopped)
          {
-            isRemoved = listeners.remove(listener);
-            if (listeners.size() == 0)
-            {
-               stop();
-            }
+            log.debug(this + " is stopped. It's too late to remove " + listener);
+            return false;
          }
+         Set s = (Set) listeners.get(listener);
+         if (s == null)
+         {
+            log.debug(this + ": " + listener + " is not registered");
+            return false;
+         }
+         if (s.remove(client))
+         {
+            log.debug(this + " removed ConnectionListener: " + listener + " for " + client);
+         }
+         else
+         {
+            log.debug(this + ": " + listener + " is not registered for " + client);
+            return false;
+         }
+         if (s.size() == 0)
+         {
+            listeners.remove(listener);
+         }
+         if (listeners.size() == 0)
+         {
+            stop();
+         }
       }
-      return isRemoved;
+      return true;
    }
 
    public long getPingPeriod()
@@ -413,6 +460,11 @@
    {
       return "ConnectionValidator[" + Integer.toHexString(System.identityHashCode(this)) + ":" + clientInvoker + ", pingPeriod=" + pingPeriod + " ms]";
    }
+   
+   public boolean isStopped()
+   {
+      return stopped;
+   }
 
    // Package protected ----------------------------------------------------------------------------
 
@@ -434,6 +486,14 @@
       {
          throw new RuntimeException("creating a ConnectionValidator on a local connection");
       }
+      if (stopLeaseOnFailure)
+      {
+         sharedInvoker = (MicroRemoteClientInvoker) client.getInvoker();
+         if (sharedInvoker != null)
+         {
+            leasePinger = sharedInvoker.getLeasePinger();
+         }
+      }
    }
    
    private void getParametersFromMap(Map config)
@@ -505,8 +565,17 @@
                " to a boolean: must be a String");
             }
          }
-         
+         ClientInvoker invoker = client.getInvoker();
+         if (invoker == null)
+         {
+            log.debug(this + " client invoker == null");
+         }
+         else
+         {
+            log.debug(this + " InvokerLocator: " + invoker.getLocator());
+         }
          o = config.get(FAILURE_DISCONNECT_TIMEOUT);
+         log.debug(this + " \"failureDisconnectTimeout\" set to " + o);
          if (o != null)
          {
             if (o instanceof String)
@@ -514,6 +583,7 @@
                try
                {
                   failureDisconnectTimeout = Integer.valueOf(((String) o)).intValue();
+                  log.debug(this + " setting failureDisconnectTimeout to " + failureDisconnectTimeout);
                }
                catch (Exception e)
                {
@@ -537,7 +607,6 @@
       log.debug(this + " timeout: " + pingTimeout);
       log.debug(this + " ping retries: " + configMap.get("NumberOfCallRetries"));
       log.debug(this + " connection retries: " + configMap.get("NumberOfRetries"));
-      locator = client.getInvoker().getLocator();
 
       try
       {
@@ -555,7 +624,14 @@
          clientInvoker.connect();
       }
 
+      try
+      {
       TimerUtil.schedule(this, pingPeriod);
+      }
+      catch (Exception e)
+      {
+         log.error(this + " unable to schedule on TimerUtil", e);
+      }
       started = true;
       timer = new Timer(true);
       log.debug(this + " started");
@@ -623,6 +699,7 @@
 
    private boolean doStop()
    {
+      log.debug("entering doStop()");
       synchronized(lock)
       {
          if (stopped)
@@ -659,22 +736,33 @@
          {
             return;
          }
-         ListIterator itr = listeners.listIterator();
+         stopped = true;
+         log.debug(this + " is stopped.  No more listeners will be accepted.");
+         
+         Iterator itr = listeners.keySet().iterator();
          while (itr.hasNext())
          {
             final ConnectionListener listener = (ConnectionListener) itr.next();
-            new Thread()
+            Set clients = (Set) listeners.get(listener);
+            Iterator itr2 = clients.iterator();
+            while (itr2.hasNext())
             {
-               public void run()
+               final Client client  = (Client) itr2.next();
+               new Thread()
                {
-                  log.debug(this + " calling " + listener + ".handleConnectionException()");
-                  listener.handleConnectionException(t, client);
-               }
-            }.start();
+                  public void run()
+                  {
+                     log.debug(this + " calling " + listener + ".handleConnectionException() for " + client);
+                     listener.handleConnectionException(t, client);
+                  }
+               }.start();
+            }
          }
+         
+         listeners.clear();
       }
+      
       stop();
-      listeners.clear();
    }
 
    // Inner classes --------------------------------------------------------------------------------
@@ -715,19 +803,23 @@
             
             if (stopLeaseOnFailure)
             {
-               log.debug(this + " detected connection failure: stopping LeasePinger");
-               MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker) client.getInvoker();
-               
-               if (invoker != null)
+               log.debug(ConnectionValidator.this + " detected connection failure: stopping LeasePinger");
+//               MicroRemoteClientInvoker invoker = (MicroRemoteClientInvoker) client.getInvoker();
+//               
+//               if (invoker != null)
+//               {
+               if (leasePinger == null)
                {
+                  leasePinger = sharedInvoker.getLeasePinger();
+               }
+               log.debug(ConnectionValidator.this + " shutting down lease pinger: " + leasePinger);
                   int disconnectTimeout = (failureDisconnectTimeout == -1) ? client.getDisconnectTimeout() : failureDisconnectTimeout;
-                  invoker.terminateLease(null, disconnectTimeout);
-                  log.debug(ConnectionValidator.this + " shut down lease pinger");
-               }
-               else
-               {
-                  log.debug(ConnectionValidator.this + " unable to shut down lease pinger: client must have shut down");
-               }
+                  sharedInvoker.terminateLease(null, disconnectTimeout);
+//               }
+//               else
+//               {
+//                  log.debug(ConnectionValidator.this + " unable to shut down lease pinger: " + leasePinger + ". Client must have shut down");
+//               }
                
                cancel();
             }

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Lease.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -25,6 +25,7 @@
 import org.jboss.remoting.util.TimerUtil;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.TimerTask;
@@ -48,6 +49,9 @@
    private long leaseWindow = -1;
    private long pingStart = -1;
    private Map clientLeases = null;
+   private Object lock = new Object();
+   private String leasePingerId;
+   private boolean stopped;
 
    private boolean leaseUpdated = false;
 
@@ -64,7 +68,9 @@
       if(requestPayload != null)
       {
          this.requestPayload = (Map)requestPayload.get(ClientHolder.CLIENT_HOLDER_KEY);
+         this.leasePingerId = (String) requestPayload.get(LeasePinger.LEASE_PINGER_ID);
       }
+      log.debug("leasePingerId: " + leasePingerId);
       this.leaseWindow = leasePeriod * 2;
       this.clientLeases = clientLeases;
    }
@@ -72,9 +78,9 @@
 
    public void startLease()
    {
-      if(isTraceEnabled)
+      if(true)
       {
-         log.trace("Starting lease for client invoker (session id = " + clientSessionId + ") with lease window time of " + leaseWindow);
+         log.debug("Starting lease for client invoker (session id = " + clientSessionId + ") with lease window time of " + leaseWindow);
       }
       leaseTimerTask = new LeaseTimerTask();
       TimerUtil.schedule(leaseTimerTask, leaseWindow);
@@ -84,8 +90,28 @@
    {
       if(requestMap != null)
       {
-         this.requestPayload = (Map)requestMap.get(ClientHolder.CLIENT_HOLDER_KEY);
+         synchronized (lock)
+         {
+            this.requestPayload = (Map)requestMap.get(ClientHolder.CLIENT_HOLDER_KEY);
+
+            log.debug(this + " updating: new Client list:");
+            Collection clientHoldersCol = requestPayload.values();
+            Iterator itr = clientHoldersCol.iterator();
+            while (itr.hasNext())
+            {
+               Object val = itr.next();
+               if (val != null && val instanceof ClientHolder)
+               {
+                  ClientHolder clientHolder = (ClientHolder) val;
+                  log.debug(this + "  " + clientHolder.getSessionId());
+               }
+            }
+         }
       }
+      else
+      {
+         log.debug(this + " requestPayload == null");
+      }
       updateLease(leasePeriod);
    }
 
@@ -124,13 +150,14 @@
 
    public void terminateLease(String sessionId)
    {
-      if(isTraceEnabled)
-      {
-         log.trace("Terminating lease for session id " + sessionId);
-      }
       // is this terminate for all clients
       if (clientSessionId.equals(sessionId))
       {
+         if(true)
+         {
+            log.debug(this + " Terminating lease group for session id " + sessionId);
+         }
+         
          stopLease();
          // should be ok to call this will null as all the client should have
          // already been disconnected and there been a notification for each
@@ -140,33 +167,101 @@
       }
       else
       {
+         if(true)
+         {
+            log.debug(this + " Terminating individual lease for session id " + sessionId);
+         }
          notifyClientTermination(sessionId);
       }
    }
-
+   
+   public void terminateLeaseUponFailure(String sessionId)
+   {
+      // is this terminate for all clients
+      if (clientSessionId.equals(sessionId))
+      {
+         if(true)
+         {
+            log.debug(this + " Terminating lease group for session id " + sessionId);
+         }
+         
+         stopLease();
+         // should be ok to call this will null as all the client should have
+         // already been disconnected and there been a notification for each
+         // of these client disconnections (which would remove the client from
+         // the lease, thus leaving the collection empty
+         notifyClientLost();
+      }
+      else
+      {
+         if(true)
+         {
+            log.warn(this + " Expected invoker session id: " + sessionId);
+         }
+         notifyClientLost();
+      }
+   }
+   
+   public String toString()
+   {
+      String hash = Integer.toHexString(System.identityHashCode(this));
+      return "Lease[" + hash + ":" + clientSessionId + ":" + leasePingerId + "]";
+   }
+   
    private void notifyClientTermination(String sessionId)
    {
-      // is for a particular client, so need to inspect request payload for client
-      if (requestPayload != null)
+      Map localRequestPayload = null;
+      synchronized (lock)
       {
+         if (requestPayload != null)
+         {  
+            localRequestPayload = new HashMap(requestPayload);
+            if (sessionId != null)
+            {
+               requestPayload.remove(sessionId);
+            }
+         }
+      }
+      
+      if (localRequestPayload != null)
+      {  
          // should notify for one client or all?
          if (sessionId != null)
          {
-            Object clientHolderObj = requestPayload.remove(sessionId);
+            synchronized (lock)
+            {
+               if (stopped)
+               {
+                  log.debug(this + " already stopped");
+                  return;
+               }
+            }
+            
+            Object clientHolderObj =  localRequestPayload.get(sessionId);
             if (clientHolderObj != null && clientHolderObj instanceof ClientHolder)
             {
                ClientHolder clientHolder = (ClientHolder) clientHolderObj;
                notifier.connectionTerminated(locatorURL, clientHolder.getSessionId(), clientHolder.getConfig());
-               if(isTraceEnabled)
+               if(true)
                {
-                  log.trace("Notified connection listener of lease termination due to disconnect from client (client session id = " + clientHolder.getSessionId());
+                  log.debug(this + " Notified connection listener of lease termination due to disconnect from client (client session id = " + clientHolder.getSessionId());
                }
             }
          }
          else
          {
+            synchronized (lock)
+            {
+               if (stopped)
+               {
+                  log.debug(this + " already stopped");
+                  return;
+               }
+               stopped = true;
+            }
+            
             // loop through and notify for all clients
-            Collection clientHoldersCol = requestPayload.values();
+            Collection clientHoldersCol = localRequestPayload.values();
             if (clientHoldersCol != null && clientHoldersCol.size() > 0)
             {
                Iterator itr = clientHoldersCol.iterator();
@@ -177,9 +272,9 @@
                   {
                      ClientHolder clientHolder = (ClientHolder) val;
                      notifier.connectionTerminated(locatorURL, clientHolder.getSessionId(), clientHolder.getConfig());
-                     if(isTraceEnabled)
+                     if(true)
                      {
-                        log.trace("Notified connection listener of lease termination due to disconnect from client (client session id = " + clientHolder.getSessionId());
+                        log.debug(this + " Notified connection listener of lease termination due to disconnect from client (client session id = " + clientHolder.getSessionId());
                      }
                   }
                }
@@ -188,17 +283,32 @@
       }
       else
       {
-         log.warn("Tried to terminate lease for session id " + sessionId + ", but no collection of clients have been set.");
+         log.warn(this + " Tried to terminate lease for session id " + sessionId + ", but no collection of clients have been set.");
       }
    }
 
    private void notifyClientLost()
    {
-      // is not for a particular client (but all clients associated with client invoker), so need to inspect request payload for client
-      if (requestPayload != null)
+      Map localRequestPayload = null;
+      synchronized (lock)
       {
+         if (stopped)
+         {
+            log.debug(this + " already stopped");
+            return;
+         }
+         stopped = true;
+         if (requestPayload != null)
+         {  
+            localRequestPayload = new HashMap(requestPayload);
+         }
+      }
+      
+      if (localRequestPayload != null)
+      {
          // loop through and notify for all clients
-         Collection clientHoldersCol = requestPayload.values();
+         Collection clientHoldersCol = localRequestPayload.values();
+         log.debug(this + " notifying listeners about " + clientHoldersCol.size() + " expired client(s)");
          if (clientHoldersCol != null && clientHoldersCol.size() > 0)
          {
             Iterator itr = clientHoldersCol.iterator();
@@ -209,9 +319,9 @@
                {
                   ClientHolder clientHolder = (ClientHolder) val;
                   notifier.connectionLost(locatorURL, clientHolder.getSessionId(), clientHolder.getConfig());
-                  if(isTraceEnabled)
+                  if(true)
                   {
-                     log.trace("Notified connection listener of lease expired due to lost connection from client (client session id = " + clientHolder.getSessionId());
+                     log.debug(this + " Notified connection listener of lease expired due to lost connection from client (client session id = " + clientHolder.getSessionId());
                   }
                }
             }
@@ -219,11 +329,16 @@
       }
       else
       {
-         log.debug("requestPayload == null, calling ConnectionNotifier.connectionTerminated()");
-         notifier.connectionTerminated(locatorURL, clientSessionId, null);
+         log.debug(this + " requestPayload == null, calling ConnectionNotifier.connectionLost()");
+         notifier.connectionLost(locatorURL, clientSessionId, null);
       }
    }
 
+   protected String getLeasePingerId()
+   {
+      return leasePingerId;
+   }
+
    private void stopLease()
    {
       leaseTimerTask.cancel();
@@ -245,14 +360,14 @@
          {
             try
             {
-               if (log.isTraceEnabled()) log.trace("did not receive ping: " + clientSessionId);
+               if (true) log.debug(Lease.this + " did not receive ping: " + clientSessionId);
                stopLease();
                notifyClientLost();
                if (clientLeases != null)
                {
                   clientLeases.remove(clientSessionId);
                }
-               if (log.isTraceEnabled()) log.trace("removed lease:" + clientSessionId);
+               if (true) log.debug(Lease.this + " removed lease:" + clientSessionId);
             }
             catch (Throwable thr)
             {

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/LeasePinger.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -2,6 +2,7 @@
 
 import org.jboss.logging.Logger;
 import org.jboss.remoting.transport.ClientInvoker;
+import org.jboss.util.id.GUID;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -25,6 +26,7 @@
 
    public static final long DEFAULT_LEASE_PERIOD = 5000;
    public static final int DEFAULT_DISCONNECT_TIMEOUT = -1;
+   public static final String LEASE_PINGER_ID = "leasePingerId";
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -44,6 +46,8 @@
 
    private long pingPeriod = -1;
    private int disconnectTimeout = DEFAULT_DISCONNECT_TIMEOUT;
+   
+   private String leasePingerId;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -100,6 +104,24 @@
             e.initCause(throwable);
             throw e;
          }
+         
+//         if (trace) 
+         {
+            log.debug(this + " shut down");
+            if (!clients.isEmpty())
+            {
+               log.debug(this + " " + clients.size() + " remaining clients:");
+               Iterator it = clients.keySet().iterator();
+               while (it.hasNext())
+               {
+                  log.debug(this + ": " + it.next());
+               }
+            }
+            else
+            {
+               log.debug(this + " No remaining clients");
+            }
+         }
       }
    }
 
@@ -138,7 +160,9 @@
 
       if(trace) { log.trace(this + " removing client with session ID " + sessionID); }
 
-      ClientHolder holder = (ClientHolder)clients.remove(sessionID);
+      // Don't remove holder until after client has been removed from server side Lease, to
+      // avoid a race with LeaseTimerTask sending a PING without the Client being removed.
+      ClientHolder holder = (ClientHolder)clients.get(sessionID);
       
       if (holder != null)
       {
@@ -166,11 +190,13 @@
             log.warn(this + " failed sending disconnect for client lease for " +
                   "client with session ID " + sessionID);
          }
+         
+         clients.remove(sessionID);
       }
       else
       {
-         log.warn(this + " tried to remove lease for client with session ID " + sessionID +
-         ", but no such lease was found");
+         log.debug(this + " tried to remove lease for client with session ID " + sessionID +
+                   ", but no such lease was found: probably it was registered with an older LeasePinger");
       }
       
       if (clients.isEmpty())
@@ -233,7 +259,7 @@
 
    public String toString()
    {
-      return "LeasePinger[" + invoker + "(" + invokerSessionID + ")]";
+      return "LeasePinger[" + leasePingerId + ":" + invoker + "(" + invokerSessionID + ")]";
    }
 
    // Package protected ----------------------------------------------------------------------------
@@ -252,6 +278,16 @@
       log.debug(this + " setting disconnect timeout to: " + disconnectTimeout);
    }
    
+   protected String getLeasePingerId()
+   {
+      return leasePingerId;
+   }
+
+   protected void setLeasePingerId(String leasePingerId)
+   {
+      this.leasePingerId = leasePingerId;
+   }
+   
    // Private --------------------------------------------------------------------------------------
 
    private void sendClientPing()
@@ -277,10 +313,9 @@
          Map clientsClone = new ConcurrentHashMap(clients);
          Map requestClients = new ConcurrentHashMap();
          requestClients.put(ClientHolder.CLIENT_HOLDER_KEY, clientsClone);
-
-         InvocationRequest ir =
-            new InvocationRequest(invokerSessionID, null, "$PING$", requestClients, null, null);
-
+         requestClients.put(LeasePinger.LEASE_PINGER_ID, leasePingerId);
+         
+         InvocationRequest ir = new InvocationRequest(invokerSessionID, null, "$PING$", requestClients, null, null);
          invoker.invoke(ir);
 
          if(trace) { log.trace(this + " successfully pinged the server"); }

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/MicroRemoteClientInvoker.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -345,6 +345,7 @@
             
             if (sessionId == null)
             {
+               log.debug(this + " shutting down LeasePinger: " + leasePinger);
                // Independent of any particular Client - force LeasePinger shutdown.
                // Should be called only if there is a reasonable belief that the lease
                // has already stopped on the server side.
@@ -361,9 +362,11 @@
             else
             {
                // Remove a particular Client.
+               log.debug(this + " removing client " + sessionId + " from LeasePinger: " + leasePinger);
                boolean isLastClientLease = leasePinger.removeClient(sessionId);
                if(isLastClientLease)
                {
+                  log.debug(this + " shutting down LeasePinger, " + sessionId + " was last client lease: " + leasePinger);
                   try
                   {
                      leasePinger.stopPing();
@@ -376,6 +379,10 @@
                }
             }
          }
+         else
+         {
+            log.debug(this + " leasePinger is null: must have been shut down already");
+         }
       }
    }
 
@@ -413,8 +420,11 @@
             // configuration should NOT be passed as want ping to be specific to client invoker
             // and NOT to the client.
 
-            InvocationRequest ir =
-               new InvocationRequest(invokerSessionID, null, "$PING$", null, new HashMap(), null);
+            String leasePingerId = new GUID().toString();
+            Map requestMap = new HashMap();
+            requestMap.put(LeasePinger.LEASE_PINGER_ID, leasePingerId);
+            log.info(this + " initiating lease for leasePingerId " + leasePingerId);
+            InvocationRequest ir = new InvocationRequest(invokerSessionID, null, "$PING$", requestMap, new HashMap(), null);
 
             Object ret = invoke(ir);
 
@@ -441,6 +451,7 @@
                   if(trace) { log.trace("server does have leasing enabled (with default lease period of " + defaultLeasePeriod + ") and will start a new lease pinger."); }
 
                   leasePinger = new LeasePinger(this, invokerSessionID, defaultLeasePeriod);
+                  leasePinger.setLeasePingerId(leasePingerId);
                   leasePinger.addClient(clientSessionID, configuration, leasePeriod);
                   leasePinger.startPing();
                }
@@ -579,4 +590,11 @@
       super.finalize();
    }
 
+   protected LeasePinger getLeasePinger()
+   {
+      synchronized(clientLeaseLock)
+      {
+         return leasePinger;
+      }
+   }
 }

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Remoting.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -76,4 +76,12 @@
     * org.jboss.remoting.ServerInvoker.InvalidStateException to an org.jboss.remoting.CannotConnectException.
     */
    public static final String CHANGE_INVALID_STATE_TO_CANNOT_CONNECT = "changeInvalidStateToCannotConnect";
+   
+   /**
+    * Flags indicating that connection monitoring should treat a connection as being defined
+    * by one or two of its endpoints.  I.e., if a client invoker or server invoker stops and restarts, then
+    * all connections it participated in are now gone.
+    */
+   public static final String USE_CLIENT_CONNECTION_IDENTITY = "useClientConnectionIdentity";
+   public static final String USE_SERVER_CONNECTION_IDENTITY = "useServerConnectionIdentity";
 }

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/ServerInvoker.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -262,6 +262,8 @@
    protected ServerSocketFactory serverSocketFactory = null;
    
    protected boolean registerCallbackListeners = true;
+   
+   protected boolean useClientConnectionIdentity;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -702,6 +704,16 @@
       return (ServerInvocationHandler) handlers.get(subsystem.toUpperCase());
    }
 
+   protected boolean isUseClientConnectionIdentity()
+   {
+      return useClientConnectionIdentity;
+   }
+
+   protected void setUseClientConnectionIdentity(boolean useClientConnectionIdentity)
+   {
+      this.useClientConnectionIdentity = useClientConnectionIdentity;
+   }
+
    public Object invoke(Object invoke) throws IOException
    {
       InvocationRequest request = null;
@@ -798,6 +810,7 @@
 
          if ("$DISCONNECT$".equals(param))
          {
+            log.debug(this + " got $DISCONNECT$");
             if (leaseManagement)
             {
                terminateLease(invocation);
@@ -1177,6 +1190,13 @@
          }
       }
       
+      // config for useClientConnectionIdentity
+      String useClientConnectionIdentityString = (String)config.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+      if(useClientConnectionIdentityString != null)
+      {
+         useClientConnectionIdentity = Boolean.parseBoolean(useClientConnectionIdentityString);
+      }
+      
       // Inject ConnectionListener
       String connectionListener = (String)config.get(CONNECTION_LISTENER);
       if (connectionListener != null)
@@ -1694,6 +1714,7 @@
    {
       if (invocation != null)
       {
+         // clientSessionId == MicroRemoteClientInvoker.invokerSessionID.
          String clientSessionId = invocation.getSessionId();
          Lease clientLease = (Lease)clientLeases.get(clientSessionId);
 
@@ -1711,9 +1732,9 @@
                {
                   // just a client that disconnected, so only need to terminate lease for
                   // that particular client (by client session id).
-            	  if (trace) log.trace("terminating client lease: " + clientSessionId);
                   ClientHolder holder = (ClientHolder) holderObj;
                   clientLease.terminateLease(holder.getSessionId());
+                  if (true) log.debug("terminating client lease: " + clientSessionId + ":" + holder.getSessionId());
                   clientOnlyTerminated = true;
                }
             }
@@ -1721,7 +1742,7 @@
             // now see if client invoker needs to be terminated
             if (!clientOnlyTerminated)
             {
-               if (trace) log.trace("terminating invoker lease: " + clientSessionId);
+               if (true) log.debug("terminating invoker lease: " + clientLease);
                clientLease.terminateLease(clientSessionId);
                clientLeases.remove(clientSessionId);
             }
@@ -1729,7 +1750,7 @@
          else
          {
              String type = "invoker";
-        	 Map reqMap = invocation.getRequestPayload();
+             Map reqMap = invocation.getRequestPayload();
              if (reqMap != null)
              {
                 Object holderObj = reqMap.get(ClientHolder.CLIENT_HOLDER_KEY);
@@ -1738,8 +1759,9 @@
                 	type = "client";
                 }
              }
-             log.warn("Asked to terminate " + type + " lease for client session id " + clientSessionId +
-                       ", but lease for this id could not be found." + ": " + clientLeases);
+             log.debug("Asked to terminate " + type + " lease for invoker session id "
+                       + clientSessionId + ", but lease for this id could not be found." +"" +
+                       "Probably has been removed due to connection failure.");
          }
       }
    }
@@ -1751,7 +1773,7 @@
          String clientSessionId = invocation.getSessionId();
          if(clientSessionId != null)
          {
-            if(trace) { log.trace("Getting lease for client session id: " + clientSessionId); }
+            if(trace) { log.trace("Getting lease for invoker session id: " + clientSessionId); }
 
             Lease clientLease = (Lease)clientLeases.get(clientSessionId);
             if(clientLease == null)
@@ -1764,15 +1786,48 @@
 
                clientLeases.put(clientSessionId, newClientLease);
                newClientLease.startLease();
-
-               if(trace) { log.trace("No lease established for client session id (" + clientSessionId + "), so starting a new one."); }
+               
+               if(true) { log.debug("No lease established for invoker session id (" + clientSessionId + 
+                                    "), so starting a new one:" + newClientLease); }
             }
             else
             {
-               // including request payload from invocation as may contain updated list of clients.
-               clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
+               if (useClientConnectionIdentity)
+               {
+                  String leasePingerId = (String) invocation.getRequestPayload().get(LeasePinger.LEASE_PINGER_ID);;
+                  if (leasePingerId.equals(clientLease.getLeasePingerId()))
+                  {
+                     // including request payload from invocation as may contain updated list of clients.
+                     log.debug(clientLease + " matches: leasePingerId: " + leasePingerId);
+                     clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
+                     if(trace) { log.trace("Updated lease for invoker session id (" + clientSessionId + ")"); }
+                  }
+                  else
+                  {
+                     log.debug(clientLease + " does not match: leasePingerId: " + leasePingerId);
+                     if (true) log.debug("terminating invoker lease: " + clientLease);
+                     clientLease.terminateLeaseUponFailure(clientSessionId);
+                     clientLeases.remove(clientSessionId);
 
-               if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
+                     Lease newClientLease = new Lease(clientSessionId, leasePeriod,
+                           locator.getLocatorURI(),
+                           invocation.getRequestPayload(),
+                           connectionNotifier,
+                           clientLeases);
+
+                     clientLeases.put(clientSessionId, newClientLease);
+                     newClientLease.startLease();
+
+                     if(true) { log.debug("starting a new lease:" + newClientLease); }
+                  }
+               }
+               else
+               {
+                  // including request payload from invocation as may contain updated list of clients.
+                  clientLease.updateLease(leasePeriod, invocation.getRequestPayload());
+
+                  if(trace) { log.trace("Updated lease for client session id (" + clientSessionId + ")"); }
+               }
             }
          }
       }
@@ -1782,7 +1837,7 @@
    {
       if(leaseManagement && invokerSessionId != null)
       {
-         if(trace) { log.trace("Checking lease for client session id: " + invokerSessionId); }
+         if(trace) { log.trace("Checking lease for invoker session id: " + invokerSessionId); }
 
          Lease clientLease = (Lease)clientLeases.get(invokerSessionId);
          if(clientLease == null)

Modified: remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java
===================================================================
--- remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java	2009-04-30 05:27:56 UTC (rev 5093)
+++ remoting2/branches/2.2.2-SP11_JBREM-1112/src/main/org/jboss/remoting/Version.java	2009-04-30 21:32:26 UTC (rev 5094)
@@ -32,7 +32,7 @@
    public static final byte VERSION_2 = 2;
    public static final byte VERSION_2_2 = 22;
 
-   public static final String VERSION = "2.2.2.SP11";
+   public static final String VERSION = "2.2.2.SP11_JBREM-1112:042409";
    private static final byte byteVersion = VERSION_2_2;
    private static byte defaultByteVersion = byteVersion;
    private static boolean performVersioning = true;




More information about the jboss-remoting-commits mailing list