[jboss-remoting-commits] JBoss Remoting SVN: r5214 - remoting2/branches/2.x/src/main/org/jboss/remoting.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu May 14 21:34:55 EDT 2009


Author: ron.sigal at jboss.com
Date: 2009-05-14 21:34:54 -0400 (Thu, 14 May 2009)
New Revision: 5214

Modified:
   remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java
Log:
JBREM-1132: (1) Added new connect() and notifyListeners(); (2) Clients can share ConnectionValidators; (3) dialed down logging.

Modified: remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java
===================================================================
--- remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java	2009-05-13 08:01:07 UTC (rev 5213)
+++ remoting2/branches/2.x/src/main/org/jboss/remoting/Client.java	2009-05-15 01:34:54 UTC (rev 5214)
@@ -167,13 +167,20 @@
    public static final String INVOKER_DESTRUCTION_DELAY = "invokerDestructionDelay";
    
    public static final String THROW_CALLBACK_EXCEPTION = "throwCallbackException";
+   
+   private static Map connectionValidators = new HashMap();
+   private static Object connectionValidatorLock = new Object();
 
+   static final String CLIENT = "client";
+   static final String CONNECTION_LISTENER = "connectionListener";
+   
    /** The key to use to specify that parameters for objects created by Client should be taken,
     *  in addition to the metadata map, from the InvokerLocator and from the Client's configuration map.
     */
    public static final String USE_ALL_PARAMS = "useAllParams";
    
    private static final Logger log = Logger.getLogger(Client.class);
+   private static boolean trace = log.isTraceEnabled();
 
    private static final long serialVersionUID = 5679279425009837934L;
    
@@ -198,6 +205,7 @@
    private InvokerLocator locator;
 
    private ConnectionValidator connectionValidator = null;
+   private ConnectionValidatorKey connectionValidatorKey;
    private Map configuration = new HashMap();
 
    private Map callbackConnectors = new HashMap();
@@ -213,6 +221,10 @@
    
    private int invokerDestructionDelay = 0;
 
+   private Set connectionListeners = new HashSet();
+   
+   private boolean useClientConnectionIdentity;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    /**
@@ -420,7 +432,7 @@
       if (invoker == null)
       {
          throw new RuntimeException("Can not add connection listener to remoting client " +
-                                    "until client has been connected.");
+                                    "while client is not connected.");
       }
       else
       {
@@ -431,11 +443,57 @@
          }
       }
 
-      if (connectionValidator == null)
+      synchronized (connectionValidatorLock)
       {
-         connectionValidator = new ConnectionValidator(this, metadata);
+         if (trace) log.trace(this + " in addConnectionListener()");
+         if (connectionValidator == null)
+         {
+            Map map = new HashMap(configuration);
+            map.putAll(metadata);
+            connectionValidatorKey = new ConnectionValidatorKey(invoker, map);
+            WeakReference ref = (WeakReference) connectionValidators.get(connectionValidatorKey);
+            if (ref == null)
+            {
+               connectionValidator = new ConnectionValidator(this, metadata);
+               connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+               connectionValidator.addConnectionListener(this, listener);
+               if (trace) log.trace(this + ": created " + connectionValidator);
+            }
+            else
+            {
+               connectionValidator = (ConnectionValidator) ref.get();
+               if (connectionValidator.addConnectionListener(this, listener))
+               {
+                  if (trace) log.trace(this + ": reusing from static table:  " + connectionValidator);                  
+               }
+               else
+               {
+                  if (trace) log.trace(this + ": unable to reuse existing ConnectionValidator in static map: " + connectionValidator);
+                  connectionValidator = new ConnectionValidator(this, metadata);
+                  connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+                  connectionValidator.addConnectionListener(this, listener);
+                  if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+               }
+            }
+         }
+         else
+         {
+            if (connectionValidator.addConnectionListener(this, listener))
+            {
+               if (trace) log.trace(this + ": reusing from local reference: " + connectionValidator);                  
+            }
+            else
+            {
+               if (trace) log.trace(this + ": unable to reuse ConnectionValidator from local reference: " + connectionValidator);
+               connectionValidator = new ConnectionValidator(this, metadata);
+               connectionValidators.put(connectionValidatorKey, new WeakReference(connectionValidator));
+               connectionValidator.addConnectionListener(this, listener);
+               if (trace) log.trace(this + ": current ConnectionValidator is stopped: created " + connectionValidator);
+            }
+         }
+         
+         connectionListeners.add(listener);
       }
-      connectionValidator.addConnectionListener(listener);
    }
 
    /**
@@ -444,11 +502,36 @@
     */
    public boolean removeConnectionListener(ConnectionListener listener)
    {
-      if (connectionValidator == null)
+      if (trace) log.trace(this + ".removeConnectionListener(" + listener + ")");
+      boolean isRemoved = false;
+      synchronized (connectionValidatorLock)
       {
-         return false;
+         if (connectionValidator == null)
+         {
+            return false;
+         }
+         isRemoved = connectionValidator.removeConnectionListener(this, listener);
+         if (connectionValidator.isStopped())
+         {
+            if (connectionValidators.remove(connectionValidatorKey) != null)
+            {
+               log.debug(this + ".removeConnectionListener() removed from static map: " + connectionValidator);
+            }
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
+         connectionListeners.remove(listener);
+         if (connectionListeners.isEmpty())
+         {
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
+         if (connectionValidator == null)
+         {
+            if (trace) log.trace(this + " set connectionValidator to null");
+         }
       }
-      return connectionValidator.removeConnectionListener(listener);
+      return isRemoved;
    }
 
    /**
@@ -494,6 +577,41 @@
     */
    public void connect() throws Exception
    {
+       connect(null, null);
+   }
+   
+   /**
+    * Will cause the underlying transport to make connection to the target server.  This is
+    * important for any stateful transports, like socket or multiplex. This is also when a client
+    * lease with the server is started.  If listener is not null, it will be registered to
+    * receive a callback if the connection fails.
+    */
+   public void connect(ConnectionListener listener) throws Exception
+   {
+       connect(listener, null);
+   }
+   
+   /**
+    * Will cause the underlying transport to make connection to the target server.  This is
+    * important for any stateful transports, like socket or multiplex. This is also when a client
+    * lease with the server is started.  If listener is not null, it will be registered to
+    * receive a callback if the connection fails.
+    * <p>
+    * 
+    * If this version of connect() is used, and leasing is enabled, the concept of "connection
+    * identity" is enforced.  That is, the ConnectionValidator used by this Client will be
+    * tied to the LeasePinger currently used by the MicroRemoteClientInvoker created or reused
+    * in this method, and that LeasePinger will be tied to this Client and its ConnectionValidator.
+    * If the ConnectionValidator used by any of the Clients associated with the MicroRemoteClientInvoker
+    * used by this Client detects a broken connection, it will shut down that LeasePinger.
+    * Moreover, each ConnectionValidator associated with that LeasePinger will notify its
+    * ConnectionListeners of the broken connection.  At that point, the LeasePinger will be
+    * destroyed, and all of the associated Clients will be disconnected. 
+    */
+   public void connect(ConnectionListener listener, Map metadata) throws Exception
+   {
+      log.debug(this + ".connect(" + listener + ")");
+      if (trace) log.trace(this + ": metadata = " + metadata);
       if (isConnected())
          return;
 
@@ -512,9 +630,10 @@
          invoker = InvokerRegistry.createClientInvoker(locator, configuration);
       }
 
-      connect(invoker);
+      connect(invoker, listener, metadata);
 
       connected = true;
+      log.debug(this + " is connected");
    }
 
    /**
@@ -525,17 +644,15 @@
     */
    public void disconnect()
    {
+      if (trace) log.trace(this + " entering disconnect()");
+      
+      connected = false;
+      
       if (invoker != null)
       {
          // this is a noop if no lease is active
          invoker.terminateLease(sessionId, disconnectTimeout);
-
-         if (connectionValidator != null)
-         {
-            connectionValidator.stop();
-            connectionValidator = null;
-         }
-
+         
          // Need to remove myself from registry so will not keep reference to me since I am of no
          // use now. Will have to create a new one.
 
@@ -560,7 +677,7 @@
                   invokerDestructionTimer.schedule(task, invokerDestructionDelay);
                }
                
-               log.trace(this + " scheduled destruction of " + invoker);
+               if (trace) log.trace(this + " scheduled destruction of " + invoker);
             }
          }
          else
@@ -570,8 +687,30 @@
          
          invoker = null;
       }
-
-      connected = false;
+      
+      synchronized (connectionValidatorLock)
+      {
+         if (connectionValidator != null)
+         {
+            Iterator it = connectionListeners.iterator();
+            while (it.hasNext())
+            {
+               ConnectionListener listener = (ConnectionListener) it.next();
+               connectionValidator.removeConnectionListener(this, listener);
+            }
+            if (connectionValidator.isStopped())
+            {
+               if (connectionValidators.remove(connectionValidatorKey) != null)
+               {
+                  if (trace) log.trace(this + ".disconnect() removed from static map: " + connectionValidator);
+               }
+            }
+            
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
+      }
+      log.debug(this + " is disconnected");
    }
 
    /**
@@ -1252,7 +1391,7 @@
             {
                if (e.getCause() != null && e.getCause() instanceof SocketTimeoutException)
                {
-                  if (log.isTraceEnabled()) log.trace(this + ": getCallbacks() timed out: returning empty list");
+                  if (trace) log.trace(this + ": getCallbacks() timed out: returning empty list");
                   return new ArrayList();
                }
                throw e;
@@ -1596,23 +1735,61 @@
 
    public String toString()
    {
-      return "Client[" + System.identityHashCode(this) + "]";
+      return "Client[" + System.identityHashCode(this) + ":" + sessionId + "]";
    }
 
    // Package protected ----------------------------------------------------------------------------
 
+   void notifyListeners()
+   {
+      synchronized (connectionValidatorLock)
+      {
+         log.debug(this + " entering notifyListeners(): " + connectionValidator);
+         if (connectionValidator != null)
+         {
+            synchronized (connectionValidator)
+            {
+               if (connectionValidator.isStopped())
+               {
+                  if (trace) log.trace(this + ": " + connectionValidator + " is stopped");
+               }
+               else
+               {
+                  if (trace) log.trace(this + ": " + connectionValidator + " is not stopped");
+                  if (trace) log.trace(this + " calling connectionValidator.notifyListeners()");
+                  connectionValidator.notifyListeners(new Exception("Could not connect to server!"));
+                  Iterator it = connectionListeners.iterator();
+                  while (it.hasNext())
+                  {
+                     ConnectionListener listener = (ConnectionListener) it.next();
+                     connectionValidator.removeConnectionListener(this, listener);
+                  }
+                  if (connectionValidators.remove(connectionValidatorKey) != null)
+                  {
+                     if (trace) log.trace(this + ".notifyAndDisconnect() removed from static map: " + connectionValidator);
+                  }
+               }
+            }
+            connectionValidator = null;
+            connectionValidatorKey = null;
+         }
+         
+         log.debug(this + " leaving notifyListeners()");
+      }
+   }
+   
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
 
-   private void connect(ClientInvoker invoker)
+   private void connect(ClientInvoker invoker, ConnectionListener listener, Map metadata)
    {
       if (invoker != null)
       {
          invoker.connect();
          try
          {
-            setupClientLease(invoker);
+            setupClientLease(invoker, listener, metadata);
          }
          catch (Throwable throwable)
          {
@@ -1621,6 +1798,7 @@
             e.initCause(throwable);
             throw e;
          }
+         log.debug(this + " connected to " + locator);
       }
       else
       {
@@ -1629,7 +1807,7 @@
       }
    }
 
-   private void setupClientLease(ClientInvoker invoker) throws Throwable
+   private void setupClientLease(ClientInvoker invoker, ConnectionListener listener, Map metadata) throws Throwable
    {
       long leasePeriod = -1;
       boolean enableLease = false;
@@ -1710,10 +1888,26 @@
          }
       }
 
+      if (trace) log.trace(this + " enableLease: " + enableLease);
       if (enableLease)
       {
-         invoker.establishLease(sessionId, configuration, leasePeriod);
+         Map temp = new HashMap(configuration);
+         if (metadata != null)
+         {
+             temp.putAll(metadata);
+         }
+         if (useClientConnectionIdentity)
+         {
+            temp.put(CLIENT, this);
+            temp.put(CONNECTION_LISTENER, listener);
+         }
+         if (trace) log.trace(this + " calling MicroRemoteClientInvoker.establishLease()");
+         invoker.establishLease(sessionId, temp, leasePeriod);
       }
+      else if (listener != null)
+      {
+          addConnectionListener(listener, metadata);
+      }
    }
 
    private Object invoke(Object param, Map metadata, InvokerLocator callbackServerLocator)
@@ -1833,6 +2027,28 @@
       {
          log.error("invokerDestructionDelay parameter must be a string in integer format: " + param);
       }
+      
+      param = configuration.get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+      if (param instanceof String)
+      {
+         useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
+      }
+      else if (param != null)
+      {
+         log.warn("value of " + Remoting.USE_CLIENT_CONNECTION_IDENTITY + " must be a String: " + param); 
+      }
+      else
+      {
+         if (locator.getParameters() != null)
+         {
+            param = locator.getParameters().get(Remoting.USE_CLIENT_CONNECTION_IDENTITY);
+            if (param != null)
+            {
+               useClientConnectionIdentity = Boolean.valueOf((String) param).booleanValue();
+               this.configuration.put(Remoting.USE_CLIENT_CONNECTION_IDENTITY, param);
+            }
+         }
+      }
    }
 
    private void configureCallbackServerSocketFactory(Map map) throws Exception
@@ -1864,6 +2080,34 @@
       }
    }
    
+   static class ConnectionValidatorKey
+   {
+      private ClientInvoker invoker;
+      private Map metadata;
+      
+      ConnectionValidatorKey(ClientInvoker invoker, Map metadata)
+      {
+         this.invoker = invoker;
+         this.metadata = metadata;
+      }
+      
+      public boolean equals(Object o)
+      {
+         if (o == null)
+            return false;
+         if (! (o instanceof ConnectionValidatorKey))
+            return false;
+         ConnectionValidatorKey holder = (ConnectionValidatorKey) o;
+         boolean metadataEquals = (metadata == null && holder.metadata == null) || metadata.equals(holder.metadata); 
+         return invoker == holder.invoker && metadataEquals;
+      }
+      
+      public int hashCode()
+      {
+         return invoker.hashCode() * metadata.hashCode();
+      }
+   }
+   
    static private InetAddress getLocalHost() throws UnknownHostException
    {
       if (SecurityUtility.skipAccessControl())




More information about the jboss-remoting-commits mailing list