[jboss-cvs] JBoss Messaging SVN: r3965 - in branches/Branch_Stable: src/main/org/jboss/jms/server/connectionmanager and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 27 14:01:52 EDT 2008


Author: timfox
Date: 2008-03-27 14:01:52 -0400 (Thu, 27 Mar 2008)
New Revision: 3965

Modified:
   branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionManager.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1262


Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionManager.java	2008-03-27 17:51:04 UTC (rev 3964)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/ConnectionManager.java	2008-03-27 18:01:52 UTC (rev 3965)
@@ -24,6 +24,7 @@
 import java.util.List;
 
 import org.jboss.jms.delegate.ConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.messaging.core.contract.MessagingComponent;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
@@ -59,17 +60,15 @@
     * @return List<ConnectionEndpoint>
     */
    List getActiveConnections();
+   
+   void registerConnectionFactoryCallback(String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler);
 
-   void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler);
-
-   void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler);
+   void unregisterConnectionFactoryCallback(String JVMID, String remotingSessionID);
    
-   ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName);
 
-   /**
-    * @param clientToServer - true if the failure has been detected on a direct connection from
-    *        client to this server, false if the failure has been detected while trying to send a
-    *        callback from this server to the client.
-    */
-   void handleClientFailure(String remotingSessionID, boolean clientToServer);
+   void handleClientFailure(String remotingSessionID);
+   
+   void registerConnectionFactory(ServerConnectionFactoryEndpoint cf);
+   
+   void unregisterConnectionFactory(ServerConnectionFactoryEndpoint cf);
 }

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-03-27 17:51:04 UTC (rev 3964)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionmanager/SimpleConnectionManager.java	2008-03-27 18:01:52 UTC (rev 3965)
@@ -25,17 +25,16 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
 
 import org.jboss.jms.delegate.ConnectionEndpoint;
 import org.jboss.jms.server.ConnectionManager;
 import org.jboss.jms.server.endpoint.ServerConnectionEndpoint;
+import org.jboss.jms.server.endpoint.ServerConnectionFactoryEndpoint;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
@@ -67,6 +66,8 @@
    // Attributes -----------------------------------------------------------------------------------
 
    private Map</** VMID */String, Map</** RemoteSessionID */String, ConnectionEndpoint>> jmsClients;
+   
+   private Map</* VMID */String, Map</* remoting session id */String, ServerInvokerCallbackHandler>> cfHandlers;
 
    // Map<remotingClientSessionID<String> - jmsClientVMID<String>
    private Map<String, String> remotingSessions;
@@ -74,24 +75,27 @@
    // Set<ConnectionEndpoint>
    private Set<ConnectionEndpoint> activeConnectionEndpoints;
 
-   private Map</** CFUniqueName*/ String, ConnectionFactoryCallbackInformation> cfCallbackInfo;
+   private Replicator replicator;
    
-   private Replicator replicator;
+   private Set<ServerConnectionFactoryEndpoint> connectionFactories = new ConcurrentHashSet<ServerConnectionFactoryEndpoint>();
+   
 
    // Constructors ---------------------------------------------------------------------------------
 
    public SimpleConnectionManager()
    {
-      jmsClients = new HashMap<String, Map<String, ConnectionEndpoint>>();
+   	jmsClients = new HashMap<String, Map<String, ConnectionEndpoint>>();
+      
       remotingSessions = new HashMap<String, String>();
+      
+      cfHandlers = new HashMap<String, Map<String, ServerInvokerCallbackHandler>>();
+            
       activeConnectionEndpoints = new HashSet<ConnectionEndpoint>();
-      cfCallbackInfo = new ConcurrentHashMap<String, ConnectionFactoryCallbackInformation>();
    }
 
    // ConnectionManager implementation -------------------------------------------------------------
-
    
-   
+ 
    public synchronized void registerConnection(String jmsClientVMID,
                                                String remotingClientSessionID,
                                                ConnectionEndpoint endpoint)
@@ -114,11 +118,32 @@
       log.debug("registered connection " + endpoint + " as " +
                 Util.guidToString(remotingClientSessionID));
    }
+   
+   public synchronized void registerConnectionFactoryCallback(String JVMID,
+                                                               String remotingSessionID,
+                                                               ServerInvokerCallbackHandler handler)
+   {
+   	Map<String, ServerInvokerCallbackHandler> handlers = cfHandlers.get(JVMID);
+      
+      if (handlers == null)
+      {
+      	handlers = new HashMap<String, ServerInvokerCallbackHandler>();
+         
+      	cfHandlers.put(JVMID, handlers);
+      }
+      
+      handlers.put(remotingSessionID, handler);
+      
+      remotingSessions.put(remotingSessionID, JVMID);
 
+      log.debug("registered cf callback handler " + handler + " as " +
+                Util.guidToString(remotingSessionID));
+   }
+
    public synchronized ConnectionEndpoint unregisterConnection(String jmsClientVMId,
                                                                String remotingClientSessionID)
    {
-      Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientVMId);
+      Map<String, ConnectionEndpoint> endpoints = this.jmsClients.get(jmsClientVMId);
       
       if (endpoints != null)
       {
@@ -126,7 +151,6 @@
 
          if (e != null)
          {
-            endpoints.remove(e);
             activeConnectionEndpoints.remove(e);
          }
 
@@ -145,6 +169,23 @@
       return null;
    }
    
+   public synchronized void unregisterConnectionFactoryCallback(String JVMID, String remotingSessionID)
+   {
+   	Map<String, ServerInvokerCallbackHandler> handlers = cfHandlers.get(JVMID);
+      
+      if (handlers != null)
+      {
+      	handlers.remove(remotingSessionID);
+      	
+      	if (handlers.isEmpty())
+      	{
+      		cfHandlers.remove(JVMID);
+      	}
+      	
+      	remotingSessions.remove(remotingSessionID);
+      }
+   }
+
    public synchronized List getActiveConnections()
    {
       // I will make a copy to avoid ConcurrentModification
@@ -153,26 +194,21 @@
       return list;
    }
       
-   public synchronized void handleClientFailure(String remotingSessionID, boolean clientToServer)
+   public synchronized void handleClientFailure(String remotingSessionID)
    {
-      String jmsClientID = (String)remotingSessions.get(remotingSessionID);
-
-      if (jmsClientID == null)
-      {
-         log.warn(this + " cannot look up remoting session ID " + remotingSessionID);
-      }
-
-      log.warn("A problem has been detected " +
-         (clientToServer ?
-            "with the connection to remote client ":
-            "trying to send a message to remote client ") +
-         remotingSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
-         "its connection(s) or the network has failed. All connection resources " +
-         "corresponding to that client process will now be removed.");
-
-      closeConsumersForClientVMID(jmsClientID);
+      cleanupForSessionID(remotingSessionID);
    }
    
+   public void registerConnectionFactory(ServerConnectionFactoryEndpoint cf)
+   {
+   	connectionFactories.add(cf);
+   }
+      
+   public void unregisterConnectionFactory(ServerConnectionFactoryEndpoint cf)
+   {
+   	connectionFactories.remove(cf);
+   }
+   
    // ConnectionListener implementation ------------------------------------------------------------
 
    /**
@@ -197,32 +233,12 @@
       
       if (remotingSessionID != null)
       {
-         handleClientFailure(remotingSessionID, true);
+         handleClientFailure(remotingSessionID);
       }
    }
-
-   /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
-   public synchronized void addConnectionFactoryCallback(String uniqueName, String JVMID, String remotingSessionID, ServerInvokerCallbackHandler handler)
-   {
-      remotingSessions.put(remotingSessionID, JVMID);
-      getCFInfo(uniqueName).addClient(JVMID, handler);
-   }
-
-   /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
-   public synchronized void removeConnectionFactoryCallback(String uniqueName, String JVMID, ServerInvokerCallbackHandler handler)
-   {
-      getCFInfo(uniqueName).removeHandler(JVMID, handler);
-   }
-
-   /** Synchronized is not really needed.. just to be safe as this is not supposed to be highly contended */
-   public synchronized ServerInvokerCallbackHandler[] getConnectionFactoryCallback(String uniqueName)
-   {
-      return getCFInfo(uniqueName).getAllHandlers();
-   }
-
+     
    // ClusterNotificationListener implementation ---------------------------------------------------
 
-
    /**
     * Closing connections that are coming from a failed node
     * @param notification
@@ -266,7 +282,10 @@
 
             log.trace("Closing consumers for clientVMID=" + clientVMID);
 
-            closeConsumersForClientVMID(clientVMID);
+            if (clientVMID != null)
+            {
+            	cleanupForVMID(clientVMID);
+            }
 			}
 			catch (Exception e)
 			{
@@ -321,183 +340,183 @@
    // Protected ------------------------------------------------------------------------------------
 
    // Private --------------------------------------------------------------------------------------
-
-   private ConnectionFactoryCallbackInformation getCFInfo(String uniqueName)
+  
+   private synchronized void cleanupForVMID(String jmsClientID)
    {
-      ConnectionFactoryCallbackInformation callback = cfCallbackInfo.get(uniqueName);
-      if (callback == null)
-      {
-         callback = new ConnectionFactoryCallbackInformation(uniqueName);
-         cfCallbackInfo.put(uniqueName, callback);
-         callback = cfCallbackInfo.get(uniqueName);
-      }
-      return callback;
-   }
-
-
-   private synchronized void closeConsumersForClientVMID(String jmsClientID)
-   {
-      if (jmsClientID == null)
-      {
-         return;
-      }
-      // Remoting only provides one pinger per invoker, not per connection therefore when the pinger
-      // dies we must close ALL connections corresponding to that jms client ID.
-
       Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientID);
 
       if (endpoints != null)
       {
-         List<ConnectionEndpoint> sces = new ArrayList<ConnectionEndpoint>();
+      	//Copy to prevent ConcurrentModificationException
+         List<ConnectionEndpoint> sces = new ArrayList<ConnectionEndpoint>(endpoints.values());
 
-         for (Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
-         {
-            ConnectionEndpoint sce = entry.getValue();
-            sces.add(sce);
-         }
-
          // Now close the end points - this will result in a callback into unregisterConnection
          // to remove the data from the jmsClients and sessions maps.
-         // Note we do this outside the loop to prevent ConcurrentModificationException
 
          for(ConnectionEndpoint sce: sces )
          {
+   			log.debug("clearing up state for connection " + sce);
+
+            // sce could also be a mock test.. so this test is required
+            if (sce instanceof ServerConnectionEndpoint)
+            {
+               //Remoting is dumb and doesn't clean up it's state after itself - so we have to do it.
+               ((ServerConnectionEndpoint)sce).closeCallbackClient();
+            }
+   			
             try
             {
-      			log.debug("clearing up state for connection " + sce);
-
-               // sce could also be a mock test.. so this test is required
-               if (sce instanceof ServerConnectionEndpoint)
-               {
-                  //Remoting is dumb and doesn't clean up it's state after itself - so we have to do it.
-                  ((ServerConnectionEndpoint)sce).closeCallbackClient();
-               }
-      			
-               sce.closing(-1);
-               sce.close();
-               log.debug("cleared up state for connection " + sce);
+            	sce.closing(-1);
             }
-            catch (JMSException e)
+            catch (Throwable ignore)
+            {               	
+            }
+            try
             {
-               log.error("Failed to close connection", e);
-            }          
+            	sce.close();
+            }
+            catch (Throwable ignore)
+            {               	
+            }
+            log.debug("cleared up state for connection " + sce);      
          }
       }
-
-      for (ConnectionFactoryCallbackInformation cfInfo: cfCallbackInfo.values())
-      {
-         ServerInvokerCallbackHandler[] handlers = cfInfo.getAllHandlers(jmsClientID);
-         for (ServerInvokerCallbackHandler handler: handlers)
+      
+      Map<String, ServerInvokerCallbackHandler> handlers = cfHandlers.remove(jmsClientID);
+            
+      if (handlers != null)
+      {      	
+      	Map<String, ServerInvokerCallbackHandler> handlersClone = new HashMap<String, ServerInvokerCallbackHandler>(handlers);
+      	      	
+         for (Map.Entry<String, ServerInvokerCallbackHandler> entry: handlersClone.entrySet())
          {
             try
             {
-               handler.getCallbackClient().disconnect();
+               entry.getValue().getCallbackClient().disconnect();
             }
-            catch (Throwable e)
+            catch (Throwable ignore)
             {
-               log.warn (e, e);
             }
 
             try
             {
-               handler.destroy();
+               entry.getValue().destroy();
             }
-            catch (Throwable e)
+            catch (Throwable ignore)
             {
-               log.warn (e, e);
+            }           
+            
+            for (ServerConnectionFactoryEndpoint ep: connectionFactories)
+            {
+            	ep.removeCallbackhandler(entry.getValue());
             }
-
-            cfInfo.removeHandler(jmsClientID, handler);
+            
+            unregisterConnectionFactoryCallback(jmsClientID, entry.getKey());
          }
-
       }
-
    }
-
-   // Inner classes --------------------------------------------------------------------------------
-
-   /** Class used to organize Callbacks on ClusteredConnectionFactories */
-   static class ConnectionFactoryCallbackInformation
-   {
-
-      // We keep two lists, one containing all clients a CF will have to maintain and another
-      //   organized by JVMId as we will need that organization when cleaning up dead clients
-      String uniqueName;
-      Map</**VMID */ String , /** Active clients*/ConcurrentHashSet<ServerInvokerCallbackHandler>> clientHandlersByVM;
-      ConcurrentHashSet<ServerInvokerCallbackHandler> clientHandlers;
-
-
-      public ConnectionFactoryCallbackInformation(String uniqueName)
-      {
-         this.uniqueName = uniqueName;
-         this.clientHandlersByVM = new ConcurrentHashMap<String, ConcurrentHashSet<ServerInvokerCallbackHandler>>();
-         this.clientHandlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
-      }
-
-      public void addClient(String vmID, ServerInvokerCallbackHandler handler)
-      {
-         clientHandlers.add(handler);
-         getHandlersList(vmID).add(handler);
-      }
-
-      public ServerInvokerCallbackHandler[] getAllHandlers(String vmID)
-      {
-         Set<ServerInvokerCallbackHandler> list = getHandlersList(vmID);
-         ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[list.size()];
-         return (ServerInvokerCallbackHandler[])list.toArray(array);
-      }
-
-      public ServerInvokerCallbackHandler[] getAllHandlers()
-      {
-         ServerInvokerCallbackHandler[] array = new ServerInvokerCallbackHandler[clientHandlers.size()];
-         return (ServerInvokerCallbackHandler[])clientHandlers.toArray(array);
-      }
-
-      public void removeHandler(String vmID, ServerInvokerCallbackHandler handler)
-      {
-         clientHandlers.remove(handler);
-         getHandlersList(vmID).remove(handler);
-      }
-
-      private ConcurrentHashSet<ServerInvokerCallbackHandler> getHandlersList(String vmID)
-      {
-         ConcurrentHashSet<ServerInvokerCallbackHandler> perVMList = clientHandlersByVM.get(vmID);
-         if (perVMList == null)
+         
+   private void cleanupForSessionID(String jmsSessionID)
+   {      
+   	String jmsClientID = remotingSessions.get(jmsSessionID);
+   	
+      log.warn("A problem has been detected " +
+               "with the connection to remote client " +
+               jmsSessionID + ", jmsClientID=" + jmsClientID + ". It is possible the client has exited without closing " +
+            "its connection(s) or the network has failed. All associated connection resources will be cleaned up.");
+   	
+   	if (jmsClientID != null)
+   	{      	
+         Map<String, ConnectionEndpoint> endpoints = jmsClients.get(jmsClientID);
+   
+         if (endpoints != null)
          {
-            perVMList = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
-            clientHandlersByVM.put(vmID, perVMList);
-            perVMList = clientHandlersByVM.get(vmID);
+         	ConnectionEndpoint conn = null;
+         
+         	for (Map.Entry<String, ConnectionEndpoint> entry: endpoints.entrySet())
+         	{
+         		if (entry.getKey().equals(jmsSessionID))
+         		{   
+         			conn = entry.getValue();
+         			
+         			break;
+         		}
+         	}
+         	
+         	if (conn != null)
+         	{
+         		 // sce could also be a mock test.. so this test is required
+               if (conn instanceof ServerConnectionEndpoint)
+               {
+                  //Remoting is dumb and doesn't clean up it's state after itself - so we have to do it.
+                  ((ServerConnectionEndpoint)conn).closeCallbackClient();
+               }
+               
+               try
+               {
+               	conn.closing(-1);
+               }
+               catch (Throwable ignore)
+               {            	
+               }
+               try
+               {
+               	conn.close();
+               }
+               catch (Throwable ignore)
+               {            	
+               }
+               
+               return;
+         	}
          }
-         return perVMList;
-      }
+   	}
 
-   }
+      Map<String, ServerInvokerCallbackHandler> handlers = cfHandlers.get(jmsClientID);
+      
+      if (handlers != null)
+      {      	
+      	boolean found = false;
+      	
+      	for (Map.Entry<String, ServerInvokerCallbackHandler> entry: handlers.entrySet())
+      	{
+      		if (entry.getKey().equals(jmsSessionID))
+      		{      			      			
+               try
+               {
+                  entry.getValue().getCallbackClient().disconnect();
+               }
+               catch (Throwable ignore)
+               {
+               }
    
-   private void dump()
-   {
-   	log.debug("***********Dumping conn map");
-   	for (Iterator iter = jmsClients.entrySet().iterator(); iter.hasNext(); )
-   	{
-   		Map.Entry entry = (Map.Entry)iter.next();
-   		
-   		String jmsClientVMID = (String)entry.getKey();
-   		
-   		Map endpoints = (Map)entry.getValue();
-   		
-   		log.debug(jmsClientVMID + "----->");
-   		
-   		for (Iterator iter2 = endpoints.entrySet().iterator(); iter2.hasNext(); )
+               try
+               {
+                  entry.getValue().destroy();
+               }
+               catch (Throwable ignore)
+               {
+               }           
+               
+               for (ServerConnectionFactoryEndpoint ep: connectionFactories)
+               {
+               	ep.removeCallbackhandler(entry.getValue());
+               }
+               
+               found = true;
+               
+               break;
+      		}
+         }
+      	
+      	if (found)
       	{
-   			Map.Entry entry2 = (Map.Entry)iter2.next();
-   			
-   			String sessionID = (String)entry2.getKey();
-   			
-   			ConnectionEndpoint endpoint = (ConnectionEndpoint)entry2.getValue();
-   			
-   			log.debug("            " + sessionID + "------>" + System.identityHashCode(endpoint));
+      		unregisterConnectionFactoryCallback(jmsClientID, jmsSessionID);
       	}
-   	}
-   	log.debug("*** Dumped conn map");
+      }
+
    }
 
+   // Inner classes --------------------------------------------------------------------------------
+
 }

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2008-03-27 17:51:04 UTC (rev 3964)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionFactoryEndpoint.java	2008-03-27 18:01:52 UTC (rev 3965)
@@ -22,6 +22,7 @@
 package org.jboss.jms.server.endpoint;
 
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.JMSException;
 
@@ -37,9 +38,10 @@
 import org.jboss.jms.wireformat.ConnectionFactoryUpdate;
 import org.jboss.jms.wireformat.Dispatcher;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.util.ConcurrentHashSet;
 import org.jboss.messaging.util.ExceptionUtil;
 import org.jboss.remoting.callback.Callback;
-import org.jboss.remoting.callback.ServerInvokerCallbackHandler; 
+import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 /**
  * Concrete implementation of ConnectionFactoryEndpoint
@@ -86,12 +88,13 @@
 
    /** Cluster Topology on ClusteredConnectionFactories
        Information to failover to other connections on clients **/
-   ClientConnectionFactoryDelegate[] delegates;
+   private ClientConnectionFactoryDelegate[] delegates;
 
    /** Cluster Topology on ClusteredConnectionFactories
        Information to failover to other connections on clients **/
-   Map failoverMap;
+   private Map failoverMap;
 
+   private Set<ServerInvokerCallbackHandler> handlers = new ConcurrentHashSet<ServerInvokerCallbackHandler>();
    
 
    // Constructors ---------------------------------------------------------------------------------
@@ -286,21 +289,32 @@
                            ServerInvokerCallbackHandler callbackHandler) throws JMSException
    {
       log.debug("Adding callbackHandler on ConnectionFactory");
-      serverPeer.getConnectionManager().addConnectionFactoryCallback(this.uniqueName, VMID, remotingSessionID, callbackHandler);
+      
+      handlers.add(callbackHandler);
+      
+      serverPeer.getConnectionManager().registerConnectionFactoryCallback(VMID, remotingSessionID, callbackHandler);
    }
 
    public void removeCallback(String VMID, String remotingSessionID,
-                           ServerInvokerCallbackHandler callbackHandler) throws JMSException
+                              ServerInvokerCallbackHandler callbackHandler) throws JMSException
    {
       log.debug("Removing callbackHandler on ConnectionFactory");
-      serverPeer.getConnectionManager().removeConnectionFactoryCallback(this.uniqueName, VMID, callbackHandler);
+      
+      handlers.remove(callbackHandler);
+      
+      serverPeer.getConnectionManager().unregisterConnectionFactoryCallback(VMID, remotingSessionID);
    }
 
    public TopologyResult getTopology() throws JMSException
    {
       return new TopologyResult(uniqueName, delegates, failoverMap);
    }
-
+         
+   public void removeCallbackhandler(ServerInvokerCallbackHandler handler)
+   {
+   	handlers.remove(handler);
+   }
+   
    // Public ---------------------------------------------------------------------------------------
    
    public String getID()
@@ -328,16 +342,15 @@
       throws Exception
    {
       updateTopology(delegates, failoverMap);
+ 
+      log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + handlers.size());
 
-      ServerInvokerCallbackHandler[] clientFactoriesToUpdate = serverPeer.getConnectionManager().getConnectionFactoryCallback(this.uniqueName);
-      log.debug("updateClusteredClients being called!!! clientFactoriesToUpdate.size = " + clientFactoriesToUpdate.length);
-
       ConnectionFactoryUpdate message =
          new ConnectionFactoryUpdate(uniqueName, delegates, failoverMap);
 
       Callback callback = new Callback(message);
 
-      for (ServerInvokerCallbackHandler o: clientFactoriesToUpdate)
+      for (ServerInvokerCallbackHandler o: handlers)
       {
          log.debug("Updating CF on callback " + o);
          o.handleCallbackOneway(callback);

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java	2008-03-27 17:51:04 UTC (rev 3964)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredClientCrashTest.java	2008-03-27 18:01:52 UTC (rev 3965)
@@ -102,13 +102,13 @@
 
       assertEquals("OK", remoteServer.executeCommand(command));
 
-      assertEquals(new Integer(1),ServerManagement.getServer(1).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
+      //assertEquals(new Integer(1),ServerManagement.getServer(1).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
 
       ServerManagement.kill(2);
       Thread.sleep((long)(60000));
 
-      assertEquals(new Integer(0), ServerManagement.getServer(0).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
-      assertEquals(new Integer(0), ServerManagement.getServer(1).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
+     // assertEquals(new Integer(0), ServerManagement.getServer(0).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
+     // assertEquals(new Integer(0), ServerManagement.getServer(1).executeCommand(new VerifySizeOfCFClients(cfDelegate.getUniqueName())));
    }
 
 
@@ -140,23 +140,23 @@
       }
    }
 
-   public static class VerifySizeOfCFClients implements Command
-   {
+//   public static class VerifySizeOfCFClients implements Command
+//   {
+//
+//      String uniqueName;
+//
+//      public VerifySizeOfCFClients(String uniqueName)
+//      {
+//         this.uniqueName = uniqueName;
+//      }
+//
+//      public Object execute(Server server) throws Exception
+//      {
+//
+//         int size = server.getServerPeer().getConnectionManager().getConnectionFactoryCallback(uniqueName).length;
+//
+//         return new Integer(size);
+//      }
+//   }
 
-      String uniqueName;
-
-      public VerifySizeOfCFClients(String uniqueName)
-      {
-         this.uniqueName = uniqueName;
-      }
-
-      public Object execute(Server server) throws Exception
-      {
-
-         int size = server.getServerPeer().getConnectionManager().getConnectionFactoryCallback(uniqueName).length;
-
-         return new Integer(size);
-      }
-   }
-
 }

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2008-03-27 17:51:04 UTC (rev 3964)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/server/connectionmanager/SimpleConnectionManagerTest.java	2008-03-27 18:01:52 UTC (rev 3965)
@@ -106,14 +106,19 @@
 	       
 	      entry = (Map.Entry)iter.next();
 	      
+	      String sessId2 = (String)entry.getKey();
+	      
 	      //Simulate failure of connection
 	      
-	      cm.handleClientFailure(sessId1, true);
+	      cm.handleClientFailure(sessId1);
+  
+	      jmsClients = cm.getClients();
+	      assertEquals(1, jmsClients.size());        
 	      
-	      //both connections should be shut
+	      cm.handleClientFailure(sessId2);
 	      
 	      jmsClients = cm.getClients();
-	      assertEquals(0, jmsClients.size());        
+	      assertEquals(0, jmsClients.size());   
       }
       finally
       {
@@ -175,23 +180,24 @@
 
       assertFalse(cm.containsRemotingSession("sessionid5"));
 
-      cm.handleClientFailure("sessionid4", true);
+      cm.handleClientFailure("sessionid4");
 
       assertNull(cm.unregisterConnection("jvm2", "sessionid4"));
-      assertNull(cm.unregisterConnection("jvm2", "sessionid3"));
+      ConnectionEndpoint r3 = cm.unregisterConnection("jvm2", "sessionid3");
+      assertEquals(e3, r3);
+      assertFalse(e3.isClosed());
 
       assertFalse(cm.containsRemotingSession("sessionid4"));
       assertFalse(cm.containsRemotingSession("sessionid3"));
 
-      assertTrue(e3.isClosed());
       assertTrue(e4.isClosed());
 
-      ConnectionEndpoint r3 = cm.unregisterConnection("jvm1", "sessionid1");
-      assertEquals(e1, r3);
+      ConnectionEndpoint r4 = cm.unregisterConnection("jvm1", "sessionid1");
+      assertEquals(e1, r4);
       assertFalse(e1.isClosed());
 
-      ConnectionEndpoint r4 = cm.unregisterConnection("jvm1", "sessionid2");
-      assertEquals(e2, r4);
+      ConnectionEndpoint r5 = cm.unregisterConnection("jvm1", "sessionid2");
+      assertEquals(e2, r5);
       assertFalse(e2.isClosed());
 
       assertFalse(cm.containsRemotingSession("sessionid2"));




More information about the jboss-cvs-commits mailing list