[jboss-cvs] JBoss Messaging SVN: r3019 - in trunk: src/main/org/jboss/jms/client/delegate and 14 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Aug 21 00:19:49 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-08-21 00:19:48 -0400 (Tue, 21 Aug 2007)
New Revision: 3019

Added:
   trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
   trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
Modified:
   trunk/build-messaging.xml
   trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
   trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java
   trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java
   trunk/src/main/org/jboss/jms/server/ConnectionManager.java
   trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
   trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java
   trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
   trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1027 and http://jira.jboss.org/jira/browse/JBMESSAGING-1038

Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/build-messaging.xml	2007-08-21 04:19:48 UTC (rev 3019)
@@ -123,8 +123,8 @@
        This module is based on Java 1.4
    -->
 
-   <property name="javac.target" value="1.4"/>
-   <property name="javac.source" value="1.4"/>
+   <property name="javac.target" value="1.5"/>
+   <property name="javac.source" value="1.5"/>
 
    <property name="javac.debug" value="true"/>
    <property name="javac.optimize" value="false"/>

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientClusteredConnectionFactoryDelegate.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -27,11 +27,20 @@
 import javax.jms.JMSException;
 
 import org.jboss.jms.client.plugin.LoadBalancingPolicy;
+import org.jboss.jms.client.remoting.JMSRemotingConnection;
+import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
+import org.jboss.jms.client.container.JMSClientVMIdentifier;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
 import org.jboss.jms.delegate.IDBlock;
+import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
+import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyResponse;
+import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.util.Version;
 
 /**
  * A ClientClusteredConnectionFactoryDelegate.
@@ -48,7 +57,7 @@
  *
  * $Id$
  */
-public class ClientClusteredConnectionFactoryDelegate
+public class ClientClusteredConnectionFactoryDelegate extends DelegateSupport
    implements Serializable, ConnectionFactoryDelegate
 {
    // Constants ------------------------------------------------------------------------------------
@@ -57,11 +66,126 @@
 
    private static final Logger log =
       Logger.getLogger(ClientClusteredConnectionFactoryDelegate.class);
+   private static boolean trace = log.isTraceEnabled();
 
+   // Serialization and CallbackHandler code -------------------------------------------------------
+
+   private transient JMSRemotingConnection remoting;
+   private transient ClientConnectionFactoryDelegate currentDelegate;
+
+   private void readObject(java.io.ObjectInputStream s)
+        throws java.io.IOException, ClassNotFoundException
+   {
+      s.defaultReadObject();
+      establishCallback();
+   }
+
+   public synchronized void establishCallback()
+   {
+
+      log.debug(" Establishing CFCallback\n");
+
+      for (int server = delegates.length - 1; server >= 0; server--)
+      {
+         if (trace) log.trace("Closing current callback");
+         closeCallback();
+
+         if (trace) log.trace("Trying communication on server(" + server + ")=" + delegates[server].getServerLocatorURI());
+         try
+         {
+            remoting = new JMSRemotingConnection(delegates[server].getServerLocatorURI(), true);
+            remoting.start();
+            currentDelegate = delegates[server];
+            if (trace) log.trace("Adding callback");
+            addCallback(delegates[server]);
+            if (trace) log.trace("Getting topology");
+            TopologyResult topology = getTopology();
+            if (trace) log.trace("delegates.size = " + topology.getDelegates().length);
+
+            break;
+         }
+         catch (Throwable e)
+         {
+            log.warn("Server communication to server[" + server + "] (" +
+               delegates[server].getServerLocatorURI() + ") during establishCallback was broken, " +
+               "trying the next one", e);
+            if (remoting != null)
+            {
+               remoting.stop();
+               remoting = null;
+               currentDelegate = null;
+            }
+         }
+      }
+   }
+
+   private void addCallback(ClientConnectionFactoryDelegate delegate) throws Throwable
+   {
+      remoting.getCallbackManager().setConnectionfactoryCallbackHandler(new ConnectionFactoryCallbackHandler(this, remoting));
+
+      ConnectionFactoryAddCallbackRequest request =
+         new ConnectionFactoryAddCallbackRequest (JMSClientVMIdentifier.instance,
+               remoting.getRemotingClient().getSessionId(),
+               delegate.getID(),
+               Version.instance().getProviderIncrementingVersion());
+
+      remoting.getRemotingClient().invoke(request, null);
+
+   }
+
+   private void removeCallback() throws Throwable
+   {
+      ConnectionFactoryRemoveCallbackRequest request =
+         new ConnectionFactoryRemoveCallbackRequest (JMSClientVMIdentifier.instance,
+               remoting.getRemotingClient().getSessionId(),
+               currentDelegate.getID(),
+               Version.instance().getProviderIncrementingVersion());
+
+      remoting.getRemotingClient().invoke(request, null);
+   }
+
+   protected void finalize() throws Throwable
+   {
+      super.finalize();
+      closeCallback();
+
+   }
+
+   public void closeCallback()
+   {
+      if (remoting != null)
+      {
+         try
+         {
+            removeCallback();
+         }
+         catch (Throwable warn)
+         {
+            log.warn(warn, warn);
+         }
+
+         try
+         {
+            remoting.removeConnectionListener();
+            remoting.stop();
+            currentDelegate = null;
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         remoting = null;
+      }
+   }
+   // Serialization and CallbackHandler code -------------------------------------------------------
+
+   
    // Static ---------------------------------------------------------------------------------------
 
    // Attributes -----------------------------------------------------------------------------------
 
+   private String uniqueName;
+
    private ClientConnectionFactoryDelegate[] delegates;
 
    // Map <Integer(nodeID)->Integer(failoverNodeID)>
@@ -75,11 +199,13 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientClusteredConnectionFactoryDelegate(ClientConnectionFactoryDelegate[] delegates,
+   public ClientClusteredConnectionFactoryDelegate(String uniqueName,
+                                                   ClientConnectionFactoryDelegate[] delegates,
                                                    Map failoverMap,
                                                    LoadBalancingPolicy loadBalancingPolicy,
                                                    boolean supportsFailover)
    {
+      this.uniqueName = uniqueName;
       this.delegates = delegates;
       this.failoverMap = failoverMap;
       this.loadBalancingPolicy = loadBalancingPolicy;
@@ -167,7 +293,36 @@
    {
    	return supportsFailover;
    }
-   
+
+   public String getUniqueName()
+   {
+      return uniqueName;
+   }
+
+
+   public TopologyResult getTopology() throws JMSException
+   {
+
+      try
+      {
+         ConnectionFactoryGetTopologyRequest request =
+            new ConnectionFactoryGetTopologyRequest(currentDelegate.getID());
+
+         ConnectionFactoryGetTopologyResponse response = (ConnectionFactoryGetTopologyResponse)remoting.getRemotingClient().invoke(request, null);
+
+
+         TopologyResult topology = (TopologyResult)response.getResponse();
+
+         updateFailoverInfo(topology.getDelegates(), topology.getFailoverMap());
+
+         return topology;
+      }
+      catch (Throwable e)
+      {
+         throw handleThrowable(e);
+      }
+   }
+
    //Only used in testing
    public void setSupportsFailover(boolean failover)
    {
@@ -186,10 +341,7 @@
       
       failoverMap.putAll(newFailoverMap);
 
-      if (supportsLoadBalancing)
-      {
-      	loadBalancingPolicy.updateView(delegates);
-      }
+      loadBalancingPolicy.updateView(delegates);
    }
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -132,11 +132,6 @@
       // There is one RM per server, so we need to merge the rms if necessary
       ResourceManagerFactory.instance.handleFailover(serverID, newDelegate.getServerID());
 
-
-      // The remoting connection was replaced by a new one..
-      // we have to set the connection Delegate on the CallbackManager to avoid leaks
-      remotingConnection.getCallbackManager().setConnectionDelegate(this);
-
       client = thisState.getRemotingConnection().getRemotingClient();
 
       serverID = newDelegate.getServerID();

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -33,6 +33,7 @@
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
 import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
@@ -63,6 +64,9 @@
    // Attributes -----------------------------------------------------------------------------------
 
    //This data is needed in order to create a connection
+
+   private String uniqueName;
+
    private String serverLocatorURI;
 
    private Version serverVersion;
@@ -99,11 +103,12 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ClientConnectionFactoryDelegate(String objectID, int serverID, String serverLocatorURI,
+   public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
                                           Version serverVersion, boolean clientPing)
    {
       super(objectID);
-      
+
+      this.uniqueName = uniqueName;
       this.serverID = serverID;
       this.serverLocatorURI = serverLocatorURI;
       this.serverVersion = serverVersion;
@@ -115,7 +120,7 @@
    }
 
    // ConnectionFactoryDelegate implementation -----------------------------------------------------
- 
+
    public CreateConnectionResult createConnectionDelegate(String username,
                                                           String password,
                                                           int failedNodeID)
@@ -186,8 +191,6 @@
          connectionDelegate.setRemotingConnection(remotingConnection);
          
          connectionDelegate.setVersionToUse(version);
-
-         remotingConnection.getCallbackManager().setConnectionDelegate(connectionDelegate);
       }
       else
       {
@@ -226,7 +229,12 @@
       
       return (byte[])doInvoke(theClient, req); 
    }
-      
+
+   public TopologyResult getTopology() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Public ---------------------------------------------------------------------------------------
 
    public String toString()

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -62,12 +62,9 @@
 
    private static boolean trace = log.isTraceEnabled();
 
-   protected static CallbackManager theManager;
-
    // Attributes -----------------------------------------------------------------------------------
 
-   // Map<Long(lookup)-ClientConsumer>
-   protected Map callbackHandlers;
+   protected Map<String, ClientConsumer> callbackHandlers;
    protected ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler;
 
    // Constructors ---------------------------------------------------------------------------------
@@ -115,12 +112,17 @@
       }
       else if (parameter instanceof ConnectionFactoryUpdate)
       {
-         ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
 
-         if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
+         if (connectionfactoryCallbackHandler == null)
+         {
+            log.warn("ConnectionFactoryUpdate was received but there is no callbackHandler set");
+         }
+         else
+         {
+            ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)parameter;
 
-         if (connectionfactoryCallbackHandler != null)
-         {
+            if (trace) { log.trace(this + " receiving cluster view change " + viewChange); }
+
             connectionfactoryCallbackHandler.handleMessage(viewChange);
          }
       }
@@ -137,17 +139,22 @@
       callbackHandlers.put(consumerID, handler);
    }
 
-   public void setConnectionDelegate(ClientConnectionDelegate connectionDelegate)
-   {
-      this.connectionfactoryCallbackHandler =
-         new ConnectionFactoryCallbackHandler(connectionDelegate);
-   }
-
    public ClientConsumer unregisterHandler(String consumerID)
    { 
       return (ClientConsumer)callbackHandlers.remove(consumerID);
    }
 
+
+   public ConnectionFactoryCallbackHandler getConnectionfactoryCallbackHandler()
+   {
+      return connectionfactoryCallbackHandler;
+   }
+
+   public void setConnectionfactoryCallbackHandler(ConnectionFactoryCallbackHandler connectionfactoryCallbackHandler)
+   {
+      this.connectionfactoryCallbackHandler = connectionfactoryCallbackHandler;
+   }
+
    public String toString()
    {
       return "CallbackManager[" + Integer.toHexString(hashCode()) + "]";

Modified: trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/ConnectionFactoryCallbackHandler.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -23,10 +23,11 @@
 package org.jboss.jms.client.remoting;
 
 import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.ClientConnectionDelegate;
-import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
 import org.jboss.logging.Logger;
+import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.Client;
+import java.lang.ref.WeakReference;
 
 /**
  * This class will manage ConnectionFactory messages updates
@@ -43,8 +44,9 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private ClientConnectionDelegate connectionDelegate;
-   private ConnectionState state;
+   // Without a WeakReference here, the CF would never be released!
+   private WeakReference<ClientClusteredConnectionFactoryDelegate> delegateRef;
+   private JMSRemotingConnection remotingConnection;
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -52,9 +54,12 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ConnectionFactoryCallbackHandler(ClientConnectionDelegate connectionDelegate)
+   public ConnectionFactoryCallbackHandler(ClientClusteredConnectionFactoryDelegate cfDelegate,
+                                           JMSRemotingConnection remotingConnection)
    {
-      this.connectionDelegate = connectionDelegate;
+      this.delegateRef = new WeakReference<ClientClusteredConnectionFactoryDelegate>(cfDelegate);
+      this.remotingConnection = remotingConnection;
+      this.remotingConnection.addPlainConnectionListener(new CallbackConnectionListener());
    }
 
    // Public ---------------------------------------------------------------------------------------
@@ -65,44 +70,42 @@
 
       ConnectionFactoryUpdate viewChange = (ConnectionFactoryUpdate)message;
 
-      ConnectionState state = getState();
-            
-      if (state != null)
-      {      
-      	Object d = state.getClusteredConnectionFactoryDelegate();
-      	
-      	if (d instanceof ClientClusteredConnectionFactoryDelegate)
-         {
-            ClientClusteredConnectionFactoryDelegate clusteredDelegate =
-               (ClientClusteredConnectionFactoryDelegate)d;
+      ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
 
-            clusteredDelegate.updateFailoverInfo(viewChange.getDelegates(),
-                                                 viewChange.getFailoverMap());
-         }
-      }      
+      if (delegate!=null)
+      {
+         delegate.updateFailoverInfo(viewChange.getTopology().getDelegates(),
+                                           viewChange.getTopology().getFailoverMap());
+      }
    }
 
+
    public String toString()
    {
-      return "ConnectionFactoryCallbackHandler[" + connectionDelegate + "]";
+      return "ConnectionFactoryCallbackHandler[" + delegateRef.get() + "]";
    }
 
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
 
-   // When ConnectionFactoryCallbackHandler is created, is not guaranteed that state is set
-   // as this could be later initialized by the aop stack
-   protected ConnectionState getState()
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   class CallbackConnectionListener implements ConnectionListener
    {
-      if (state==null)
+
+      public void handleConnectionException(Throwable throwable, Client client)
       {
-         this.state = (ConnectionState)connectionDelegate.getState();
+         ClientClusteredConnectionFactoryDelegate delegate = delegateRef.get();
+
+         if (delegate!=null)
+         {
+            delegate.establishCallback();
+         }
+
+         //remotingConnection.removePlainConnectionListener(this);
       }
-      return this.state;
    }
-
-   // Private --------------------------------------------------------------------------------------
-
-   // Inner classes --------------------------------------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/client/remoting/JMSRemotingConnection.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -23,6 +23,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.net.MalformedURLException;
 
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.wireformat.JMSWireFormat;
@@ -31,6 +32,7 @@
 import org.jboss.remoting.Client;
 import org.jboss.remoting.InvokerLocator;
 import org.jboss.remoting.ServerInvoker;
+import org.jboss.remoting.ConnectionListener;
 import org.jboss.remoting.callback.CallbackPoller;
 import org.jboss.remoting.callback.InvokerCallbackHandler;
 import org.jboss.remoting.transport.bisocket.Bisocket;
@@ -244,7 +246,7 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Throwable
+   public JMSRemotingConnection(String serverLocatorURI, boolean clientPing) throws Exception
    {
       serverLocator = new InvokerLocator(serverLocatorURI);
       this.clientPing = clientPing;
@@ -382,6 +384,16 @@
       return true;
    }
 
+   public synchronized void addPlainConnectionListener(ConnectionListener listener)
+   {
+      client.addConnectionListener(listener);
+   }
+
+   public synchronized void removePlainConnectionListener(ConnectionListener listener)
+   {
+      client.removeConnectionListener(listener);
+   }
+
    public synchronized ConsolidatedRemotingConnectionListener getConnectionListener()
    {
       return remotingConnectionListener;

Modified: trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/delegate/ConnectionFactoryEndpoint.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -43,7 +43,10 @@
                                                    String password, 
                                                    int failedNodeID)
       throws JMSException;
+
+   /** Get the current Cluster topology associated with a ClusteredConnectionFactory */
+   TopologyResult getTopology() throws JMSException;
    
-   byte[] getClientAOPStack() throws JMSException;     
+   byte[] getClientAOPStack() throws JMSException;
 }
 

Modified: trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -43,18 +43,10 @@
  */
 public interface SessionEndpoint extends Closeable
 {
-   /**
-    * @param failoverChannelID - the ID of the channel for which there is a failover process in
-    *        progress. -1 means regular (non-failover) consumer delegate creation.
-    */
    ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                            boolean noLocal, String subscriptionName,
                                            boolean connectionConsumer, boolean autoFlowControl) throws JMSException;
    
-   /**
-    * @param failoverChannelID - the ID of the channel for which there is a failover process in
-    *        progress. -1 means regular (non-failover) browser delegate creation.
-    */
    BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector) throws JMSException;
 
    /**

Added: trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/TopologyResult.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/delegate/TopologyResult.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,196 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.delegate;
+
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.wireformat.PacketSupport;
+import org.jboss.messaging.util.Streamable;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class TopologyResult implements Streamable
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+   private ClientConnectionFactoryDelegate[] delegates;
+
+   private Map failoverMap;
+
+   String uniqueName;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+
+   public TopologyResult()
+   {
+   }
+
+   public TopologyResult(String uniqueName, ClientConnectionFactoryDelegate[] delegates,
+                                  Map failoverMap)
+   {
+      this.uniqueName = uniqueName;
+
+      this.delegates = delegates;
+
+      this.failoverMap = failoverMap;
+   }
+
+
+
+   // Public ---------------------------------------------------------------------------------------
+
+
+   public ClientConnectionFactoryDelegate[] getDelegates()
+   {
+      return delegates;
+   }
+
+   public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+   {
+      this.delegates = delegates;
+   }
+
+   public Map getFailoverMap()
+   {
+      return failoverMap;
+   }
+
+   public void setFailoverMap(Map failoverMap)
+   {
+      this.failoverMap = failoverMap;
+   }
+
+   public String getUniqueName()
+   {
+      return uniqueName;
+   }
+
+   public void setUniqueName(String uniqueName)
+   {
+      this.uniqueName = uniqueName;
+   }
+
+   public void read(DataInputStream is) throws Exception
+   {
+      uniqueName = is.readUTF();
+
+      int len = is.readInt();
+
+      delegates = new ClientConnectionFactoryDelegate[len];
+
+      for (int i = 0; i < len; i++)
+      {
+         delegates[i] = new ClientConnectionFactoryDelegate();
+
+         delegates[i].read(is);
+      }
+
+      len = is.readInt();
+
+      failoverMap = new HashMap(len);
+
+      for (int c = 0; c < len; c++)
+      {
+         Integer i = new Integer(is.readInt());
+
+         Integer j = new Integer(is.readInt());
+
+         failoverMap.put(i, j);
+      }
+   }
+
+   public void write(DataOutputStream os) throws Exception
+   {
+      os.writeUTF(uniqueName);
+
+      int len = delegates.length;
+
+      os.writeInt(len);
+
+      for (int i = 0; i < len; i++)
+      {
+         delegates[i].write(os);
+      }
+
+      os.writeInt(failoverMap.size());
+
+      Iterator iter = failoverMap.entrySet().iterator();
+
+      while (iter.hasNext())
+      {
+         Map.Entry entry = (Map.Entry)iter.next();
+
+         Integer i = (Integer)entry.getKey();
+
+         Integer j = (Integer)entry.getValue();
+
+         os.writeInt(i.intValue());
+
+         os.writeInt(j.intValue());
+      }
+
+   }
+
+
+   public String toString()
+   {
+      StringBuffer sb = new StringBuffer("UpdateConnectionFactoryResult[");
+
+      if (delegates != null)
+      {
+         for(int i = 0; i < delegates.length; i++)
+         {
+            sb.append(delegates[i]);
+            if (i < delegates.length - 1)
+            {
+               sb.append(',');
+            }
+         }
+      }
+
+      sb.append("]");
+
+      return sb.toString();
+
+   }
+   
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+}


Property changes on: trunk/src/main/org/jboss/jms/delegate/TopologyResult.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/exception/MessagingJMSException.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -46,7 +46,7 @@
      this(reason, null, null);
    }
 
-   public MessagingJMSException(Exception cause) {
+   public MessagingJMSException(Throwable cause) {
      this(null, null, cause);
    }
 

Modified: trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java
===================================================================
--- trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/exception/MessagingNetworkFailureException.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -34,7 +34,7 @@
 {
    private static final long serialVersionUID = 1255764532063281353L;
 
-   public MessagingNetworkFailureException(Exception cause)
+   public MessagingNetworkFailureException(Throwable cause)
    {
       super(cause);
    }

Modified: trunk/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/ConnectionManager.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,6 +25,7 @@
 
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.messaging.core.contract.MessagingComponent;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 
 /**
@@ -59,6 +60,12 @@
     */
    List getActiveConnections();
 
+   void addConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
+
+   void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
+   
+   ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
+
    /**
     * @param clientToServer - true if the failure has been detected on a direct connection from
     *        client to this server, false if the failure has been detected while trying to send a

Modified: trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -86,7 +86,7 @@
    private Map delegates;
 
    private Replicator replicator;
-   
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ConnectionFactoryJNDIMapper(ServerPeer serverPeer) throws Exception
@@ -118,7 +118,7 @@
    {
       log.debug(this + " registering connection factory '" + uniqueName +
          "', bindings: " + jndiBindings);
-      
+
       // Sanity check
       if (delegates.containsKey(uniqueName))
       {
@@ -126,11 +126,14 @@
                                             "registered with name " + uniqueName);
       }
 
-      String id = GUIDGenerator.generateGUID();
+      // See http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4076040#4076040
+      //String id = GUIDGenerator.generateGUID();
+      String id = uniqueName;
+      
       Version version = serverPeer.getVersion();
 
       ServerConnectionFactoryEndpoint endpoint =
-         new ServerConnectionFactoryEndpoint(id, serverPeer, clientID,
+         new ServerConnectionFactoryEndpoint(uniqueName, id, serverPeer, clientID,
                                              jndiBindings, prefetchSize,
                                              defaultTempQueueFullSize,
                                              defaultTempQueuePageSize,
@@ -145,13 +148,13 @@
       {
          setupReplicator();
       }
-      
+
       if (supportsFailover && replicator == null)
       {
       	log.warn("supportsFailover attribute is true on connection factory: " + uniqueName + " but post office is non clustered. " +
       			   "So connection factory will *not* support failover");
       }
-      
+
       if (supportsLoadBalancing && replicator == null)
       {
       	log.warn("supportsLoadBalancing attribute is true on connection factory: " + uniqueName + " but post office is non clustered. " +
@@ -161,7 +164,7 @@
       boolean creatingClustered = (supportsFailover || supportsLoadBalancing) && replicator != null;
 
       ClientConnectionFactoryDelegate localDelegate =
-         new ClientConnectionFactoryDelegate(id, serverPeer.getServerPeerID(),
+         new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
                                              locatorURI, version, clientPing);
 
       log.debug(this + " created local delegate " + localDelegate);
@@ -172,18 +175,18 @@
       // connection factory list. This will happen locally too, so we will get the replication
       // message locally - to avoid updating it again we can ignore any "add" replication updates
       // that originate from the current node.
-      
+
       if (creatingClustered)
       {
          // Create a clustered delegate
-         
+
          if (!supportsLoadBalancing)
          {
          	loadBalancingFactory = new NoLoadBalancingLoadBalancingFactory(localDelegate);
          }
 
          Map localDelegates = replicator.get(Replicator.CF_PREFIX + uniqueName);
-         delegate = createClusteredDelegate(localDelegates.values(), loadBalancingFactory, supportsFailover);
+         delegate = createClusteredDelegate(uniqueName, localDelegates.values(), loadBalancingFactory, endpoint, supportsFailover);
 
          log.debug(this + " created clustered delegate " + delegate);
       }
@@ -198,13 +201,13 @@
 
       // Now bind it in JNDI
       rebindConnectionFactory(initialContext, jndiBindings, delegate);
-      
+
       ConnectionFactoryAdvised advised;
-      
+
       // Need to synchronized to prevent a deadlock
       // See http://jira.jboss.com/jira/browse/JBMESSAGING-797
       synchronized (AspectManager.instance())
-      {       
+      {
          advised = new ConnectionFactoryAdvised(endpoint);
       }
 
@@ -213,8 +216,8 @@
       Dispatcher.instance.registerTarget(id, advised);
 
       // Replicate the change - we will ignore this locally
-   	
-      if (replicator != null) replicator.put(Replicator.CF_PREFIX + uniqueName, localDelegate);            
+
+      if (replicator != null) replicator.put(Replicator.CF_PREFIX + uniqueName, localDelegate);
    }
 
    public synchronized void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing)
@@ -254,11 +257,11 @@
       if (replicator != null)
       {
       	if (!replicator.remove(Replicator.CF_PREFIX + uniqueName))
-         {            	
+         {
             throw new IllegalStateException("Cannot find replicant to remove: " +
             		Replicator.CF_PREFIX + uniqueName);
          }
-      }      
+      }
 
       Dispatcher.instance.unregisterTarget(endpoint.getID(), endpoint);
    }
@@ -286,7 +289,7 @@
       log.debug(this + " received notification from node " + notification.nodeID );
 
       try
-      {      	      	
+      {
       	if (notification.type == ClusterNotification.TYPE_NODE_JOIN || notification.type == ClusterNotification.TYPE_NODE_LEAVE)
       	{
             // We respond to changes in the node-address mapping. This will be replicated whan a
@@ -318,22 +321,25 @@
          else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
          		   (notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))
          {
+
+            log.debug("Updating CF information for " + notification.data);
             // A connection factory has been deployed / undeployed
 
          	// NOTE! All connection factories MUST be deployed on all nodes!
          	// Otherwise the server might failover onto a node which doesn't have that connection factory deployed
             // so the connection won't be able to recconnect.
-         	
+
          	String key = (String)notification.data;
-         	
+
             String uniqueName = key.substring(Replicator.CF_PREFIX.length());
 
             log.debug(this + " received '" + uniqueName +  "' connection factory deploy / undeploy");
 
             ConnectionFactoryDelegate cfd = (ConnectionFactoryDelegate)delegates.get(uniqueName);
-              
+
             if (cfd == null)
             {
+               log.info("cfd == null");
                //This is ok - connection factory a might be deployed on node A before being deployed on node B so
             	//node B might get the notification before it has deployed a itself
             }
@@ -341,23 +347,24 @@
             {
                if (cfd instanceof ClientConnectionFactoryDelegate)
                {
-               	//Non clustered - ignore
-               	
+                  log.info("Non Clustered!!!");
+                  //Non clustered - ignore
+
                	//We still replicate non clustered connection factories since the ClusterPullConnectionFactory
                	//is non clustered but needs to be available across the cluster
                }
                else
                {
                	ClientClusteredConnectionFactoryDelegate del = (ClientClusteredConnectionFactoryDelegate)cfd;
-            	
+
 	            	Map updatedReplicantMap = replicator.get(key);
-	            	            	
+
 		            List newDels = sortDelegatesOnServerID(updatedReplicantMap.values());
-		
+
 		            ClientConnectionFactoryDelegate[] delArr =
 		               (ClientConnectionFactoryDelegate[])newDels.
 		                  toArray(new ClientConnectionFactoryDelegate[newDels.size()]);
-		
+
                   Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
 
 		            del.setDelegates(delArr);
@@ -365,12 +372,12 @@
 
 		            ServerConnectionFactoryEndpoint endpoint =
 		               (ServerConnectionFactoryEndpoint)endpoints.get(uniqueName);
-		
+
 		            if (endpoint == null)
 		            {
 		               throw new IllegalStateException("Cannot find endpoint with name " + uniqueName);
 		            }
-		
+
 		            rebindConnectionFactory(initialContext, endpoint.getJNDIBindings(), del);
 
 		            endpoint.updateClusteredClients(delArr, failoverMap);
@@ -410,8 +417,9 @@
    /**
     * @param localDelegates - Collection<ClientConnectionFactoryDelegate>
     */
-   private ClientClusteredConnectionFactoryDelegate  createClusteredDelegate(Collection localDelegates, LoadBalancingFactory loadBalancingFactory,
-      		                                                                 boolean supportsFailover)
+   private ClientClusteredConnectionFactoryDelegate  createClusteredDelegate(String uniqueName, Collection localDelegates, LoadBalancingFactory loadBalancingFactory,
+                                                                             ServerConnectionFactoryEndpoint endpoint,
+                                                                             boolean supportsFailover)
       throws Exception
    {
       log.trace(this + " creating a clustered ConnectionFactoryDelegate based on " + localDelegates);
@@ -426,8 +434,10 @@
       Map failoverMap = serverPeer.getPostOfficeInstance().getFailoverMap();
 
       LoadBalancingPolicy lbp = loadBalancingFactory.createLoadBalancingPolicy(delegates);
-      
-      return new ClientClusteredConnectionFactoryDelegate(delegates, failoverMap, lbp, supportsFailover);
+
+      endpoint.updateTopology(delegates, failoverMap);
+
+      return new ClientClusteredConnectionFactoryDelegate(uniqueName, delegates, failoverMap, lbp, supportsFailover);
    }
 
    private void rebindConnectionFactory(Context ic, JNDIBindings jndiBindings,

Modified: trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,10 +25,10 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 
@@ -39,9 +39,11 @@
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
 import org.jboss.messaging.core.contract.Replicator;
 import org.jboss.messaging.util.Util;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ClientDisconnectedException;
 import org.jboss.remoting.ConnectionListener;
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -62,14 +64,15 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   // Map<jmsClientVMID<String> - Map<remotingClientSessionID<String> - ConnectionEndpoint>>
-   private Map jmsClients;
+   private Map</** VMID */String, Map</** RemoteSessionID */String, ConnectionEndpoint>> jmsClients;
 
    // Map<remotingClientSessionID<String> - jmsClientVMID<String>
    private Map remotingSessions;
 
    // Set<ConnectionEndpoint>
    private Set activeConnectionEndpoints;
+
+   private Map</** CFUniqueName*/ String, ConnectionFactoryCallbackInformation> cfCallbackInfo;
    
    private Replicator replicator;
 
@@ -80,6 +83,7 @@
       jmsClients = new HashMap();
       remotingSessions = new HashMap();
       activeConnectionEndpoints = new HashSet();
+      cfCallbackInfo = new ConcurrentHashMap<String, ConnectionFactoryCallbackInformation>();
    }
 
    // ConnectionManager implementation -------------------------------------------------------------
@@ -88,7 +92,7 @@
                                                String remotingClientSessionID,
                                                ConnectionEndpoint endpoint)
    {    
-      Map endpoints = (Map)jmsClients.get(jmsClientVMID);
+      Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientVMID);
       
       if (endpoints == null)
       {
@@ -109,11 +113,11 @@
    public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
                                                                String remotingClientSessionID)
    {
-      Map endpoints = (Map)jmsClients.get(jmsClientVMId);
+      Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientVMId);
       
       if (endpoints != null)
       {
-         ConnectionEndpoint e = (ConnectionEndpoint)endpoints.remove(remotingClientSessionID);
+         ConnectionEndpoint e = endpoints.remove(remotingClientSessionID);
 
          if (e != null)
          {
@@ -148,7 +152,7 @@
    {
       String jmsClientID = (String)remotingSessions.get(remotingSessionID);
 
-      if (jmsClientID != null)
+      if (jmsClientID == null)
       {
          log.warn(this + " cannot look up remoting session ID " + remotingSessionID);
       }
@@ -191,7 +195,25 @@
          handleClientFailure(remotingSessionID, true);
       }
    }
-   
+
+   /** Adds handler for connection factory uniqueName under client JVMID. Synchronization also serializes the get-then-put in getCFInfo. */
+   public synchronized void addConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
+   {
+      getCFInfo(uniqueName).addClient(JVMID, handler);
+   }
+
+   /** Removes handler from the entry for uniqueName (getCFInfo creates the entry if missing). Synchronization also serializes getCFInfo's get-then-put. */
+   public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
+   {
+      getCFInfo(uniqueName).removeHandler(JVMID, handler);
+   }
+
+   /** Returns every handler held for uniqueName (getCFInfo creates the entry if missing). Synchronization also serializes getCFInfo's get-then-put. */
+   public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
+   {
+      return getCFInfo(uniqueName).getAllHandlers();
+   }
+
    // ClusterNotificationListener implementation ---------------------------------------------------
 
 
@@ -292,22 +314,34 @@
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-   
+
+   private ConnectionFactoryCallbackInformation getCFInfo(String uniqueName)
+   {
+      // Return the entry kept for this connection factory, creating and storing it on first use
+      ConnectionFactoryCallbackInformation existing = cfCallbackInfo.get(uniqueName);
+      if (existing != null)
+      {
+         return existing;
+      }
+      cfCallbackInfo.put(uniqueName, new ConnectionFactoryCallbackInformation(uniqueName));
+      return cfCallbackInfo.get(uniqueName);
+   }
+
+
    private synchronized void closeConsumersForClientVMID(String jmsClientID)
    {
    	// Remoting only provides one pinger per invoker, not per connection therefore when the pinger
       // dies we must close ALL connections corresponding to that jms client ID.
 
-      Map endpoints = (Map)jmsClients.get(jmsClientID);
+      Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientID);
 
       if (endpoints != null)
       {
-         List sces = new ArrayList();
+         List<ConnectionEndpoint> sces = new ArrayList();
 
-         for(Iterator i = endpoints.entrySet().iterator(); i.hasNext(); )
+         for(Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
          {
-            Map.Entry entry = (Map.Entry)i.next();
-            ConnectionEndpoint sce = (ConnectionEndpoint)entry.getValue();
+            ConnectionEndpoint sce = entry.getValue();
             sces.add(sce);
          }
 
@@ -315,13 +349,11 @@
          // to remove the data from the jmsClients and sessions maps.
          // Note we do this outside the loop to prevent ConcurrentModificationException
 
-         for(Iterator i = sces.iterator(); i.hasNext(); )
+         for(ConnectionEndpoint sce: sces )
          {
-            ConnectionEndpoint sce = (ConnectionEndpoint)i.next();
-
             try
             {
-      			log.debug("clearing up state for connection " + sce);
+      			log.debug("clearing up state for connection " + sce);
                sce.closing();
                sce.close();
                log.debug("cleared up state for connection " + sce);
@@ -332,8 +364,94 @@
             }          
          }
       }
+
+      for (ConnectionFactoryCallbackInformation cfInfo: cfCallbackInfo.values())
+      {
+         ServerInvokerCallbackHandler[] handlers = cfInfo.getAllHandlers(jmsClientID);
+         for (ServerInvokerCallbackHandler handler: handlers)
+         {
+            try
+            {
+               handler.getCallbackClient().disconnect();
+            }
+            catch (Throwable e)
+            {
+               log.warn (e, e);
+            }
+
+            try
+            {
+               handler.destroy();
+            }
+            catch (Throwable e)
+            {
+               log.warn (e, e);
+            }
+
+            cfInfo.removeHandler(jmsClientID, handler);
+         }
+
+      }
+
    }
 
    // Inner classes --------------------------------------------------------------------------------
 
+   /** Callback handlers of one clustered CF: a flat set to notify plus a per-VM index for cleanup. */
+   static class ConnectionFactoryCallbackInformation
+   {
+      String uniqueName;
+      Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
+      ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
+
+      public ConnectionFactoryCallbackInformation(String uniqueName)
+      {
+         this.uniqueName = uniqueName;
+         this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
+         this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+      }
+
+      /** Adds handler to the flat set and to the set kept for vmID, creating that set on first use. */
+      public void addClient(String vmID, ServerInvokerCallbackHandler handler)
+      {
+         clientHandlers.add(handler);
+         getHandlersList(vmID).add(handler);
+      }
+
+      /** Copies the handlers kept for vmID; a plain lookup, so an unknown VM leaves no empty set behind. */
+      public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
+      {
+         Set<ServerInvokerCallbackHandler> list = clientHandlersByVM.get(vmID);
+         return list == null ? new ServerInvokerCallbackHandler[0] : list.toArray(new ServerInvokerCallbackHandler[list.size()]);
+      }
+
+      /** Copies every handler kept for this connection factory. */
+      public ServerInvokerCallbackHandler[] getAllHandlers()
+      {
+         return (ServerInvokerCallbackHandler[])clientHandlers.toArray(new ServerInvokerCallbackHandler[clientHandlers.size()]);
+      }
+
+      /** Removes handler; drops the per-VM set once empty (assumes callers serialize access - TODO confirm). */
+      public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
+      {
+         clientHandlers.remove(handler);
+         ConcurrentHashSet<ServerInvokerCallbackHandler> perVMList = clientHandlersByVM.get(vmID);
+         if (perVMList == null) return;
+         perVMList.remove(handler);
+         if (perVMList.isEmpty()) clientHandlersByVM.remove(vmID);
+      }
+
+      /** Returns the handler set for vmID, creating and storing an empty one if none exists yet. */
+      private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
+      {
+         ConcurrentHashSet<ServerInvokerCallbackHandler> perVMList = clientHandlersByVM.get(vmID);
+         if (perVMList == null)
+         {
+            perVMList = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
+            clientHandlersByVM.put(vmID, perVMList);
+         }
+         return perVMList;
+      }
+   }
+
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -32,6 +32,7 @@
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
 import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.connectionfactory.JNDIBindings;
 import org.jboss.jms.server.endpoint.advised.ConnectionAdvised;
@@ -39,6 +40,7 @@
 import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 import org.jboss.security.SecurityAssociation;
@@ -66,6 +68,8 @@
 
    private String clientID;
 
+   private String uniqueName;
+
    private String id;
 
    private JNDIBindings jndiBindings;
@@ -81,14 +85,24 @@
    private int dupsOKBatchSize;
    
    private boolean supportsFailover;
+
+   /** Cluster Topology on ClusteredConnectionFactories
+       Information to failover to other connections on clients **/
+   ClientConnectionFactoryDelegate[] delegates;
+
+   /** Cluster Topology on ClusteredConnectionFactories
+       Information to failover to other connections on clients **/
+   Map failoverMap;
+
    
+
    // Constructors ---------------------------------------------------------------------------------
 
    /**
     * @param jndiBindings - names under which the corresponding JBossConnectionFactory is bound in
     *        JNDI.
     */
-   public ServerConnectionFactoryEndpoint(String id, ServerPeer serverPeer,
+   public ServerConnectionFactoryEndpoint(String uniqueName, String id, ServerPeer serverPeer,
                                           String defaultClientID,
                                           JNDIBindings jndiBindings,
                                           int preFetchSize,
@@ -98,6 +112,7 @@
                                           int dupsOKBatchSize,
                                           boolean supportsFailover)
    {
+      this.uniqueName = uniqueName;
       this.serverPeer = serverPeer;
       this.clientID = defaultClientID;
       this.id = id;
@@ -263,6 +278,25 @@
       }
    }
 
+   public void addCallback(String VMID, String remotingSessionID,
+                           ServerInvokerCallbackHandler callbackHandler) throws JMSException
+   {
+      log.debug("Adding callbackHandler on ConnectionFactory");
+      serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
+   }
+
+   public void removeCallback(String VMID, String remotingSessionID,
+                           ServerInvokerCallbackHandler callbackHandler) throws JMSException
+   {
+      log.debug("Removing callbackHandler on ConnectionFactory");
+      serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
+   }
+
+   public TopologyResult getTopology() throws JMSException
+   {
+      return new TopologyResult(uniqueName, delegates, failoverMap);
+   }
+
    // Public ---------------------------------------------------------------------------------------
    
    public String getID()
@@ -289,35 +323,39 @@
    public void updateClusteredClients(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
       throws Exception
    {
-      // TODO Should we lock the CFEndpoint now allowing new connections to come while doing this?
+      // Store the new topology first so getTopology() returns what is about to be pushed
+      updateTopology(delegates, failoverMap);
 
-      List activeConnections = serverPeer.getConnectionManager().getActiveConnections();
+      ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
+      log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
 
       ConnectionFactoryUpdate message =
-         new ConnectionFactoryUpdate(delegates, failoverMap);
+         new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
 
       Callback callback = new Callback(message);
 
-      for (Iterator i = activeConnections.iterator(); i.hasNext();)
+      for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
       {
-         ServerConnectionEndpoint connEndpoint = (ServerConnectionEndpoint)i.next();
-
-         if (connEndpoint.getConnectionFactoryEndpoint() == this)
-         {
-            log.trace(this + " sending cluster view update to " + connEndpoint);
-
-            try
-            {
-               connEndpoint.getCallbackHandler().handleCallbackOneway(callback);
-            }
-            catch (Exception e)
-            {
-               log.error("Callback failed on connection " + connEndpoint, e);
-            }
-         }
+         log.debug("Updating CF on callback " + o);
+
+         try
+         {
+            o.handleCallbackOneway(callback);
+         }
+         catch (Exception e)
+         {
+            // Keep going: one failing callback must not stop the remaining clients being updated
+            log.error("Callback failed on " + o, e);
+         }
       }
    }
 
+   public void updateTopology(ClientConnectionFactoryDelegate[] delegates, Map failoverMap)
+   {
+      this.delegates = delegates;
+      this.failoverMap = failoverMap;
+   }
+
    public String toString()
    {
       return "ConnectionFactoryEndpoint[" + id + "]";

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/ConnectionFactoryAdvised.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,6 +25,7 @@
 
 import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
 import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.jms.server.endpoint.ConnectionFactoryInternalEndpoint;
 import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
@@ -73,7 +74,26 @@
    {
       return endpoint.getClientAOPStack();
    }
-   
+
+   public void addCallback(String vmID, String remotingSessionID,
+                           ServerInvokerCallbackHandler callbackHandler) throws JMSException
+   {
+      ((ServerConnectionFactoryEndpoint)endpoint).addCallback(vmID,  remotingSessionID,
+                            callbackHandler);
+   }
+
+   public void removeCallback(String vmID, String remotingSessionID,
+                           ServerInvokerCallbackHandler callbackHandler) throws JMSException
+   {
+      ((ServerConnectionFactoryEndpoint)endpoint).removeCallback(vmID,  remotingSessionID,
+                            callbackHandler);
+   }
+
+   public TopologyResult getTopology() throws JMSException
+   {
+      return endpoint.getTopology();
+   }
+
    // ConnectionFactoryInternalEndpoint implementation ---------------------------------------------
    public CreateConnectionResult
       createConnectionDelegate(String username,

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -31,6 +31,7 @@
 import org.jboss.jms.exception.MessagingJMSException;
 import org.jboss.jms.wireformat.ConnectionFactoryCreateConnectionDelegateRequest;
 import org.jboss.jms.wireformat.RequestSupport;
+import org.jboss.jms.wireformat.CallbackRequestSupport;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.Util;
 import org.jboss.remoting.InvocationRequest;
@@ -135,31 +136,9 @@
            
          RequestSupport request = (RequestSupport)invocation.getParameter();
          
-         if (request instanceof ConnectionFactoryCreateConnectionDelegateRequest)
+         if (request instanceof CallbackRequestSupport)
          {
-            //Create connection request
-            
-            ConnectionFactoryCreateConnectionDelegateRequest cReq = 
-               (ConnectionFactoryCreateConnectionDelegateRequest)request;
-            
-            String remotingSessionId = cReq.getRemotingSessionID();
-            
-            ServerInvokerCallbackHandler callbackHandler = null;
-            synchronized(callbackHandlers)
-            {
-               callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
-            }
-            if (callbackHandler != null)
-            {
-               log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
-               
-               cReq.setCallbackHandler(callbackHandler);
-            }
-            else
-            {
-               throw new IllegalStateException("Cannot find callback handler " +
-                                               "for session id " + remotingSessionId);
-            }
+            performCallbackRequest(request);
          }
       
          return request.serverInvoke();
@@ -238,6 +217,32 @@
    // Protected ------------------------------------------------------------------------------------
    
    // Private --------------------------------------------------------------------------------------
-   
+
+   private void performCallbackRequest(RequestSupport request)
+   {
+      CallbackRequestSupport cReq =
+         (CallbackRequestSupport)request;
+
+      String remotingSessionId = cReq.getRemotingSessionID();
+
+      ServerInvokerCallbackHandler callbackHandler = null;
+      synchronized(callbackHandlers)
+      {
+               callbackHandler = (ServerInvokerCallbackHandler)callbackHandlers.get(remotingSessionId);
+      }
+      if (callbackHandler != null)
+      {
+         log.debug("found calllback handler for remoting session " + Util.guidToString(remotingSessionId));
+
+         cReq.setCallbackHandler(callbackHandler);
+      }
+      else
+      {
+         throw new IllegalStateException("Cannot find callback handler " +
+                                         "for session id " + remotingSessionId);
+      }
+   }
+
+
    // Inner classes --------------------------------------------------------------------------------
 }

Added: trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,124 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.wireformat;
+
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * Base class for requests that establish the server-to-client callback mechanism.
+ *
+ * (JMSServerInvocationHandler looks up the callbackHandler based on the remotingSessionId.
+ *  That routine used to depend on ConnectionFactoryCreateConnectionDelegateRequest,
+ *  but the same lookup is also needed to establish callbacks for ConnectionFactory updates,
+ *  so this extra level was added on top of RequestSupport to carry the callback information.)
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public abstract class CallbackRequestSupport extends RequestSupport
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   private String remotingSessionId;
+
+   private transient ServerInvokerCallbackHandler callbackHandler;
+
+   private String clientVMId;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   protected CallbackRequestSupport()
+   {
+   }
+
+   protected CallbackRequestSupport(String clientVMId, String remotingSessionId, String objectId, int methodId, byte version)
+   {
+      super(objectId, methodId, version);
+      this.remotingSessionId = remotingSessionId;
+      this.clientVMId = clientVMId;
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   public String getRemotingSessionID()
+   {
+      return remotingSessionId;
+   }
+
+
+   public String getClientVMID()
+   {
+      return clientVMId;
+   }
+
+   public void setRemotingSessionId(String remotingSessionId)
+   {
+      this.remotingSessionId = remotingSessionId;
+   }
+
+   public ServerInvokerCallbackHandler getCallbackHandler()
+   {
+      return callbackHandler;
+   }
+
+   public void setCallbackHandler(ServerInvokerCallbackHandler callbackHandler)
+   {
+      this.callbackHandler = callbackHandler;
+   }
+
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+
+      os.writeUTF(remotingSessionId);
+
+      os.writeUTF(clientVMId);
+   }
+
+   public void read(DataInputStream is) throws Exception
+   {
+      super.read(is);
+
+      remotingSessionId = is.readUTF();
+
+      clientVMId = is.readUTF();
+      
+
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/wireformat/CallbackRequestSupport.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,98 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.wireformat;
+
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
+import org.jboss.jms.delegate.CreateConnectionResult;
+import org.jboss.logging.Logger;
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class ConnectionFactoryAddCallbackRequest extends CallbackRequestSupport
+{
+
+   private static final Logger log = Logger.getLogger(ConnectionFactoryAddCallbackRequest.class);
+
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryAddCallbackRequest()
+   {
+   }
+
+   public ConnectionFactoryAddCallbackRequest(String jvmSessionId, String remotingSessionId, String objectId, byte version)
+   {
+      super(jvmSessionId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK, version);
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   public ResponseSupport serverInvoke() throws Exception
+   {
+      log.debug("serverInvoke callbackHandler=" + this.getCallbackHandler());
+      ConnectionFactoryAdvised advised =
+         (ConnectionFactoryAdvised)Dispatcher.instance.getTarget(objectId);
+
+      if (advised == null)
+      {
+         throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+      }
+
+      advised.addCallback(getClientVMID(), getRemotingSessionID(), this.getCallbackHandler());
+
+
+
+      return null;
+   }
+
+
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      os.flush();
+   }
+
+   public void read(DataInputStream is) throws Exception
+   {
+      super.read(is);
+   }
+}


Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryAddCallbackRequest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryCreateConnectionDelegateRequest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -37,7 +37,7 @@
  * $Id$
  *
  */
-public class ConnectionFactoryCreateConnectionDelegateRequest extends RequestSupport
+public class ConnectionFactoryCreateConnectionDelegateRequest extends CallbackRequestSupport
 {
    private String username;
    
@@ -45,10 +45,6 @@
    
    private int failedNodeId;
    
-   private String remotingSessionId;
-   
-   private String clientVMId;
-   
    private transient ServerInvokerCallbackHandler callbackHandler;
    
    public ConnectionFactoryCreateConnectionDelegateRequest()
@@ -62,12 +58,8 @@
                                                            String username, String password,
                                                            int failedNodeId)
    {
-      super(objectId, PacketSupport.REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE, version);
+      super(clientVMId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE, version);
       
-      this.remotingSessionId = remotingSessionId;
-      
-      this.clientVMId = clientVMId;
-      
       this.username = username;
       
       this.password = password;
@@ -79,10 +71,6 @@
    {
       super.read(is);
       
-      remotingSessionId = is.readUTF();
-      
-      clientVMId = is.readUTF();
-      
       username = readNullableString(is);
       
       password = readNullableString(is);
@@ -102,7 +90,7 @@
       
       CreateConnectionResult del = 
          advised.createConnectionDelegate(username, password, failedNodeId,
-                                           remotingSessionId, clientVMId, version,
+                                           getRemotingSessionID(), getClientVMID(), version,
                                            callbackHandler);
       
       return new ConnectionFactoryCreateConnectionDelegateResponse(del);
@@ -112,10 +100,6 @@
    {
       super.write(os);
       
-      os.writeUTF(remotingSessionId);
-      
-      os.writeUTF(clientVMId);
-      
       //Write the args
            
       writeNullableString(username, os);
@@ -127,16 +111,6 @@
       os.flush();
    }
    
-   public String getRemotingSessionID()
-   {
-      return remotingSessionId;
-   }
-   
-   public String getClientVMID()
-   {
-      return clientVMId;
-   }
-   
    public ServerInvokerCallbackHandler getCallbackHandler()
    {
       return this.callbackHandler;

Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,90 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import org.jboss.messaging.util.Version;
+import org.jboss.jms.delegate.ConnectionFactoryEndpoint;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class ConnectionFactoryGetTopologyRequest extends RequestSupport
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryGetTopologyRequest()
+   {
+   }
+
+   public ConnectionFactoryGetTopologyRequest(String objectId)
+   {
+      super(objectId, REQ_CONNECTIONFACTORY_GETTOPOLOGY, Version.instance().getProviderIncrementingVersion());
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      os.flush();
+   }
+
+   public void read(DataInputStream is) throws Exception
+   {
+      super.read(is);
+   }
+
+   public ResponseSupport serverInvoke() throws Exception
+   {
+      ConnectionFactoryEndpoint endpoint =
+         (ConnectionFactoryEndpoint)Dispatcher.instance.getTarget(objectId);
+
+      if (endpoint == null)
+      {
+         throw new IllegalStateException("Cannot find object with ID " + objectId + " in dispatcher");
+      }
+
+      return new ConnectionFactoryGetTopologyResponse(endpoint.getTopology());
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyRequest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,85 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import org.jboss.jms.delegate.TopologyResult;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class ConnectionFactoryGetTopologyResponse extends ResponseSupport
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   TopologyResult result;
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryGetTopologyResponse(TopologyResult result)
+   {
+      super(RESP_CONNECTIONFACTORY_GETTOPOLOGY);
+      this.result = result;
+   }
+
+   public ConnectionFactoryGetTopologyResponse()
+   {
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+   public void read(DataInputStream is) throws Exception
+   {
+      result = new TopologyResult();
+      result.read(is);
+   }
+
+
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      result.write(os);
+      os.flush();
+   }
+
+   public Object getResponse()
+   {
+      return result;
+   }
+}


Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryGetTopologyResponse.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Added: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java	                        (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -0,0 +1,91 @@
+/*
+   * JBoss, Home of Professional Open Source
+   * Copyright 2005, JBoss Inc., and individual contributors as indicated
+   * by the @authors tag. See the copyright.txt in the distribution for a
+   * full listing of individual contributors.
+   *
+   * This is free software; you can redistribute it and/or modify it
+   * under the terms of the GNU Lesser General Public License as
+   * published by the Free Software Foundation; either version 2.1 of
+   * the License, or (at your option) any later version.
+   *
+   * This software is distributed in the hope that it will be useful,
+   * but WITHOUT ANY WARRANTY; without even the implied warranty of
+   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+   * Lesser General Public License for more details.
+   *
+   * You should have received a copy of the GNU Lesser General Public
+   * License along with this software; if not, write to the Free
+   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+   */
+
+package org.jboss.jms.wireformat;
+
+import java.io.DataOutputStream;
+import java.io.DataInputStream;
+import org.jboss.jms.server.endpoint.advised.ConnectionFactoryAdvised;
+import org.jboss.messaging.util.Version;
+
+/**
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @version <tt>$Revision$</tt>
+ *          $Id$
+ */
+public class ConnectionFactoryRemoveCallbackRequest extends CallbackRequestSupport
+{
+
+   // Constants ------------------------------------------------------------------------------------
+
+   // Attributes -----------------------------------------------------------------------------------
+
+   // Static ---------------------------------------------------------------------------------------
+
+   // Constructors ---------------------------------------------------------------------------------
+
+   public ConnectionFactoryRemoveCallbackRequest()
+   {
+   }
+
+   public ConnectionFactoryRemoveCallbackRequest(String jvmSessionId, String remotingSessionId, String objectId, byte version)
+   {
+      super(jvmSessionId, remotingSessionId, objectId, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK, Version.instance().getProviderIncrementingVersion());
+   }
+
+   // Public ---------------------------------------------------------------------------------------
+   public void write(DataOutputStream os) throws Exception
+   {
+      super.write(os);
+      os.flush();
+   }
+
+   public void read(DataInputStream is) throws Exception
+   {
+      super.read(is);
+   }
+
+   public ResponseSupport serverInvoke() throws Exception
+   {
+      ConnectionFactoryAdvised advised =
+         (ConnectionFactoryAdvised)Dispatcher.instance.getTarget(objectId);
+
+      if (advised == null)
+      {
+         throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
+      }
+
+      advised.removeCallback(getClientVMID(), getRemotingSessionID(), this.getCallbackHandler());
+
+
+      return null;
+   }
+
+   // Package protected ----------------------------------------------------------------------------
+
+   // Protected ------------------------------------------------------------------------------------
+
+   // Private --------------------------------------------------------------------------------------
+
+   // Inner classes --------------------------------------------------------------------------------
+
+}


Property changes on: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryRemoveCallbackRequest.java
___________________________________________________________________
Name: svn:keywords
   + Id LastChangedDate Author Revision

Modified: trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/ConnectionFactoryUpdate.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -24,11 +24,10 @@
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.delegate.TopologyResult;
 
 /**
  * This class holds the update cluster view sent by the server to client-side clustered connection
@@ -47,22 +46,18 @@
 
    // Attributes -----------------------------------------------------------------------------------
 
-   private ClientConnectionFactoryDelegate[] delegates;
-   
-   private Map failoverMap;
+   TopologyResult topology;
 
    // Static ---------------------------------------------------------------------------------------
 
    // Constructors ---------------------------------------------------------------------------------
 
-   public ConnectionFactoryUpdate(ClientConnectionFactoryDelegate[] delegates,
+   public ConnectionFactoryUpdate(String uniqueName, ClientConnectionFactoryDelegate[] delegates,
                                   Map failoverMap)
    {
       super(PacketSupport.CONNECTIONFACTORY_UPDATE);
-      
-      this.delegates = delegates;
-      
-      this.failoverMap = failoverMap;
+
+      topology = new TopologyResult(uniqueName, delegates, failoverMap);
    }
    
    public ConnectionFactoryUpdate()
@@ -71,108 +66,36 @@
 
    // Public ---------------------------------------------------------------------------------------
 
-   public ClientConnectionFactoryDelegate[] getDelegates()
+   public String toString()
    {
-      return delegates;
+      return "ConnectionFactoryUpdateMessage{" + topology + "}";
    }
 
-   public void setDelegates(ClientConnectionFactoryDelegate[] delegates)
+   public TopologyResult getTopology()
    {
-      this.delegates = delegates;
+      return topology;
    }
 
-   public Map getFailoverMap()
+   public void setTopology(TopologyResult topology)
    {
-      return failoverMap;
+      this.topology = topology;
    }
 
-   public void setFailoverMap(Map failoverMap)
-   {
-      this.failoverMap = failoverMap;
-   }
-   
-
-   public String toString()
-   {
-      StringBuffer sb = new StringBuffer("ConnectionFactoryUpdateMessage[");
-
-      if (delegates != null)
-      {
-         for(int i = 0; i < delegates.length; i++)
-         {
-            sb.append(delegates[i]);
-            if (i < delegates.length - 1)
-            {
-               sb.append(',');
-            }
-         }
-      }
-
-      sb.append("]");
-
-      return sb.toString();
-   }   
-   
    // Streamable implementation
    // ---------------------------------------------------------------     
 
    public void read(DataInputStream is) throws Exception
-   {    
-      int len = is.readInt();
-      
-      delegates = new ClientConnectionFactoryDelegate[len];
-      
-      for (int i = 0; i < len; i++)
-      {
-         delegates[i] = new ClientConnectionFactoryDelegate();
-         
-         delegates[i].read(is);
-      }
-      
-      len = is.readInt();
-      
-      failoverMap = new HashMap(len);
-      
-      for (int c = 0; c < len; c++)
-      {
-         Integer i = new Integer(is.readInt());
-         
-         Integer j = new Integer(is.readInt());
-         
-         failoverMap.put(i, j);
-      }
+   {
+      topology = new TopologyResult();
+      topology.read(is);
    }
 
    public void write(DataOutputStream os) throws Exception
    {
       super.write(os);
-      
-      int len = delegates.length;
-      
-      os.writeInt(len);
-      
-      for (int i = 0; i < len; i++)
-      {
-         delegates[i].write(os);
-      }
-      
-      os.writeInt(failoverMap.size());
-      
-      Iterator iter = failoverMap.entrySet().iterator();
-      
-      while (iter.hasNext())
-      {
-         Map.Entry entry = (Map.Entry)iter.next();
-         
-         Integer i = (Integer)entry.getKey();
-         
-         Integer j = (Integer)entry.getValue();
-         
-         os.writeInt(i.intValue());
-         
-         os.writeInt(j.intValue());
-      }            
-      
+
+      topology.write(os);
+
       os.flush();
    }
 

Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -77,6 +77,9 @@
    public static final int REQ_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE = 100;
    public static final int REQ_CONNECTIONFACTORY_GETIDBLOCK = 101;
    public static final int REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK = 102;
+   public static final int REQ_CONNECTIONFACTORY_ADDCALLBACK = 103;
+   public static final int REQ_CONNECTIONFACTORY_REMOVECALLBACK = 104;
+   public static final int REQ_CONNECTIONFACTORY_GETTOPOLOGY = 105;
    
    // Connection
    // ----------
@@ -137,8 +140,9 @@
    public static final int RESP_CONNECTIONFACTORY_CREATECONNECTIONDELEGATE = 100100;   
    public static final int RESP_CONNECTIONFACTORY_GETIDBLOCK = 100101;   
    public static final int RESP_CONNECTIONFACTORY_GETCLIENTAOPSTACK = 100102;
+   public static final int RESP_CONNECTIONFACTORY_GETTOPOLOGY = 100105;
       
-   // Connection 
+   // Connection
    // -------------------------------------
    
    public static final int RESP_CONNECTION_CREATESESSIONDELEGATE = 100200;   
@@ -207,8 +211,17 @@
          case REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK:
             packet = new ConnectionFactoryGetClientAOPStackRequest();
             break;
-            
-         // Connection   
+         case REQ_CONNECTIONFACTORY_ADDCALLBACK:
+            packet = new ConnectionFactoryAddCallbackRequest();
+            break;
+         case REQ_CONNECTIONFACTORY_REMOVECALLBACK:
+            packet = new ConnectionFactoryRemoveCallbackRequest();
+            break;
+         case REQ_CONNECTIONFACTORY_GETTOPOLOGY:
+            packet = new ConnectionFactoryGetTopologyRequest();
+            break;
+
+         // Connection
          case REQ_CONNECTION_CREATESESSIONDELEGATE:
             packet = new ConnectionCreateSessionDelegateRequest();
             break;
@@ -302,9 +315,12 @@
             break;
          case RESP_CONNECTIONFACTORY_GETCLIENTAOPSTACK:
             packet = new ConnectionFactoryGetClientAOPStackResponse();
-            break;            
+            break;
+         case RESP_CONNECTIONFACTORY_GETTOPOLOGY:
+            packet = new ConnectionFactoryGetTopologyResponse();
+            break;
             
-         // Connection 
+         // Connection
          case RESP_CONNECTION_CREATESESSIONDELEGATE:
             packet = new ConnectionCreateSessionDelegateResponse();
             break;

Modified: trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/src/main/org/jboss/messaging/util/ConcurrentHashSet.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -39,9 +39,9 @@
  *
  * $Id: ConcurrentReaderHashSet.java 1935 2007-01-09 23:29:20Z clebert.suconic at jboss.com $
  */
-public class ConcurrentHashSet extends AbstractSet
+public class ConcurrentHashSet<E> extends AbstractSet<E>
 {
-   private Map theMap;
+   private Map<E, Object> theMap;
    
    private static Object dummy = new Object();
    
@@ -77,7 +77,7 @@
       return theMap.isEmpty();
    }
    
-   public boolean add(Object o)
+   public boolean add(E o)
    {
       return theMap.put(o, dummy) == dummy;
    }

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/build.xml	2007-08-21 04:19:48 UTC (rev 3019)
@@ -189,8 +189,8 @@
       <mkdir dir="${build.tests.classes}"/>
       <javac destdir="${build.tests.classes}"
              optimize="${javac.optimize}"
-             target="1.4"
-             source="1.4"
+             target="1.5"
+             source="1.5"
              debug="${javac.debug}"
              depend="${javac.depend}"
              verbose="${javac.verbose}"
@@ -758,7 +758,6 @@
             <fileset dir="${build.tests.classes}">
               <include name="**/jms/clustering/${test-mask}.class"/>
               <exclude name="**/jms/clustering/ClusterLeakTest.class"/>
-              <exclude name="**/jms/clustering/ClusterViewUpdateTest.class"/>
            </fileset>
          </batchtest>
       </junit>

Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -95,6 +95,10 @@
 import org.jboss.jms.wireformat.SessionRecoverDeliveriesRequest;
 import org.jboss.jms.wireformat.SessionSendRequest;
 import org.jboss.jms.wireformat.SessionUnsubscribeRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryAddCallbackRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyRequest;
+import org.jboss.jms.wireformat.ConnectionFactoryGetTopologyResponse;
+import org.jboss.jms.wireformat.ConnectionFactoryRemoveCallbackRequest;
 import org.jboss.remoting.InvocationRequest;
 
 /**
@@ -150,6 +154,11 @@
    {                         
       wf.testConnectionFactoryGetClientAOPStack();
    }
+
+   public void testConnectionFactoryGetTopology() throws Exception
+   {
+      wf.testConnectionFactoryGetTopology();
+   }
    
    // Connection
    
@@ -309,7 +318,17 @@
    {                        
       wf.testConnectionFactoryGetClientAOPStackResponse();
    }
-   
+
+   public void testConnectionFactoryAddCabllack() throws Exception
+   {
+      wf.testConnectionFactoryAddCabllack();
+   }
+
+   public void testConnectionFactoryRemoveCabllack() throws Exception
+   {
+      wf.testConnectionFactoryRemoveCabllack();
+   }
+
    // Connection
    
    public void testConnectionCreateSessionDelegateResponse() throws Exception
@@ -378,8 +397,8 @@
    {                 
       wf.testClosingResponse();
    }
-   
-   
+
+
    //We just check the first byte to make sure serialization is not be used.
    
    private class TestWireFormat extends JMSWireFormat
@@ -452,10 +471,34 @@
       {
          RequestSupport req =
             new ConnectionFactoryGetClientAOPStackRequest("23", (byte)77);;
-                 
-         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK);                           
+
+         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETCLIENTAOPSTACK);
       }
-      
+
+      public void testConnectionFactoryAddCabllack() throws Exception
+      {
+         RequestSupport req =
+            new ConnectionFactoryAddCallbackRequest("12", "23", "24",(byte)0);
+
+         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_ADDCALLBACK);
+      }
+
+      public void testConnectionFactoryRemoveCabllack() throws Exception
+      {
+         RequestSupport req =
+            new ConnectionFactoryRemoveCallbackRequest("12", "23", "24",(byte)0);
+
+         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_REMOVECALLBACK);
+      }
+
+      public void testConnectionFactoryGetTopology() throws Exception
+      {
+         RequestSupport req =
+            new ConnectionFactoryGetTopologyRequest("123");
+
+         testPacket(req, PacketSupport.REQ_CONNECTIONFACTORY_GETTOPOLOGY);
+      }
+
       // Connection
       
       public void testConnectionCreateSessionDelegateRequest() throws Exception

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusterViewUpdateTest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -24,12 +24,19 @@
 
 import javax.jms.Connection;
 import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.TextMessage;
 
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.container.ClusteringAspect;
+import org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler;
 import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.state.ConnectionState;
+import org.jboss.jms.delegate.TopologyResult;
 import org.jboss.test.messaging.tools.ServerManagement;
+import java.lang.ref.WeakReference;
 
 /**
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -61,47 +68,223 @@
    	
    	super.setUp();
    }
-      
+
+   /** Checks that the clustered CF delegate goes from two delegates to one, as seen both
+    *  through getDelegates() and through getTopology(), after server 1 is killed */
+   public void testUpdateTopology() throws Throwable
+   {
+
+      JBossConnectionFactory cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+      ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+      assertEquals(2, clusterDelegate.getDelegates().length);
+      clusterDelegate.getTopology();
+      assertEquals(2, clusterDelegate.getDelegates().length);
+
+      // Kill the same node as the CF is connected to
+      for (int i=5;i>0;i--)
+      {
+         log.info("kill in " + i);
+         Thread.sleep(1000);
+      }
+      ServerManagement.kill(1);
+      Thread.sleep(5000);
+      assertEquals(1, clusterDelegate.getDelegates().length);
+      TopologyResult topology = clusterDelegate.getTopology();
+      assertEquals(1, topology.getDelegates().length);
+
+      clusterDelegate.closeCallback();
+   }
+
+
+   /** Makes sure CFs are released on GC, validating that the callbacks do not hold
+    *  any hard references to them.
+    *
+    * Note: Even though this test is looking for memory leaks, it's important that it
+    *       runs as part of the validation of ConnectionFactory updates, as a leak of CFs would
+    *       cause problems with an excessive number of connections.
+    *  */
+  public void testGarbageCollectionOnClusteredCF() throws Throwable
+   {
+
+      JBossConnectionFactory cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+      ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+      WeakReference<ClientClusteredConnectionFactoryDelegate> ref = new WeakReference<ClientClusteredConnectionFactoryDelegate>(clusterDelegate);
+
+      // Using a separate block, as everything on this block has to be released (no references from the method)
+      {
+         Connection conn = cf.createConnection();
+         conn.start();
+         Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+         MessageProducer prod = session.createProducer(queue[0]);
+         MessageConsumer cons = session.createConsumer(queue[0]);
+         prod.send(session.createTextMessage("Hello"));
+         session.commit();
+         TextMessage message = (TextMessage)cons.receive(1000);
+         assertEquals("Hello", message.getText());
+         log.info("Received message " + message.getText());
+         session.commit();
+         conn.close();
+      }
+
+
+      // cf = null;
+      clusterDelegate = null;
+
+      int loops = 0;
+      // Stays on loop until GC is done
+      while (ref.get()!=null)
+      {
+         for (int i=0;i<10000;i++)
+         {
+            /// Just throwing extra garbage on the memory.. to make sure GC will happen
+            
+            String str = "GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE GARBAGE " + i;
+         }
+
+         log.info("Calling system.gc()");
+         System.gc();
+         Thread.sleep(1000);
+         if (loops++ > 10)
+         {
+            // This should be more than enough already... if the object wasn't cleared after more than
+            // 2 GC cycles, we already have a leak.
+
+            // Note! Due to AOP references the CFDelegate will be released from AOP instances
+            //       only after 2 or more GC cycles.
+            break;
+         }
+      }
+
+        // In case there are still references to the ConnectionFactory, uncomment the code below,
+        //  add -agentlib:jbossAgent to your JVM arguments (with jboss-profiler lib on path or LD_LIBRARY_PATH)
+        //  and it will tell you where the code is leaking.
+//      org.jboss.profiler.jvmti.JVMTIInterface jvmti = new org.jboss.profiler.jvmti.JVMTIInterface();
+//
+//
+//      if (ref.get() != null)
+//      {
+//         if (jvmti.isActive())
+//         {
+//            log.info("There are still references to CF");
+//            HashMap refMap = jvmti.createIndexMatrix();
+//            log.info(jvmti.exploreObjectReferences(refMap, ref.get(), 5, true));
+//         }
+//         else
+//         {
+//            log.info("Profiler is not active");
+//         }
+//      }
+//
+      assertNull("There is a memory leak on ClientClusteredConnectionFactoryDelegate", ref.get());
+   }
+   
    public void testUpdateConnectionFactoryWithNoConnections() throws Exception
    {
-   	ServerManagement.kill(1);
+
+      ServerManagement.kill(1);
    	
    	Thread.sleep(5000);
    	
    	Connection conn = createConnectionOnServer(cf, 0);
-   	
-   	ClientClusteredConnectionFactoryDelegate cfDelegate =
-         (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+      try
+      {
 
-      assertEquals(1, cfDelegate.getDelegates().length);
-   	
-   	conn.close();
+         ClientClusteredConnectionFactoryDelegate cfDelegate =
+            (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+
+         assertEquals(1, cfDelegate.getDelegates().length);
+      }
+      finally
+      {
+         conn.close();
+      }
    }
    
    public void testUpdateConnectionFactoryWithNoInitialConnections() throws Exception
    {
-   	//We kill all the servers - this tests the connection factory's ability to create a first connection
-   	//when its entire cached set of delegates is stale
-   	
-   	ServerManagement.kill(1);
-   	
-   	ServerManagement.kill(0);
-   	
-   	ServerManagement.start(0, "all", false);      
-   	
-   	Thread.sleep(5000);
-   	
-   	Connection conn = createConnectionOnServer(cf, 0);
-   	
-   	ClientClusteredConnectionFactoryDelegate cfDelegate =
-         (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+      try
+      {
+         ClientClusteredConnectionFactoryDelegate clusterDelegate = (ClientClusteredConnectionFactoryDelegate)this.cf.getDelegate();
 
-      assertEquals(1, cfDelegate.getDelegates().length);
-   	
-   	conn.close();
-   	
-   	ServerManagement.kill(0);
+         //JBossConnectionFactory cf2 = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+         //ClientClusteredConnectionFactoryDelegate clusterDelegate2 = (ClientClusteredConnectionFactoryDelegate)cf2.getDelegate();
+
+         //We kill all the servers - this tests the connection factory's ability to create a first connection
+         //when its entire cached set of delegates is stale
+
+         startDefaultServer(2, currentOverrides, false);
+
+         assertEquals(3, clusterDelegate.getDelegates().length);
+
+         // assertEquals(3, clusterDelegate2.getDelegates().length);
+         log.info("#################################### Killing server 1 and 0");
+         ServerManagement.log(ServerManagement.INFO, "Killing server1", 2);
+         ServerManagement.kill(1);
+         // Need some time for Lease 
+         Thread.sleep(5000);
+
+         assertEquals("Delegates are different on topology", 2,clusterDelegate.getTopology().getDelegates().length);
+         assertEquals(2, clusterDelegate.getDelegates().length);
+
+         ServerManagement.log(ServerManagement.INFO, "Stopping server0", 2);
+         ServerManagement.stop(0);
+
+         Thread.sleep(1000);
+
+         assertEquals(1, clusterDelegate.getDelegates().length);
+
+         Connection conn = createConnectionOnServer(cf, 2);
+
+         ClientClusteredConnectionFactoryDelegate cfDelegate =
+            (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+
+         assertEquals(1, cfDelegate.getDelegates().length);
+
+         conn.close();
+      }
+      finally
+      {
+         ServerManagement.kill(2);
+      }
    }
+
+
+   /** In case the updateCF is not captured, hopping should still work.
+    *  This test disables the CF callback for a connection factory and validates that hopping works */
+   public void testNoUpdateCaptured() throws Exception
+   {
+      JBossConnectionFactory cfNoCallback = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+      ClientClusteredConnectionFactoryDelegate noCallbackDelegate =  (ClientClusteredConnectionFactoryDelegate )cfNoCallback.getDelegate();
+      noCallbackDelegate.closeCallback();
+
+      ServerManagement.kill(1);
+
+      Connection conn = null;
+
+      for (int i=0; i<4; i++)
+      {
+         try
+         {
+            conn = cfNoCallback.createConnection();
+            // 0 is the only server alive, so all connections should be made to it
+            assertEquals(0, getServerId(conn));
+         }
+         finally
+         {
+            try
+            {
+               if (conn != null)
+               {
+                  conn.close();
+               }
+            }
+            catch (Throwable ignored)
+            {
+            }
+         }
+      }
+
+   }
    
    public void testUpdateConnectionFactoryOnKill() throws Exception
    {

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -25,8 +25,12 @@
 import javax.jms.Connection;
 
 import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
 import org.jboss.jms.exception.MessagingNetworkFailureException;
 import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
+import org.jboss.test.messaging.tools.container.ServiceContainer;
 import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
 
 /**
@@ -171,6 +175,45 @@
       }
    }
 
+   public void testRestartServer() throws Exception
+   {
+      JBossConnectionFactory cf2 = (JBossConnectionFactory) ic[1].lookup("/ConnectionFactory");
+
+      ClientClusteredConnectionFactoryDelegate clusterCF = (ClientClusteredConnectionFactoryDelegate)cf.getDelegate();
+      ClientConnectionFactoryDelegate delegates[] = clusterCF.getDelegates();
+      clusterCF.closeCallback();
+
+      ServerManagement.kill(1);
+
+      // Restart the server in the same place (reusing the previous remoting LocatorURI)
+      ServiceAttributeOverrides attr = new ServiceAttributeOverrides();
+      attr.put(ServiceContainer.REMOTING_OBJECT_NAME, "LocatorURI",delegates[1].getServerLocatorURI());
+      ServerManagement.start(1,config,attr,false);
+
+      // The server back on the same remoting port as before
+      startDefaultServer(1, attr, false);
+
+      Connection conn = null;
+      try
+      {
+         conn = cf2.createConnection();
+      }
+      finally
+      {
+         try
+         {
+            if (conn != null)
+            {
+               conn.close();
+            }
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+      
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2007-08-16 09:23:53 UTC (rev 3018)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ClusteringTestBase.java	2007-08-21 04:19:48 UTC (rev 3019)
@@ -186,20 +186,16 @@
 	      if (!ServerManagement.isStarted(i))
 	      {
 	      	log.info("Server " + i + " is not started - starting it");
-	      	
-	      	ServerManagement.start(i, config, currentOverrides, ic == null);
-	      	
-	      	if (ic == null)
-	         {
-	         	ic = new InitialContext[nodeCount];
-	            queue = new Queue[nodeCount];
-	            topic = new Topic[nodeCount];
-	         }
-	      	
-	      	log.info("deploying queue on node " + i);
-	      	ServerManagement.deployQueue("testDistributedQueue", i);
-	         ServerManagement.deployTopic("testDistributedTopic", i);
+
+            startDefaultServer(i, overrides, ic == null);
 	         
+            if (ic == null)
+            {
+               ic = new InitialContext[nodeCount];
+               queue = new Queue[nodeCount];
+               topic = new Topic[nodeCount];
+            }
+
 	         ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
 	          
 	         queue[i] = (Queue)ic[i].lookup("queue/testDistributedQueue");
@@ -230,7 +226,17 @@
       	cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
       }
    }
-   
+
+   protected void startDefaultServer(int serverNumber, ServiceAttributeOverrides attributes, boolean cleanDatabase)
+      throws Exception
+   {
+      ServerManagement.start(serverNumber, config, attributes, cleanDatabase);
+
+      log.info("deploying queue on node " + serverNumber);
+      ServerManagement.deployQueue("testDistributedQueue", serverNumber);
+      ServerManagement.deployTopic("testDistributedTopic", serverNumber);
+   }
+
    private boolean overridesChanged(ServiceAttributeOverrides sao1, ServiceAttributeOverrides sao2)
    {
    	Map map1 = sao1 == null ? null : sao1.getMap();




More information about the jboss-cvs-commits mailing list