[jboss-cvs] JBoss Messaging SVN: r1771 - in trunk: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx tests tests/src/org/jboss/test/messaging/jms tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 12 13:26:15 EST 2006


Author: timfox
Date: 2006-12-12 13:25:55 -0500 (Tue, 12 Dec 2006)
New Revision: 1771

Removed:
   trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
Modified:
   trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
   trunk/src/main/org/jboss/jms/client/JBossSession.java
   trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
   trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/BrowserState.java
   trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
   trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
   trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
   trunk/src/main/org/jboss/jms/client/state/ProducerState.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
   trunk/src/main/org/jboss/jms/tx/ResourceManager.java
   trunk/src/main/org/jboss/jms/tx/TxState.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA stuff



Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -143,7 +143,7 @@
          // call pre or postDeliver so messages won't be acked, or stored in session/tx
          sess = conn.createSessionDelegate(false, Session.CLIENT_ACKNOWLEDGE, false);
 
-         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true);
+         cons = sess.createConsumerDelegate(dest, messageSelector, false, subName, true, -1);
       }
       finally
       {

Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -252,7 +252,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate = delegate.
-            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false);
+            createConsumerDelegate((JBossDestination)d, messageSelector, noLocal, null, false, -1);
          
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -305,7 +305,7 @@
          tccc.set(getClass().getClassLoader());
 
          ConsumerDelegate consumerDelegate =
-            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false);
+            delegate.createConsumerDelegate((JBossTopic)topic, null, false, name, false, -1);
 
          return new JBossMessageConsumer(consumerDelegate);
       }
@@ -339,7 +339,7 @@
          messageSelector = null;
       }
       ConsumerDelegate consumerDelegate =
-         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false);
+         delegate.createConsumerDelegate((JBossTopic)topic, messageSelector, noLocal, name, false, -1);
       return new JBossMessageConsumer(consumerDelegate);
    }
 

Modified: trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/ConnectionAspect.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -30,9 +30,7 @@
 import org.jboss.jms.client.JBossConnectionMetaData;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.state.ConnectionState;
-import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
-import org.jboss.jms.tx.ResourceManagerFactory;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -200,10 +198,7 @@
       
       // Finished with the connection - we need to shutdown callback server
       state.getRemotingConnection().stop();
-      
-      // Remove reference to resource manager
-      ResourceManagerFactory.instance.checkInResourceManager(state.getServerID());
-      
+       
       // Remove reference to message id generator
       MessageIdGeneratorFactory.instance.checkInGenerator(state.getServerID());
       

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -23,9 +23,11 @@
 package org.jboss.jms.client.container;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.JMSException;
 
@@ -50,6 +52,7 @@
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
 import org.jboss.jms.tx.AckInfo;
+import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -77,6 +80,7 @@
    //Cache this here
    private ClientConnectionFactoryDelegate[] delegates;
    
+   //Cache this here
    private Map failoverMap;
    
    private int currentRobinIndex;
@@ -125,6 +129,7 @@
       {
          currentRobinIndex = 0;
       }
+      
       return currentDelegate;
    }
    
@@ -136,7 +141,6 @@
 
          MethodInvocation methodInvoke = (MethodInvocation)invocation;
 
-         // TODO: FIX THIS! metaData should contain CF_DELEGATES
          Object target = methodInvoke.getTargetObject();
          
          if (target instanceof ClusteredClientConnectionFactoryDelegate)
@@ -146,8 +150,6 @@
 
          if (delegates != null)
          {
-            //TODO: Fix this! metadata should contain CF_FAILOVER_INDEXES
-            //failoverIndexes = (int[])metaData.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CF_FAILOVER_INDEXES);
             failoverMap = ((ClusteredClientConnectionFactoryDelegate)target).getFailoverMap();
 
             if (failoverMap == null)
@@ -167,15 +169,14 @@
       
       CreateConnectionResult res = (CreateConnectionResult)cf.createConnectionDelegate(username, password, -1);
       
-      ClientConnectionDelegate connDelegate =
-         (ClientConnectionDelegate)res.getDelegate();
+      ClientConnectionDelegate connDelegate = (ClientConnectionDelegate)res.getDelegate();
       
-      initialiseConnection(connDelegate);
+      addListener(connDelegate);
       
       return connDelegate;   
    }
    
-   private void initialiseConnection(ClientConnectionDelegate connDelegate)
+   private void addListener(ClientConnectionDelegate connDelegate)
    {
       //Add a connection listener
       
@@ -186,20 +187,19 @@
       state.getRemotingConnection().getInvokingClient().addConnectionListener(listener);
    }
 
-   
    //The connection has failed
    private void handleFailure(ClientConnectionDelegate failedConnection) throws Exception
    {
       log.info("Handling failure");
       
+      //Get the connection factory we are going to failover onto
       ClientConnectionFactoryDelegate newCF = getFailoverDelegate(failedConnection);
-
-      //TODO implement client side valve to prevent invocations occurring whilst failover is occurring
-      
+  
       ConnectionState state = (ConnectionState)((DelegateSupport)failedConnection).getState();
       
       log.info("calling createFailoverConnectionDelegate");
       
+      //Create a connection using that connection factory
       CreateConnectionResult res =
          newCF.createConnectionDelegate(state.getUser(), state.getPassword(), state.getServerID());
       
@@ -209,12 +209,12 @@
       {
          log.info("Got connection");
          
+         //We got the right server and created a new connection ok
+         
          ClientConnectionDelegate newConnection = (ClientConnectionDelegate)res.getDelegate();
          
          log.info("newconnection is " + newConnection);
          
-         //We got the right server and created a new connection
-         
          failover(failedConnection, newConnection);
       }
       else
@@ -289,71 +289,54 @@
 
       ConnectionState failedState = (ConnectionState)failedConnection.getState();
 
-      int oldServerID = failedState.getServerID();
-
       ConnectionState newState = (ConnectionState)newConnection.getState();
       
-      log.info("new state is: " + newState);
-
-      failedState.copy(newState);
-
-      // this is necessary so the connection will start "talking" to the new server instead
-      failedState.setRemotingConnection(newState.getRemotingConnection());
-      
       if (failedState.getClientID() != null)
       {
          newConnection.setClientID(failedState.getClientID());
       }
 
-      // Transfering state from newDelegate to currentDelegate
-      failedConnection.copyState(newConnection);
+      // Transfer attributes from newDelegate to failedDelegate
+      failedConnection.copyAttributes(newConnection);
       
+      int oldServerId = failedState.getServerID();
+      
+      CallbackManager oldCallbackManager = failedState.getRemotingConnection().getCallbackManager();
+      
+      //We need to update some of the attributes on the state
+      failedState.copyState(newState);
+      
       log.info("failing over children");
 
       for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
       {
          SessionState failedSessionState = (SessionState)i.next();
 
-         log.info("Creating session");
-         
+         ClientSessionDelegate failedSessionDelegate =
+            (ClientSessionDelegate)failedSessionState.getDelegate();
+                  
          ClientSessionDelegate newSessionDelegate = (ClientSessionDelegate)newConnection.
             createSessionDelegate(failedSessionState.isTransacted(),
                                   failedSessionState.getAcknowledgeMode(),
                                   failedSessionState.isXA());
          
-         log.info("Created session");
-                  
-         ClientSessionDelegate failedSessionDelegate =
-            (ClientSessionDelegate)failedSessionState.getDelegate();
+         SessionState newSessionState = (SessionState)newSessionDelegate.getState();
 
-         failedSessionDelegate.copyState(newSessionDelegate);
+         failedSessionDelegate.copyAttributes(newSessionDelegate);
          
-         log.info("copied state");
+         //We need to update some of the attributes on the state
+         newSessionState.copyState(newSessionState);                                  
          
-         //Now we remove any unacked np messages - this is because we don't want to ack them
-         //since the server won't know about them and will barf
-         Iterator iter = failedSessionState.getToAck().iterator();
+         if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
          
-         while (iter.hasNext())
-         {
-            AckInfo info = (AckInfo)iter.next();
-            
-            if (!info.getMessage().getMessage().isReliable())
-            {
-               iter.remove();
-            }            
-         }
+         List children = new ArrayList();
          
-         //TODO remove any unacked from the resource manager
-
-         if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
-
-         //TODO Clebert please add comment as to why this clone is necessary
-         //In general, please comment more - there is a serious lack of comments!!
-         List children = new ArrayList();
+         // TODO Why is this clone necessary?
          children.addAll(failedSessionState.getChildren());
+         
+         Set consumerIds = new HashSet();
 
-         for(Iterator j = children.iterator(); j.hasNext(); )
+         for (Iterator j = children.iterator(); j.hasNext(); )
          {
             HierarchicalStateSupport sessionChild = (HierarchicalStateSupport)j.next();
 
@@ -362,45 +345,83 @@
                handleFailoverOnProducer((ProducerState)sessionChild, newSessionDelegate);
             }
             else if (sessionChild instanceof ConsumerState)
-            {
+            {               
                handleFailoverOnConsumer(failedConnection,
                                         failedState,
                                         failedSessionState,
                                         (ConsumerState)sessionChild,
                                         failedSessionDelegate,
-                                        oldServerID);
+                                        oldServerId,
+                                        oldCallbackManager);       
+               
+               // Record this consumer's id (now the new id, after failover) for the ack lookup below
+               consumerIds.add(new Integer(((ConsumerState)sessionChild).getConsumerID()));
+               
             }
             else if (sessionChild instanceof BrowserState)
             {
                 handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
             }
          }
-         
+                           
          /* Now we must sent the list of unacked AckInfos to the server - so the consumers
           * delivery lists can be repopulated
           */
          List ackInfos = null;
          
-         if (!failedSessionState.isTransacted())
+         if (!failedSessionState.isTransacted() && !failedSessionState.isXA())
          {
+            /*
+            Now we remove any unacked non-persistent messages - we don't want to ack them
+            since the server won't know about them and the ack would fail
+            */
+            
+            Iterator iter = newSessionState.getToAck().iterator();
+            
+            while (iter.hasNext())
+            {
+               AckInfo info = (AckInfo)iter.next();
+               
+               if (!info.getMessage().getMessage().isReliable())
+               {
+                  iter.remove();
+               }            
+            }
+            
             //Get the ack infos from the list in the session state
             ackInfos = failedSessionState.getToAck();
          }
          else
-         {
-            //Transacted session - we need to get the acks
-            //TODO
+         {            
+            //Transacted session - we need to get the acks from the resource manager
+            //btw we have kept the old resource manager
+            ResourceManager rm = failedState.getResourceManager();
+            
+            // Remove any non persistent acks - so server doesn't barf on commit
+            
+            rm.removeNonPersistentAcks(consumerIds);
+            
+            ackInfos = rm.getAckInfosForConsumerIds(consumerIds);            
          }
          
-         //TODO for a transacted session the ackinfos will be in the resource manager!!
-         
          if (!ackInfos.isEmpty())
          {
+            log.info("Sending " + ackInfos.size() + " unacked");
             newSessionDelegate.sendUnackedAckInfos(ackInfos);
-         }
-                  
+         }                 
       }
             
+      // TODO: What if a consumer has already been closed, but there are still acks for it in the
+      // session or the resource manager?
+      // We still need to replace them, but with what?
+      // In this case we can't recreate the consumer on the server, since it has been closed.
+      //
+      // A possible solution is to store by session id instead - this would be a major
+      // reworking.
+      
+      
+     // todo need to replace consumer id
+      
       //We must not start the connection until the end
       if (failedState.isStarted())
       {
@@ -410,14 +431,13 @@
       log.info("Failover done");
    }
    
-   
-
-   private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
+   private void handleFailoverOnConsumer(ClientConnectionDelegate failedConnectionDelegate,
                                          ConnectionState failedConnectionState,
                                          SessionState failedSessionState,
                                          ConsumerState failedConsumerState,
                                          ClientSessionDelegate failedSessionDelegate,
-                                         int oldServerID)
+                                         int oldServerID,
+                                         CallbackManager oldCallbackManager)
       throws JMSException
    {
       log.info("Failing over consumer");
@@ -428,48 +448,63 @@
       if (trace) { log.trace("handleFailoverOnConsumer: creating alternate consumer"); }
 
       ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
-         failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
-                           failedConsumerState.getSelector(),
-                           failedConsumerState.isNoLocal(),
-                           failedConsumerState.getSubscriptionName(),
-                           failedConsumerState.isConnectionConsumer(),
-                           failedConsumerDelegate.getChannelId());
+         createConsumerDelegate((JBossDestination)failedConsumerState.getDestination(),
+                                 failedConsumerState.getSelector(),
+                                 failedConsumerState.isNoLocal(),
+                                 failedConsumerState.getSubscriptionName(),
+                                 failedConsumerState.isConnectionConsumer(),
+                                 failedConsumerState.getChannelId());
 
       if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
+            
+      //Copy the attributes from the new consumer to the old consumer
+      failedConsumerDelegate.copyAttributes(newConsumerDelegate);
 
-      failedConsumerDelegate.copyState(newConsumerDelegate);
-
+      ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
+      
       int oldConsumerID = failedConsumerState.getConsumerID();
+      
+      //Update attributes on the old state
+      failedConsumerState.copyState(newState);
 
-      ConsumerState newState = (ConsumerState)newConsumerDelegate.getState();
-      failedConsumerState.copy(newState);
-
-      if (failedSessionState.isTransacted())
+      if (failedSessionState.isTransacted() || failedSessionState.isXA())
       {
          //Replace the old consumer id with the new consumer id
          
-         //TODO what about XA?? - may have done work in many transactions - so need to replace all
+         ResourceManager rm = failedConnectionState.getResourceManager();
          
-         failedConnectionState.getResourceManager().
-            handleFailover(failedSessionState.getCurrentTxId(),
-                           oldConsumerID,
-                           failedConsumerState.getConsumerID());
+         rm.handleFailover(oldConsumerID, failedConsumerState.getConsumerID());
       }
-
-      CallbackManager cm = failedConnectionState.getRemotingConnection().getCallbackManager();
-
-      MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
-      handler.setConsumerId(failedConsumerState.getConsumerID());
+            
+      //We need to re-use the existing message callback handler
+            
+      log.info("Old server id:" + oldServerID + " old consumer id:" + oldConsumerID);
+      MessageCallbackHandler oldHandler = oldCallbackManager.unregisterHandler(oldServerID, oldConsumerID);
       
-      //Clear the buffer of the handler
-      handler.clearBuffer();
-
-      cm.registerHandler(failedConnectionState.getServerID(),
-                         failedConsumerState.getConsumerID(),
-                         handler);
+      ConnectionState newConnectionState = (ConnectionState)failedConnectionDelegate.getState();
       
-      failedSessionState.addCallbackHandler(handler);
+      CallbackManager newCallbackManager = newConnectionState.getRemotingConnection().getCallbackManager();
       
+      log.info("New server id:" + newConnectionState.getServerID() + " new consuer id:" + newState.getConsumerID());
+      
+      //Remove the new handler
+      MessageCallbackHandler newHandler = newCallbackManager.unregisterHandler(newConnectionState.getServerID(),
+                                                                               newState.getConsumerID());
+      
+      log.info("New handler is " + System.identityHashCode(newHandler));
+      
+      //But we need to update some fields from the new one
+      oldHandler.copyState(newHandler);
+      
+      //Now we re-register the old handler with the new callback manager
+            
+      newCallbackManager.registerHandler(newConnectionState.getServerID(),
+                                         newState.getConsumerID(),
+                                         oldHandler);
+      
+      //We don't need to add the handler to the session state since it is already there - we
+      //are re-using the old handler
+      
       log.info("failed over consumer");
    }
    
@@ -484,8 +519,10 @@
       ClientProducerDelegate failedProducerDelegate =
          (ClientProducerDelegate)failedProducerState.getDelegate();
 
-      failedProducerDelegate.copyState(newProducerDelegate);
-
+      failedProducerDelegate.copyAttributes(newProducerDelegate);
+      
+      failedProducerState.copyState((ProducerState)newProducerDelegate.getState());
+      
       if (trace) { log.trace("handling fail over on producerDelegate " + failedProducerDelegate + " destination=" + failedProducerState.getDestination()); }
    }
 
@@ -499,8 +536,10 @@
       ClientBrowserDelegate failedBrowserDelegate =
          (ClientBrowserDelegate)failedBrowserState.getDelegate();
 
-      failedBrowserDelegate.copyState(newBrowserDelegate);
-
+      failedBrowserDelegate.copyAttributes(newBrowserDelegate);
+      
+      failedBrowserState.copyState((BrowserState)newBrowserDelegate.getState());
+      
       if (trace) { log.trace("handling fail over on browserDelegate " + failedBrowserDelegate + " destination=" + failedBrowserState.getJmsDestination() + " selector=" + failedBrowserState.getMessageSelector()); }
 
    }

Modified: trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -23,11 +23,10 @@
 
 import javax.jms.Destination;
 
-import org.jboss.aop.Advised;
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
-import org.jboss.aop.metadata.SimpleMetaData;
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
+import org.jboss.jms.client.delegate.ClientConsumerDelegate;
 import org.jboss.jms.client.delegate.ClientProducerDelegate;
 import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -39,7 +38,6 @@
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.delegate.BrowserDelegate;
 import org.jboss.jms.delegate.ConnectionFactoryDelegate;
-import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.delegate.ProducerDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
 import org.jboss.jms.destination.JBossDestination;
@@ -47,9 +45,6 @@
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.server.Version;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
-import org.jboss.jms.server.remoting.MetaDataConstants;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.jms.tx.ResourceManagerFactory;
 
 /**
  * Maintains the hierarchy of parent and child state objects. For each delegate, this interceptor
@@ -89,37 +84,29 @@
       ClientConnectionDelegate connectionDelegate = (ClientConnectionDelegate)res.getDelegate();
       
       if (connectionDelegate != null)
-      {
-         
+      {         
          connectionDelegate.init();
          
-         SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
+         //SimpleMetaData md = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
          
-         int serverID =
-            ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID)).intValue();
+         int serverID = connectionDelegate.getServerId();
          
-         Version connectionVersion = 
-            (Version)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION);
+         Version versionToUse = connectionDelegate.getVersionToUse();
          
-         JMSRemotingConnection connection = 
-            (JMSRemotingConnection)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION);
+         JMSRemotingConnection connection = connectionDelegate.getRemotingConnection();
          
-         if (connectionVersion == null)
+         if (versionToUse == null)
          {
             throw new IllegalStateException("Connection version is null");
          }
          
-         // We have one resource manager per unique server
-         ResourceManager rm = ResourceManagerFactory.instance.checkOutResourceManager(serverID);
-         
          //We have one message id generator per unique server
          MessageIdGenerator gen =
             MessageIdGeneratorFactory.instance.checkOutGenerator(serverID, cfd);
          
          ConnectionState connectionState =
             new ConnectionState(serverID, connectionDelegate,
-                     connection,
-                     connectionVersion, rm, gen);
+                                connection, versionToUse, gen);
          
          connectionDelegate.setState(connectionState);
       }
@@ -150,7 +137,7 @@
    
    public Object handleCreateConsumerDelegate(Invocation invocation) throws Throwable
    {
-      ConsumerDelegate consumerDelegate = (ConsumerDelegate)invocation.invokeNext();
+      ClientConsumerDelegate consumerDelegate = (ClientConsumerDelegate)invocation.invokeNext();
       DelegateSupport delegate = (DelegateSupport)consumerDelegate;
       
       delegate.init();
@@ -164,21 +151,18 @@
       String subscriptionName = (String)mi.getArguments()[3];
       boolean connectionConsumer = ((Boolean)mi.getArguments()[4]).booleanValue();
 
-      SimpleMetaData md = ((Advised)consumerDelegate)._getInstanceAdvisor().getMetaData();
+      int consumerID = consumerDelegate.getID();
       
-      int consumerID =
-         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID)).intValue();
+      int prefetchSize = consumerDelegate.getPrefetchSize();
       
-      int prefetchSize =
-         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE)).intValue();
+      int maxDeliveries = consumerDelegate.getMaxDeliveries();
       
-      int maxDeliveries = 
-         ((Integer)md.getMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES)).intValue();
+      long channelId = consumerDelegate.getChannelId();
       
       ConsumerState consumerState =
          new ConsumerState(sessionState, consumerDelegate, dest, selector, noLocal,
                            subscriptionName, consumerID, connectionConsumer, prefetchSize,
-                           maxDeliveries);
+                           maxDeliveries, channelId);
 
       delegate.setState(consumerState);
       return consumerDelegate;

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -28,13 +28,12 @@
 import javax.jms.ServerSessionPool;
 import javax.transaction.xa.Xid;
 
-import org.jboss.aop.util.PayloadKey;
 import org.jboss.jms.client.JBossConnectionConsumer;
 import org.jboss.jms.client.remoting.JMSRemotingConnection;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.delegate.ConnectionDelegate;
 import org.jboss.jms.delegate.SessionDelegate;
-import org.jboss.jms.server.remoting.MetaDataConstants;
+import org.jboss.jms.server.Version;
 import org.jboss.jms.tx.TransactionRequest;
 import org.jboss.remoting.Client;
 
@@ -57,11 +56,12 @@
 
    // Attributes ----------------------------------------------------
 
-   // This should not be exposed other than through meta data
    private int serverId;
 
    private transient JMSRemotingConnection remotingConnection;
    
+   private Version versionToUse;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -223,8 +223,6 @@
    public void init()
    {
       super.init();
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.SERVER_ID,
-                                new Integer(serverId), PayloadKey.TRANSIENT);
    }
 
    public void setRemotingConnection(JMSRemotingConnection conn)
@@ -236,8 +234,32 @@
    {
       return remotingConnection;
    }
+   
+   public int getServerId()
+   {
+      return serverId;
+   }
+   
+   public Version getVersionToUse()
+   {
+      return versionToUse;
+   }
+   
+   public void setVersionToUse(Version versionToUse)
+   {
+      this.versionToUse = versionToUse;
+   }
+   
+   public void copyAttributes(DelegateSupport newDelegate)
+   {
+      super.copyAttributes(newDelegate);
+      
+      this.remotingConnection = ((ClientConnectionDelegate)newDelegate).getRemotingConnection();
+      
+      this.versionToUse = ((ClientConnectionDelegate)newDelegate).getVersionToUse();
+   }
 
-   // Protected -----------------------------------------------------
+   // Protected -----------------------------------------------------   
 
    protected Client getClient()
    {

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -26,7 +26,6 @@
 
 import javax.jms.JMSException;
 
-import org.jboss.aop.Advised;
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.Invocation;
 import org.jboss.aop.joinpoint.MethodInvocation;
@@ -68,13 +67,14 @@
    // Attributes ----------------------------------------------------
 
    //This data is needed in order to create a connection
-   protected String serverLocatorURI;
-   protected Version serverVersion;
-
-   // This property is used on redirect on failover logic (verify if a new delegate could be used during a failover)
-   protected int serverId;
-   protected boolean clientPing;
+   private String serverLocatorURI;
+ 
+   private Version serverVersion;
+ 
+   private int serverId;
    
+   private boolean clientPing;
+   
    private transient boolean trace;
 
    // Static --------------------------------------------------------
@@ -271,18 +271,9 @@
          
          if (connectionDelegate != null)
          {
-            //We set the version for the connection and the remoting connection on the meta-data
-            //this is so the StateCreationAspect can pick it up
-   
-            SimpleMetaData metaData = ((Advised)connectionDelegate)._getInstanceAdvisor().getMetaData();
-   
-            metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.REMOTING_CONNECTION,
-                                 remotingConnection, PayloadKey.TRANSIENT);
-   
-            metaData.addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONNECTION_VERSION,
-                                 version, PayloadKey.TRANSIENT);
-
             connectionDelegate.setRemotingConnection(remotingConnection);
+            
+            connectionDelegate.setVersionToUse(version);
          }
          else
          {
@@ -296,7 +287,6 @@
             {
             }
          }
-
       }
 
       return ret;
@@ -307,7 +297,6 @@
       return "ClientConnectionFactoryDelegate[" + id + "]";
    }
    
-   //This MUST ONLY be used in testing
    public String getServerLocatorURI()
    {
       return serverLocatorURI;
@@ -317,9 +306,26 @@
    {
       return serverId;
    }
+   
+   public boolean getClientPing()
+   {
+      return clientPing;
+   }
+   
+   public Version getServerVersion()
+   {
+      return serverVersion;
+   }
+   
+   public void copyAttributes(DelegateSupport newDelegate)
+   {
+      super.copyAttributes(newDelegate);
+   }
 
    // Protected -----------------------------------------------------
 
+   
+
    protected Client getClient()
    {
       return null;

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -25,11 +25,9 @@
 import javax.jms.Message;
 import javax.jms.MessageListener;
 
-import org.jboss.aop.util.PayloadKey;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.jms.delegate.ConsumerDelegate;
 import org.jboss.jms.destination.JBossDestination;
-import org.jboss.jms.server.remoting.MetaDataConstants;
 import org.jboss.remoting.Client;
 
 /**
@@ -50,11 +48,10 @@
 
    // Attributes ----------------------------------------------------
    
-   // This should not be exposed other than through meta data
    private int bufferSize;
-   protected int maxDeliveries;
+   
+   private int maxDeliveries;
 
-   // This should not be exposed other than through meta data
    private long channelId;
 
    // Static --------------------------------------------------------
@@ -73,11 +70,6 @@
    {      
    }
 
-   public long getChannelId()
-   {
-       return channelId;
-   }
-
    // ConsumerDelegate implementation -------------------------------
 
    /**
@@ -182,24 +174,41 @@
 
    // Public --------------------------------------------------------
 
-   public void init()
-   {
-      super.init();
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID,
-                                new Integer(id), PayloadKey.TRANSIENT);
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.PREFETCH_SIZE,
-                                new Integer(bufferSize), PayloadKey.TRANSIENT);
-      getMetaData().addMetaData(MetaDataConstants.JMS, MetaDataConstants.MAX_DELIVERIES,
-                                new Integer(maxDeliveries), PayloadKey.TRANSIENT);
-   }
-
    public String toString()
    {
       return "ConsumerDelegate[" + id + "](ChannelId=" + this.channelId+")" ;
    }
+   
+   public int getPrefetchSize()
+   {
+      return bufferSize;
+   }
+   
+   public int getMaxDeliveries()
+   {
+      return maxDeliveries;
+   }
+   
+   public long getChannelId()
+   {
+      return channelId;
+   }
+   
+   public void copyAttributes(DelegateSupport newDelegate)
+   {
+      super.copyAttributes(newDelegate);
+      
+      this.bufferSize = ((ClientConsumerDelegate)newDelegate).getPrefetchSize();
+      
+      this.maxDeliveries = ((ClientConsumerDelegate)newDelegate).getMaxDeliveries();
+      
+      this.channelId = ((ClientConsumerDelegate)newDelegate).getChannelId();
+   }
 
    // Protected -----------------------------------------------------
    
+
+
    protected Client getClient()
    {
       // Use the Client in the Connection's state
@@ -207,20 +216,7 @@
          getInvokingClient();
    }
 
-   public void copyState(DelegateSupport newDelegate)
-   {
-      super.copyState(newDelegate);
-      this.channelId = ((ClientConsumerDelegate)newDelegate).channelId;
-      this.getMetaData().removeMetaData(MetaDataConstants.JMS, MetaDataConstants.CONSUMER_ID);
-      this.getMetaData().addMetaData(MetaDataConstants.JMS,
-                                     MetaDataConstants.CONSUMER_ID,
-                                     newDelegate.getMetaData().
-                                        getMetaData(MetaDataConstants.JMS,
-                                                    MetaDataConstants.CONSUMER_ID),
-                                     PayloadKey.TRANSIENT);
-   }
 
-
    // Package Private -----------------------------------------------
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -187,25 +187,12 @@
     */
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer) throws JMSException
+                                                  boolean connectionConsumer, long failoverChannelId) throws JMSException
    {
       throw new IllegalStateException("This invocation should not be handled here!");
    }
 
    /**
-    * @see org.jboss.jms.server.endpoint.ServerSessionEndpoint#failOverConsumer(org.jboss.jms.destination.JBossDestination, String, boolean, String, boolean, long, int) 
-    * @see org.jboss.jms.client.container.StateCreationAspect#handleCreateConsumerDelegate(org.jboss.aop.joinpoint.Invocation)
-    * */
-   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
-                                            String selectorString,
-                                            boolean noLocal,  String subscriptionName,
-                                            boolean connectionConsumer,
-                                            long oldChannelID) throws JMSException
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
-   /**
     * This invocation should either be handled by the client-side interceptor chain or by the
     * server-side endpoint.
     */

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClusteredClientConnectionFactoryDelegate.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -73,8 +73,8 @@
                                                    ClientConnectionFactoryDelegate[] delegates,
                                                    Map failoverMap)
    {
-      this(mainDelegate.getID(), mainDelegate.serverId, mainDelegate.serverLocatorURI,
-         mainDelegate.serverVersion, mainDelegate.clientPing, delegates, failoverMap);
+      this(mainDelegate.getID(), mainDelegate.getServerId(), mainDelegate.getServerLocatorURI(),
+           mainDelegate.getServerVersion(), mainDelegate.getClientPing(), delegates, failoverMap);
    }
 
    // DelegateSupport overrides -------------------------------------
@@ -89,18 +89,7 @@
          {
             delegates[i].init();
          }
-      }
-      
-      //This doesn't seem to be used so I'm commenting it out
-
-//      // We add this to the meta data so the failOver aspect can get access to it
-//      getMetaData().addMetaData(MetaDataConstants.JMS,
-//                                MetaDataConstants.CF_DELEGATES,
-//                                delegates, PayloadKey.TRANSIENT);
-//
-//      getMetaData().addMetaData(MetaDataConstants.JMS,
-//                                MetaDataConstants.FAILOVER_MAP,
-//                                failoverMap, PayloadKey.TRANSIENT);
+      }      
    }
 
    // Public --------------------------------------------------------
@@ -112,7 +101,6 @@
       return delegates;
    }
 
-   /** TODO As metadata is not working, I'm exposing this temporarily */
    public Map getFailoverMap()
    {
       return failoverMap;

Modified: trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/delegate/DelegateSupport.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -152,7 +152,7 @@
      * has to be transfered to this actual object. For example, a Connection will have to assume the
      * ObjectID of the new connection endpoint and the new RemotingConnection.
      */
-    public void copyState(DelegateSupport newDelegate)
+    public void copyAttributes(DelegateSupport newDelegate)
     {
         id = newDelegate.getID();
     }

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -94,17 +94,13 @@
 
    public void registerHandler(int serverID, int consumerID, MessageCallbackHandler handler)
    {
-      log.info(this + " registeringHandler, serverID:" + serverID + " consumerID:" + consumerID);
-      
       Long lookup = computeLookup(serverID, consumerID);
 
       callbackHandlers.put(lookup, handler);
    }
 
    public MessageCallbackHandler unregisterHandler(int serverID, int consumerID)
-   {
-      log.info(this + " unregisteringHandler, serverID:" + serverID + " consumerID:" + consumerID);
-      
+   { 
       Long lookup = computeLookup(serverID, consumerID);
 
       return (MessageCallbackHandler)callbackHandlers.remove(lookup);

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -36,6 +36,7 @@
 import org.jboss.jms.message.MessageProxy;
 import org.jboss.jms.tx.AckInfo;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Message;
 import org.jboss.messaging.util.Future;
 import org.jboss.remoting.callback.HandleCallbackException;
 
@@ -216,9 +217,9 @@
     *         or -1 if closed
     */
    public HandleMessageResponse handleMessage(List msgs) throws HandleCallbackException
-   {            
+   {                      
       if (trace) { log.trace(this + " receiving " + msgs.size() + " message(s) from the remoting layer"); }
-                      
+            
       synchronized (mainLock)
       {
          if (closed)
@@ -516,12 +517,7 @@
          messagesAdded();
       }
    }
-   
-   public void clearBuffer()
-   {
-      buffer.clear();
-   }
-   
+     
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
@@ -780,6 +776,22 @@
          consumerDelegate.confirmDelivery(count);
       }
    }
+   
+   public void copyState(MessageCallbackHandler newHandler)
+   {
+      synchronized (mainLock)
+      {
+         this.consumerID = newHandler.consumerID;
+         
+         this.consumerDelegate = newHandler.consumerDelegate;
+         
+         this.sessionDelegate = newHandler.sessionDelegate;
+         
+         this.serverSending = false;
+         
+         this.buffer.clear();
+      }
+   }
 
 }
 

Modified: trunk/src/main/org/jboss/jms/client/state/BrowserState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/BrowserState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/BrowserState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -67,7 +67,6 @@
       return parent.getVersionToUse();
    }
 
-
    public org.jboss.jms.destination.JBossDestination getJmsDestination()
    {
       return jmsDestination;
@@ -77,15 +76,23 @@
    {
       return messageSelector;
    }
-
+   
    public void setParent(HierarchicalState parent)
    {
       this.parent=(SessionState)parent;
    }
+   
    public HierarchicalState getParent()
    {
       return parent;
    }
-
+   
+   // When failing over a browser, we keep the old browser's state but there are certain fields
+   // we need to update
+   public void copyState(BrowserState newState)
+   {      
+      //Actually only one field
+      this.delegate = newState.delegate;
+   }
 }
 

Modified: trunk/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ConnectionState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -70,7 +70,6 @@
 
    protected boolean started;
 
-
    /** This property used to be delcared on ConnectionAspect */
    private String clientID;
 
@@ -83,10 +82,9 @@
     /** This property used to be delcared on ConnectionAspect */
    private boolean listenerAdded;
    
-   
    public ConnectionState(int serverID, ConnectionDelegate delegate,
                           JMSRemotingConnection remotingConnection, Version versionToUse,
-                          ResourceManager rm, MessageIdGenerator gen)
+                          MessageIdGenerator gen)
       throws Exception
    {
       super(null, (DelegateSupport)delegate);
@@ -99,7 +97,10 @@
 
       this.versionToUse = versionToUse;
 
-      this.resourceManager = rm;
+      //Each connection has its own resource manager
+      //If we can failover all connections with the same server id at the same time
+      //then we can maintain one rm per unique server as opposed to per connection
+      this.resourceManager = new ResourceManager();
 
       this.idGenerator = gen;
 
@@ -230,12 +231,19 @@
     {
         return null;
     }
-
-    public void copy(ConnectionState newState)
+    
+    //When failing over a connection, we keep the old connection's state but there are certain fields
+    //we need to update
+    public void copyState(ConnectionState newState)
     {
-        this.serverID = newState.serverID;
-        this.idGenerator = newState.idGenerator;
+       this.remotingConnection = newState.remotingConnection;
+       
+       this.idGenerator = newState.idGenerator;
+       
+       this.serverID = newState.serverID;
+       
+       this.versionToUse = newState.versionToUse;
+       
+       this.delegate = newState.delegate;
     }
-
-
 }

Modified: trunk/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ConsumerState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -63,9 +63,12 @@
    
    private int maxDeliveries;
    
+   //Needed for failover
+   private long channelId;
+   
    public ConsumerState(SessionState parent, ConsumerDelegate delegate, Destination dest,
                         String selector, boolean noLocal, String subscriptionName, int consumerID,
-                        boolean isCC, int prefetchSize, int maxDeliveries)
+                        boolean isCC, int prefetchSize, int maxDeliveries, long channelId)
    {
       super(parent, (DelegateSupport)delegate);
       children = Collections.EMPTY_SET;
@@ -77,6 +80,7 @@
       this.prefetchSize = prefetchSize;
       this.subscriptionName=subscriptionName;
       this.maxDeliveries = maxDeliveries;
+      this.channelId = channelId;
    }
 
    public DelegateSupport getDelegate()
@@ -155,14 +159,25 @@
       this.subscriptionName = subscriptionName;
    }
 
-   public void copy(ConsumerState newState)
-   {
-      this.consumerID = newState.consumerID;
-   }
-
    public int getMaxDeliveries()
    {
       return maxDeliveries;
    }
+   
+   public long getChannelId()
+   {
+      return channelId;
+   }
+   
+   // When failing over a consumer, we keep the old consumer's state but there are certain fields
+   // we need to update
+   public void copyState(ConsumerState newState)
+   {      
+      this.consumerID = newState.consumerID;
+      
+      this.delegate = newState.delegate;
+      
+      this.channelId = newState.channelId;
+   }
 
 }

Modified: trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/HierarchicalState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -41,9 +41,11 @@
    Set getChildren();
    
    DelegateSupport getDelegate();
+   
    void setDelegate(DelegateSupport delegate);
 
    HierarchicalState getParent();
+   
    void setParent(HierarchicalState parent);
 
    Version getVersionToUse();

Modified: trunk/src/main/org/jboss/jms/client/state/ProducerState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/ProducerState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/ProducerState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -56,102 +56,107 @@
    private SessionState parent;
 
    private ProducerDelegate delegate;
-
+   
    public ProducerState(SessionState parent, ProducerDelegate delegate, Destination dest)
    {
       super(parent, (DelegateSupport)delegate);
       children = Collections.EMPTY_SET;
       this.destination = dest;
    }
-
+   
    public Destination getDestination()
    {
       return destination;
    }
-
-
-    public DelegateSupport getDelegate()
-    {
-        return (DelegateSupport)delegate;
-    }
-
-    public void setDelegate(DelegateSupport delegate)
-    {
-        this.delegate=(ProducerDelegate)delegate;
-    }
-
-
-    public void setParent(HierarchicalState parent)
-    {
-        this.parent = (SessionState)parent;
-    }
-    public HierarchicalState getParent()
-    {
-        return parent;
-    }
-
-
+      
+   public DelegateSupport getDelegate()
+   {
+      return (DelegateSupport)delegate;
+   }
+   
+   public void setDelegate(DelegateSupport delegate)
+   {
+      this.delegate=(ProducerDelegate)delegate;
+   }
+      
+   public void setParent(HierarchicalState parent)
+   {
+      this.parent = (SessionState)parent;
+   }
+   
+   public HierarchicalState getParent()
+   {
+      return parent;
+   }
+      
    public void setDestination(Destination dest)
    {
       this.destination = dest;
-
+      
    }
    public boolean isDisableMessageID()
    {
       return disableMessageID;
    }
-
+   
    public void setDisableMessageID(boolean disableMessageID)
    {
       this.disableMessageID = disableMessageID;
    }
-
+   
    public boolean isDisableMessageTimestamp()
    {
       return disableMessageTimestamp;
    }
-
+   
    public void setDisableMessageTimestamp(boolean disableMessageTimestamp)
    {
       this.disableMessageTimestamp = disableMessageTimestamp;
    }
-
+   
    public int getPriority()
    {
       return priority;
    }
-
+   
    public void setPriority(int priority)
    {
       this.priority = priority;
    }
-
+   
    public long getTimeToLive()
    {
       return timeToLive;
    }
-
+   
    public void setTimeToLive(long timeToLive)
    {
       this.timeToLive = timeToLive;
    }
-
+   
    public int getDeliveryMode()
    {
       return deliveryMode;
    }
-
+   
    public void setDeliveryMode(int deliveryMode)
    {
       this.deliveryMode = deliveryMode;
    }
-
+   
    public Version getVersionToUse()
    {
       return parent.getVersionToUse();
    }
-
-
+   
+   // When failing over a producer, we keep the old producer's state but there are certain fields
+   // we need to update
+   public void copyState(ProducerState newState)
+   {      
+      //Actually only one field
+      this.delegate = newState.delegate;
+   }
+   
 }
 
 

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -69,64 +69,65 @@
    private List toAck;
 
    private ConnectionState parent;
+   
    private SessionDelegate delegate;
    
    private Map callbackHandlers;
    
    public SessionState(ConnectionState parent, SessionDelegate delegate,
-                       boolean transacted, int ackMode, boolean xa)
+            boolean transacted, int ackMode, boolean xa)
    {
       super(parent, (DelegateSupport)delegate);
       children = new HashSet();
       this.acknowledgeMode = ackMode;
       this.transacted = transacted;
       this.xa = xa;
-
+      
       if (xa)
       {
          // Create an XA resource
          xaResource = new MessagingXAResource(parent.getResourceManager(), this);                            
       }
-
+      
       // If session is transacted and XA, the currentTxId will be updated when the XAResource will
       // be enrolled with a global transaction.
-
+      
       if (transacted & !xa)
       {
          // Create a local tx
          currentTxId = parent.getResourceManager().createLocalTx();        
       }
-
+      
       executor = new QueuedExecutor(new LinkedQueue());
       
       toAck = new ArrayList();
-
+      
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
    }
-
-
-    public void setParent(HierarchicalState parent)
-    {
-        this.parent = (ConnectionState)parent;
-    }
-    public HierarchicalState getParent()
-    {
-        return parent;
-    }
-
-    public DelegateSupport getDelegate()
-    {
-        return (DelegateSupport)delegate;
-    }
-
-    public void setDelegate(DelegateSupport delegate)
-    {
-        this.delegate=(SessionDelegate)delegate;
-    }
-
-
+   
+   
+   public void setParent(HierarchicalState parent)
+   {
+      this.parent = (ConnectionState)parent;
+   }
+   public HierarchicalState getParent()
+   {
+      return parent;
+   }
+   
+   public DelegateSupport getDelegate()
+   {
+      return (DelegateSupport)delegate;
+   }
+   
+   public void setDelegate(DelegateSupport delegate)
+   {
+      this.delegate=(SessionDelegate)delegate;
+   }
+   
+   
    /**
     * @return List<AckInfo>
     */
@@ -134,7 +135,7 @@
    {
       return toAck;
    }
-    
+   
    public int getAcknowledgeMode()
    {
       return acknowledgeMode;
@@ -204,5 +205,13 @@
    {
       return new ArrayList(callbackHandlers.values());
    }
+   
+   // When failing over a session, we keep the old session's state but there are certain fields
+   // we need to update
+   public void copyState(SessionState newState)
+   {      
+      //Actually only one field
+      this.delegate = newState.delegate;
+   }
 }
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -649,6 +649,9 @@
             deliveries.put(new Long(del.getReference().getMessageID()), del);
          }
       }
+      
+      //Prompt delivery
+      messageQueue.deliver(false);
    }
 
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +60,7 @@
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.jms.util.MessageQueueNameHelper;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.Queue;
 import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.IdManager;
@@ -117,7 +119,7 @@
    private int nodeId;
    private int maxDeliveryAttempts;
    private Queue dlq;
-
+   
    // Constructors --------------------------------------------------
 
    protected ServerSessionEndpoint(int sessionID, ServerConnectionEndpoint connectionEndpoint)
@@ -144,349 +146,41 @@
       dlq = sp.getDLQ();
       tr = sp.getTxRepository();
       maxDeliveryAttempts = sp.getMaxDeliveryAttempts();
+
    }
-
+   
    // SessionDelegate implementation --------------------------------
-      
-   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
-                                            String selectorString,
-                                            boolean noLocal,  String subscriptionName,
-                                            boolean connectionConsumer,
-                                            long oldChannelID) throws JMSException
-   {
-      //TODO we must ensure that the server side failover has completed first before
-      //letting this method run
-      
-      try
-      {
-         // fail over channel
-         if (postOffice.isLocal())
-         {
-            throw new IllegalStateException("Cannot failover on a non clustered post office!");
-         }
-
-         // this is a Clustered operation... so postOffice here must be Clustered
-         Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
-         if (binding == null)
-         {
-            throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
-         }
-         
-         // TODO - Remove this log.info before merging into trunk
-         if (binding.getQueue() instanceof RemoteQueueStub)
-         {
-            log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
-         }
-         else
-         {
-            log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
-         }
-         
-         int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-         
-         int prefetchSize = connectionEndpoint.getPrefetchSize();
-         
-         ServerConsumerEndpoint ep =
-
-            new ServerConsumerEndpoint(consumerID, binding.getQueue(),
-                                       binding.getQueue().getName(), this, selectorString, noLocal,
-                                       jmsDestination, prefetchSize, dlq);
-
-         JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-         
-         ClientConsumerDelegate stub =
-            new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
-                                       prefetchSize, maxDeliveryAttempts);
-
-         
-         putConsumerEndpoint(consumerID, ep); // caching consumer locally
-         
-         connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
-         
-         return stub;
-      }
-      catch (Throwable t)
-      {
-         throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
-      }
-   }
-
-   /*
-    * Please don't put failover logic in here
-    */
-	public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
+       
+   public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
                                                   String selectorString,
                                                   boolean noLocal,
                                                   String subscriptionName,
-                                                  boolean isCC) throws JMSException
+                                                  boolean isCC,
+                                                  long failoverChannelId) throws JMSException
    {
       try
       {
-         if (closed)
+         if (failoverChannelId == -1)
          {
-            throw new IllegalStateException("Session is closed");
-         }
-
-         if ("".equals(selectorString))
-         {
-            selectorString = null;
-         }
-
-         log.debug("creating consumer for " + jmsDestination + ", selector " + selectorString + ", " + (noLocal ? "noLocal, " : "") + "subscription " + subscriptionName);
-
-         ManagedDestination mDest = dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue());
-
-         if (mDest == null)
-         {
-            throw new InvalidDestinationException("No such destination: " + jmsDestination);
-         }
-
-         if (jmsDestination.isTemporary())
-         {
-            // Can only create a consumer for a temporary destination on the same connection
-            // that created it
-            if (!connectionEndpoint.hasTemporaryDestination(jmsDestination))
-            {
-               String msg = "Cannot create a message consumer on a different connection " +
-                            "to that which created the temporary destination";
-               throw new IllegalStateException(msg);
-            }
-         }
-
-         int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
-
-         Binding binding = null;
-
-         // Always validate the selector first
-         Selector selector = null;
-         if (selectorString != null)
-         {
-            selector = new Selector(selectorString);
-         }
-
-         if (jmsDestination.isTopic())
-         {
-            JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
+            //Standard createConsumerDelegate
             
-            if (subscriptionName == null)
-            {
-               // non-durable subscription
-               if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
-
-               //Create the non durable sub
-               QueuedExecutor executor = (QueuedExecutor)pool.get();
-
-               PagingFilteredQueue q;
-
-               if (postOffice.isLocal())
-               {
-                  q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
-                                              executor, selector,
-                                              mDest.getFullSize(),
-                                              mDest.getPageSize(),
-                                              mDest.getDownCacheSize());
-
-                  binding = postOffice.bindQueue(topicCond, q);
-               }
-               else
-               {
-                  q = new LocalClusteredQueue(postOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
-                                              executor, selector, tr,
-                                              mDest.getFullSize(),
-                                              mDest.getPageSize(),
-                                              mDest.getDownCacheSize());
-
-                  ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
-                  if (mDest.isClustered())
-                  {
-                     binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
-                  }
-                  else
-                  {
-                     binding = cpo.bindQueue(topicCond, q);
-                  }
-               }
-            }
-            else
-            {
-               if (jmsDestination.isTemporary())
-               {
-                  throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
-               }
-
-               // we have a durable subscription, look it up
-               String clientID = connectionEndpoint.getClientID();
-               if (clientID == null)
-               {
-                  throw new JMSException("Cannot create durable subscriber without a valid client ID");
-               }
-
-               // See if there any bindings with the same client_id.subscription_name name
-
-               String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
-
-               binding = postOffice.getBindingForQueueName(name);
-
-               if (binding == null)
-               {
-                  //Does not already exist
-
-                  if (trace) { log.trace("creating new durable subscription on " + jmsDestination); }
-
-                  QueuedExecutor executor = (QueuedExecutor)pool.get();
-                  PagingFilteredQueue q;
-
-                  if (postOffice.isLocal())
-                  {
-                     q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
-                                                 executor, selector,
-                                                 mDest.getFullSize(),
-                                                 mDest.getPageSize(),
-                                                 mDest.getDownCacheSize());
-
-                     binding = postOffice.bindQueue(topicCond, q);
-                  }
-                  else
-                  {
-                     q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
-                                                 executor, selector, tr,
-                                                 mDest.getFullSize(),
-                                                 mDest.getPageSize(),
-                                                 mDest.getDownCacheSize());
-
-                     ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
-                     if (mDest.isClustered())
-                     {
-                        binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
-                     }
-                     else
-                     {
-                        binding = cpo.bindQueue(topicCond, q);
-                     }
-                  }
-               }
-               else
-               {
-                  //Durable sub already exists
-
-                  if (trace) { log.trace("subscription " + subscriptionName + " already exists"); }
-
-                  // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
-                  // A client can change an existing durable subscription by creating a durable
-                  // TopicSubscriber with the same name and a new topic and/or message selector.
-                  // Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
-                  // one and creating a new one.
-
-                  String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
-
-                  boolean selectorChanged =
-                     (selectorString == null && filterString != null) ||
-                     (filterString == null && selectorString != null) ||
-                     (filterString != null && selectorString != null &&
-                     !filterString.equals(selectorString));
-
-                  if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
-
-                  boolean topicChanged = !binding.getCondition().equals(jmsDestination.getName());
-
-                  if (log.isTraceEnabled()) { log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed"); }
-
-                  if (selectorChanged || topicChanged)
-                  {
-                     if (trace) { log.trace("topic or selector changed so deleting old subscription"); }
-
-                     // Unbind the durable subscription
-
-                     if (mDest.isClustered() && !postOffice.isLocal())
-                     {
-                        ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
-                        cpo.unbindClusteredQueue(name);
-                     }
-                     else
-                     {
-                        postOffice.unbindQueue(name);
-                     }
-
-                     // create a fresh new subscription
-
-                     QueuedExecutor executor = (QueuedExecutor)pool.get();
-                     PagingFilteredQueue q;
-
-                     if (postOffice.isLocal())
-                     {
-                        q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
-                                                    executor, selector,
-                                                    mDest.getFullSize(),
-                                                    mDest.getPageSize(),
-                                                    mDest.getDownCacheSize());
-                        binding = postOffice.bindQueue(topicCond, q);
-                     }
-                     else
-                     {
-                        q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
-                                                    executor, selector, tr,
-                                                    mDest.getFullSize(),
-                                                    mDest.getPageSize(),
-                                                    mDest.getDownCacheSize());
-
-                        ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
-
-                        if (mDest.isClustered())
-                        {
-                           binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
-                        }
-                        else
-                        {
-                           binding = cpo.bindQueue(topicCond, (LocalClusteredQueue)q);
-                        }
-                     }
-                  }
-               }
-            }
+            return createConsumerDelegateInternal(jmsDestination, selectorString, noLocal, subscriptionName,
+                                                  isCC);
          }
          else
          {
-            //Consumer on a jms queue
-
-            //Let's find the binding
-            binding = postOffice.getBindingForQueueName(jmsDestination.getName());
-
-            if (binding == null)
-            {
-               throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
-            }
-         }
-
-         int prefetchSize = connectionEndpoint.getPrefetchSize();
-
-         ServerConsumerEndpoint ep =
-            new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
-                                       binding.getQueue().getName(), this, selectorString, noLocal,
-                                       jmsDestination, prefetchSize, dlq);
-          
-         JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
-
-         ClientConsumerDelegate stub =
-            new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
-                                       prefetchSize, maxDeliveryAttempts);
-                       
-         putConsumerEndpoint(consumerID, ep); // caching consumer locally
-         
-         connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // cachin consumer in server peer
-         
-         log.debug("created and registered " + ep);
-   
-         return stub;
+            //Failover of consumer
+            
+            return failoverConsumer(jmsDestination, selectorString, noLocal, subscriptionName,
+                                    isCC, failoverChannelId);
+         }         
       }
       catch (Throwable t)
       {
          throw ExceptionUtil.handleJMSInvocation(t, this + " createConsumerDelegate");
       }
    }
-	
+      
 	public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination, String messageSelector)
 	   throws JMSException
 	{
@@ -1000,23 +694,7 @@
    }
    
    // Protected -----------------------------------------------------
-   
-   protected void acknowledgeInternal(AckInfo ackInfo) throws Throwable
-   {
-      //If the message was delivered via a connection consumer then the message needs to be acked
-      //via the original consumer that was used to feed the connection consumer - which
-      //won't be one of the consumers of this session
-      //Therefore we always look in the global map of consumers held in the server peer
-      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
-
-      if (consumer == null)
-      {
-         throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
-      }
       
-      consumer.acknowledge(ackInfo.getMessageID());
-   }
-   
    protected ServerConsumerEndpoint putConsumerEndpoint(int consumerID, ServerConsumerEndpoint d)
    {
       if (trace) { log.trace(this + " caching consumer " + consumerID); }
@@ -1072,6 +750,341 @@
    }   
 
    // Private -------------------------------------------------------
+   
+   private void acknowledgeInternal(AckInfo ackInfo) throws Throwable
+   {
+      //If the message was delivered via a connection consumer then the message needs to be acked
+      //via the original consumer that was used to feed the connection consumer - which
+      //won't be one of the consumers of this session
+      //Therefore we always look in the global map of consumers held in the server peer
+      ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ackInfo.getConsumerID());
 
+      if (consumer == null)
+      {
+         throw new IllegalArgumentException("Cannot find consumer id: " + ackInfo.getConsumerID());
+      }
+      
+      consumer.acknowledge(ackInfo.getMessageID());
+   }
+   
+   private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
+            String selectorString,
+            boolean noLocal,  String subscriptionName,
+            boolean connectionConsumer,
+            long oldChannelID) throws Exception
+   {
+      //fail over channel
+      if (postOffice.isLocal())
+      {
+         throw new IllegalStateException("Cannot failover on a non clustered post office!");
+      }
+      
+      //this is a Clustered operation... so postOffice here must be Clustered
+      Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+      if (binding == null)
+      {
+         throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+      }
+      
+      // TODO - Remove this log.info before merging into trunk
+      if (binding.getQueue() instanceof RemoteQueueStub)
+      {
+         log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((RemoteQueueStub)binding.getQueue()).getChannelID());
+      }
+      else
+      {
+         log.info("OldChannelId=" + oldChannelID + " while currentChannelId=" + ((PagingFilteredQueue)binding.getQueue()).getChannelID());
+      }
+      
+      int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+      
+      int prefetchSize = connectionEndpoint.getPrefetchSize();
+      
+      ServerConsumerEndpoint ep =
+         
+         new ServerConsumerEndpoint(consumerID, binding.getQueue(),
+                  binding.getQueue().getName(), this, selectorString, noLocal,
+                  jmsDestination, prefetchSize, dlq);
+      
+      JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+      
+      ClientConsumerDelegate stub =
+         new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
+                  prefetchSize, maxDeliveryAttempts);
+      
+      
+      putConsumerEndpoint(consumerID, ep); // caching consumer locally
+      
+      connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // caching consumer in server peer
+      
+      return stub;
+   }
+   
+   private ConsumerDelegate createConsumerDelegateInternal(JBossDestination jmsDestination,
+            String selectorString,
+            boolean noLocal,
+            String subscriptionName,
+            boolean isCC) throws Throwable
+   {
+      if (closed)
+      {
+         throw new IllegalStateException("Session is closed");
+      }
+      
+      if ("".equals(selectorString))
+      {
+         selectorString = null;
+      }
+      
+      log.debug("creating consumer for " + jmsDestination + ", selector " + selectorString + ", " + (noLocal ? "noLocal, " : "") + "subscription " + subscriptionName);
+      
+      ManagedDestination mDest = dm.getDestination(jmsDestination.getName(), jmsDestination.isQueue());
+      
+      if (mDest == null)
+      {
+         throw new InvalidDestinationException("No such destination: " + jmsDestination);
+      }
+      
+      if (jmsDestination.isTemporary())
+      {
+         // Can only create a consumer for a temporary destination on the same connection
+         // that created it
+         if (!connectionEndpoint.hasTemporaryDestination(jmsDestination))
+         {
+            String msg = "Cannot create a message consumer on a different connection " +
+            "to that which created the temporary destination";
+            throw new IllegalStateException(msg);
+         }
+      }
+      
+      int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
+      
+      Binding binding = null;
+      
+      //Always validate the selector first
+      Selector selector = null;
+      if (selectorString != null)
+      {
+         selector = new Selector(selectorString);
+      }
+      
+      if (jmsDestination.isTopic())
+      {
+         JMSCondition topicCond = new JMSCondition(false, jmsDestination.getName());
+         
+         if (subscriptionName == null)
+         {
+            // non-durable subscription
+            if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
+            
+            // Create the non durable sub
+            QueuedExecutor executor = (QueuedExecutor)pool.get();
+            
+            PagingFilteredQueue q;
+            
+            if (postOffice.isLocal())
+            {
+               q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
+                        executor, selector,
+                        mDest.getFullSize(),
+                        mDest.getPageSize(),
+                        mDest.getDownCacheSize());
+               
+               binding = postOffice.bindQueue(topicCond, q);
+            }
+            else
+            {
+               q = new LocalClusteredQueue(postOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
+                        executor, selector, tr,
+                        mDest.getFullSize(),
+                        mDest.getPageSize(),
+                        mDest.getDownCacheSize());
+               
+               ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+               
+               if (mDest.isClustered())
+               {
+                  binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
+               }
+               else
+               {
+                  binding = cpo.bindQueue(topicCond, q);
+               }
+            }
+         }
+         else
+         {
+            if (jmsDestination.isTemporary())
+            {
+               throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
+            }
+            
+            // we have a durable subscription, look it up
+            String clientID = connectionEndpoint.getClientID();
+            if (clientID == null)
+            {
+               throw new JMSException("Cannot create durable subscriber without a valid client ID");
+            }
+            
+            // See if there are any bindings with the same client_id.subscription_name name
+            
+            String name = MessageQueueNameHelper.createSubscriptionName(clientID, subscriptionName);
+            
+            binding = postOffice.getBindingForQueueName(name);
+            
+            if (binding == null)
+            {
+               // Does not already exist
+               
+               if (trace) { log.trace("creating new durable subscription on " + jmsDestination); }
+               
+               QueuedExecutor executor = (QueuedExecutor)pool.get();
+               PagingFilteredQueue q;
+               
+               if (postOffice.isLocal())
+               {
+                  q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
+                           executor, selector,
+                           mDest.getFullSize(),
+                           mDest.getPageSize(),
+                           mDest.getDownCacheSize());
+                  
+                  binding = postOffice.bindQueue(topicCond, q);
+               }
+               else
+               {
+                  q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
+                           executor, selector, tr,
+                           mDest.getFullSize(),
+                           mDest.getPageSize(),
+                           mDest.getDownCacheSize());
+                  
+                  ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+                  
+                  if (mDest.isClustered())
+                  {
+                     binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
+                  }
+                  else
+                  {
+                     binding = cpo.bindQueue(topicCond, q);
+                  }
+               }
+            }
+            else
+            {
+               //Durable sub already exists
+               
+               if (trace) { log.trace("subscription " + subscriptionName + " already exists"); }
+               
+               // From javax.jms.Session Javadoc (and also JMS 1.1 6.11.1):
+               // A client can change an existing durable subscription by creating a durable
+               // TopicSubscriber with the same name and a new topic and/or message selector.
+               // Changing a durable subscriber is equivalent to unsubscribing (deleting) the old
+               // one and creating a new one.
+               
+               String filterString = binding.getQueue().getFilter() != null ? binding.getQueue().getFilter().getFilterString() : null;
+               
+               boolean selectorChanged =
+                  (selectorString == null && filterString != null) ||
+                  (filterString == null && selectorString != null) ||
+                  (filterString != null && selectorString != null &&
+                           !filterString.equals(selectorString));
+               
+               if (trace) { log.trace("selector " + (selectorChanged ? "has" : "has NOT") + " changed"); }
+               
+               boolean topicChanged = !binding.getCondition().equals(jmsDestination.getName());
+               
+               if (log.isTraceEnabled()) { log.trace("topic " + (topicChanged ? "has" : "has NOT") + " changed"); }
+               
+               if (selectorChanged || topicChanged)
+               {
+                  if (trace) { log.trace("topic or selector changed so deleting old subscription"); }
+                  
+                  // Unbind the durable subscription
+                  
+                  if (mDest.isClustered() && !postOffice.isLocal())
+                  {
+                     ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+                     
+                     cpo.unbindClusteredQueue(name);
+                  }
+                  else
+                  {
+                     postOffice.unbindQueue(name);
+                  }
+                  
+                  // create a fresh new subscription
+                  
+                  QueuedExecutor executor = (QueuedExecutor)pool.get();
+                  PagingFilteredQueue q;
+                  
+                  if (postOffice.isLocal())
+                  {
+                     q = new PagingFilteredQueue(name, idm.getId(), ms, pm, true, true,
+                              executor, selector,
+                              mDest.getFullSize(),
+                              mDest.getPageSize(),
+                              mDest.getDownCacheSize());
+                     binding = postOffice.bindQueue(topicCond, q);
+                  }
+                  else
+                  {
+                     q = new LocalClusteredQueue(postOffice, nodeId, name, idm.getId(), ms, pm, true, true,
+                              executor, selector, tr,
+                              mDest.getFullSize(),
+                              mDest.getPageSize(),
+                              mDest.getDownCacheSize());
+                     
+                     ClusteredPostOffice cpo = (ClusteredPostOffice)postOffice;
+                     
+                     if (mDest.isClustered())
+                     {
+                        binding = cpo.bindClusteredQueue(topicCond, (LocalClusteredQueue)q);
+                     }
+                     else
+                     {
+                        binding = cpo.bindQueue(topicCond, (LocalClusteredQueue)q);
+                     }
+                  }
+               }
+            }
+         }
+      }
+      else
+      {
+         // Consumer on a jms queue
+         
+         // Let's find the binding
+         binding = postOffice.getBindingForQueueName(jmsDestination.getName());
+         
+         if (binding == null)
+         {
+            throw new IllegalStateException("Cannot find binding for jms queue: " + jmsDestination.getName());
+         }
+      }
+      
+      int prefetchSize = connectionEndpoint.getPrefetchSize();
+      
+      ServerConsumerEndpoint ep =
+         new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+                  binding.getQueue().getName(), this, selectorString, noLocal,
+                  jmsDestination, prefetchSize, dlq);
+      
+      JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
+      
+      ClientConsumerDelegate stub =
+         new ClientConsumerDelegate(consumerID, binding.getQueue().getChannelID(),
+                  prefetchSize, maxDeliveryAttempts);
+      
+      putConsumerEndpoint(consumerID, ep); // caching consumer locally
+      
+      connectionEndpoint.getServerPeer().putConsumerEndpoint(consumerID, ep); // caching consumer in server peer
+      
+      log.debug("created and registered " + ep);
+      
+      return stub;
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -46,15 +46,10 @@
  */
 public interface SessionEndpoint extends Closeable
 {
-   ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
-                                     String selectorString,
-                                     boolean noLocal,  String subscriptionName,
-                                     boolean connectionConsumer,
-                                     long oldchannelID) throws JMSException;
-
    ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                            boolean noLocal, String subscriptionName,
-                                           boolean connectionConsumer) throws JMSException;
+                                           boolean connectionConsumer,
+                                           long failoverChannelID) throws JMSException;
    
    BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
       throws JMSException;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -86,22 +86,18 @@
    
+   /**
+    * Forwards the call, with every argument unchanged, to the underlying endpoint.
+    *
+    * @param failoverChannelID handed straight through to the endpoint, which defines its meaning
+    */
    public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
                                                   boolean noLocal, String subscriptionName,
-                                                  boolean connectionConsumer) throws JMSException
+                                                  boolean connectionConsumer,
+                                                  long failoverChannelID) throws JMSException
    {
-      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName, connectionConsumer);
+      return endpoint.createConsumerDelegate(destination, selector, noLocal, subscriptionName,
+                                             connectionConsumer, failoverChannelID);
    }
    
-   public ConsumerDelegate failOverConsumer(JBossDestination jmsDestination,
-                                            String selectorString,
-                                            boolean noLocal,  String subscriptionName,
-                                            boolean connectionConsumer,
-                                            long oldChannelID) throws JMSException
-   {
-      return endpoint.failOverConsumer(jmsDestination, selectorString, noLocal,
-                                       subscriptionName, connectionConsumer,
-                                       oldChannelID);
-   }
-
    public BrowserDelegate createBrowserDelegate(JBossDestination queue, String messageSelector)
       throws JMSException
    {

Modified: trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/server/remoting/MetaDataConstants.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -36,26 +36,8 @@
    public static final String REMOTING_SESSION_ID = "REMOTING_SESSION_ID";
    
    public static final String CALLBACK_HANDLER = "CALLBACK_HANDLER";
-   
-   public static final String CONSUMER_ID = "CONSUMER_ID";   
-   
-   public static final String PREFETCH_SIZE = "BUFFER_SIZE";
-   
-   public static final String CLIENT_CONNECTION_ID = "CC_ID";
-   
+    
    public static final String VERSION_NUMBER = "VERSION_NUMBER";
    
    public static final String JMS_CLIENT_VM_ID = "JMS_CLIENT_VM_ID";
-
-   public static final String CF_DELEGATES = "CF_DELEGATES";
-   
-   public static final String SERVER_ID = "SERVER_ID";
-   
-   public static final String REMOTING_CONNECTION = "REMOTING_CONNECTION";
-   
-   public static final String FAILOVER_MAP = "CF_FAIL_IND";
-   
-   public static final String CONNECTION_VERSION = "CONNECTION_VERSION";
-
-   public static final String MAX_DELIVERIES = "MAX_DELS";
 }

Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -27,6 +27,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -72,11 +73,7 @@
    private static final Logger log = Logger.getLogger(ResourceManager.class);
    
    // Constructors --------------------------------------------------
-   
-   protected ResourceManager()
-   {      
-   }
-   
+    
    // Public --------------------------------------------------------
    
    public TxState getTx(Object xid)
@@ -123,14 +120,64 @@
     /**
      * Navigate on ACK and change consumer ids on every ACK not sent yet.
      */
-    public void handleFailover(Object xid, int oldConsumerID, int newConsumerID)
+    public void handleFailover(int oldConsumerID, int newConsumerID)
     {
         if (trace) { log.trace("handleFailover:: Transfering consumer id on ACKs from  " + oldConsumerID + " to " + newConsumerID); }
 
-        TxState tx = getTx(xid);
+        //TODO need to lock the rm while this is happening
         
-        tx.handleFailover(oldConsumerID, newConsumerID);
+        //Note we need to replace ids for *all* transactions - this is because, for XA
+        //the session might have done work in many transactions
+        Iterator iter = this.transactions.values().iterator();
+        
+        while (iter.hasNext())
+        {
+           TxState tx = (TxState)iter.next();
+           
+           tx.handleFailover(oldConsumerID, newConsumerID);
+        }                
     }
+    
+    /**
+     * Collects, across all transactions held by this resource manager, the ack infos whose
+     * consumer id is contained in the specified set.
+     *
+     * @param consumerIds the consumer ids to match; TxState looks them up as Integer values
+     * @return a new list holding the matching ack infos, empty if there are none
+     */
+    public List getAckInfosForConsumerIds(Set consumerIds)
+    {
+       Iterator iter = this.transactions.values().iterator();
+       
+       List ackInfos = new ArrayList();
+       
+       while (iter.hasNext())
+       {
+          TxState tx = (TxState)iter.next();
+          
+          tx.getAckInfosForConsumerIds(ackInfos, consumerIds);
+       }
+       
+       return ackInfos;
+    }
+    
+    /**
+     * Asks every transaction held by this resource manager to remove its non persistent
+     * acks. The actual removal is done by TxState.
+     *
+     * @param consumerIds handed unchanged to each TxState
+     */
+    public void removeNonPersistentAcks(Set consumerIds)
+    {
+       Iterator iter = this.transactions.values().iterator();
+       
+       while (iter.hasNext())
+       {
+          TxState tx = (TxState)iter.next();
+          
+          tx.removeNonPersistentAcks(consumerIds);
+       }
+    }
 
    
    /**

Deleted: trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManagerFactory.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -1,107 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.jms.tx;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class manages instances of ResourceManager. It ensures there is one instance per instance
- * of JMS server as specified by the server id.
- * 
- * This allows different JMS connections to the same JMS server (the underlying resource is the JMS server)
- * to use the same resource manager.
- * 
- * This means isSameRM() on XAResource returns true, allowing the Transaction manager to join work in one
- * tx to another thus allowing 1PC optimization which should help performance.
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version $Revision$
- *
- * $Id$
- */
-public class ResourceManagerFactory
-{      
-   public static final ResourceManagerFactory instance = new ResourceManagerFactory();
-   
-   private Map holders;
-   
-   private ResourceManagerFactory()
-   {      
-      holders = new HashMap();
-   }
-      
-   public synchronized boolean containsResourceManager(int serverID)
-   {
-      return holders.containsKey(new Integer(serverID));
-   }
-   
-   /**
-    * @param serverID - server peer ID.
-    */
-   public synchronized ResourceManager checkOutResourceManager(int serverID)
-   {
-      Integer in = new Integer(serverID);
-      
-      Holder h = (Holder)holders.get(in);
-      
-      if (h == null)
-      {
-         h = new Holder();
-         
-         holders.put(in, h);
-      }
-      else
-      {
-         h.refCount++;
-      }
-      
-      return h.rm;
-   }
-   
-   public synchronized void checkInResourceManager(int serverID)
-   {
-      Integer in = new Integer(serverID);
-      
-      Holder h = (Holder)holders.get(in);
-      
-      if (h == null)
-      {
-         throw new IllegalArgumentException("Cannot find resource manager for server: " + serverID);
-      }
-      
-      h.refCount--;
-      
-      if (h.refCount == 0)
-      {
-         holders.remove(in);
-      }      
-   }
-   
-   private static class Holder
-   {
-      ResourceManager rm = new ResourceManager();
-      
-      int refCount = 1;
-   }
-  
-}

Modified: trunk/src/main/org/jboss/jms/tx/TxState.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/TxState.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/src/main/org/jboss/jms/tx/TxState.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.messaging.core.message.MessageFactory;
@@ -110,6 +111,46 @@
           }
       }
    }
+   
+   /**
+    * Appends to ackInfos every ack of this transaction whose consumer id is in consumerIds.
+    *
+    * @param ackInfos the list the matching AckInfo instances are added to
+    * @param consumerIds the consumer ids to match, looked up as Integer values
+    */
+   public void getAckInfosForConsumerIds(List ackInfos, Set consumerIds)
+   {
+      for (Iterator ackIterator = acks.iterator(); ackIterator.hasNext(); )
+      {
+          AckInfo ackInfo = (AckInfo)ackIterator.next();
+          
+          if (consumerIds.contains(new Integer(ackInfo.getConsumerID())))
+          {
+              ackInfos.add(ackInfo);
+          }
+      }
+   }
+   
+   /**
+    * Removes from this transaction the acks of non reliable messages that belong to one of
+    * the specified consumers. Acks of any other consumer are left untouched.
+    *
+    * @param consumerIds the consumer ids to match, looked up as Integer values
+    */
+   public void removeNonPersistentAcks(Set consumerIds)
+   {
+      for (Iterator ackIterator = acks.iterator(); ackIterator.hasNext(); )
+      {
+          AckInfo ackInfo = (AckInfo)ackIterator.next();
+          
+          boolean forConsumer = consumerIds.contains(new Integer(ackInfo.getConsumerID()));
+          
+          if (forConsumer && !ackInfo.msg.getMessage().isReliable())
+          {
+             ackIterator.remove();
+          }
+      }
+   }
     
    // Streamable implementation ---------------------------------
    

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/build.xml	2006-12-12 18:25:55 UTC (rev 1771)
@@ -387,7 +387,7 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${build.tests.classes}">
-               <include name="**/messaging/core/**/*Test.class"/>
+               <!-- <include name="**/messaging/core/**/*Test.class"/> -->
                <include name="**/messaging/jms/**/*Test.class"/>
                <exclude name="**/jms/stress/**"/>
                <exclude name="**/jms/crash/*Test.class"/>
@@ -745,7 +745,7 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${build.tests.classes}">
-               <include name="**/jms/clustering/HATest.class"/>
+               <include name="**/jms/clustering/*Test.class"/>
                <!--
                <include name="**/jms/clustering/SimpleClusteringTest.class"/>
                -->

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionTest.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -44,8 +44,6 @@
 import org.jboss.jms.message.MessageIdGenerator;
 import org.jboss.jms.message.MessageIdGeneratorFactory;
 import org.jboss.jms.server.ServerPeer;
-import org.jboss.jms.tx.ResourceManager;
-import org.jboss.jms.tx.ResourceManagerFactory;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.InvocationRequest;
 import org.jboss.remoting.ServerInvocationHandler;
@@ -121,42 +119,8 @@
          Connection conn = cf.createConnection();
          conn.close();        
       }
-   }
+   }     
    
-   public void testResourceManagersForSameServer() throws Exception
-   {
-      Connection conn1 = cf.createConnection();      
-            
-      ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)conn1).getDelegate();
-      
-      ConnectionState state1 = (ConnectionState)del1.getState();
-      
-      ResourceManager rm1 = state1.getResourceManager();
-      
-      Connection conn2 = cf.createConnection();      
-      
-      ClientConnectionDelegate del2 = (ClientConnectionDelegate)((JBossConnection)conn2).getDelegate();
-      
-      ConnectionState state2 = (ConnectionState)del2.getState();
-      
-      ResourceManager rm2 = state2.getResourceManager();
-
-      //Two connections for same server should share the same resource manager
-      
-      assertTrue(rm1 == rm2);
-      
-      assertTrue(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));
-      
-      conn1.close();
-      
-      //Check reference counting
-      assertTrue(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));
-           
-      conn2.close();
-      
-      assertFalse(ResourceManagerFactory.instance.containsResourceManager(state2.getServerID()));     
-   }
-   
    public void testMessageIDGeneratorsForSameServer() throws Exception
    {
       Connection conn1 = cf.createConnection();      

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 10:49:42 UTC (rev 1770)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 18:25:55 UTC (rev 1771)
@@ -37,6 +37,7 @@
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
+import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -69,489 +70,489 @@
    }
    
    // Public --------------------------------------------------------
-//   
-//   /*
-//    * Test that connections created using a clustered connection factory are created round robin on
-//    * different servers
-//    */
-//   public void testRoundRobinConnectionCreation() throws Exception
-//   {
-//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//      
-//      ClusteredClientConnectionFactoryDelegate delegate =
-//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//      
-//      log.info ("number of delegates = " + delegate.getDelegates().length);
-//      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-//      
-//      assertEquals(3, delegate.getDelegates().length);
-//      
-//      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//      
-//      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//      
-//      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//      
-//      assertEquals(0, cf1.getServerId());
-//      
-//      assertEquals(1, cf2.getServerId());
-//      
-//      assertEquals(2, cf3.getServerId());
-//      
-//      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//      
-//      Connection conn1 = null;
-//      
-//      Connection conn2 = null;
-//      
-//      Connection conn3 = null;
-//      
-//      Connection conn4 = null;
-//      
-//      Connection conn5 = null;
-//      
-//      try
-//      {         
-//         conn1 = factory.createConnection();
-//         
-//         conn2 = factory.createConnection();
-//         
-//         conn3 = factory.createConnection();
-//         
-//         conn4 = factory.createConnection();
-//         
-//         conn5 = factory.createConnection();
-//         
-//         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-//         
-//         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-//         
-//         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-//         
-//         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-//         
-//         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-//         
-//         int serverID1 = state1.getServerID();
-//         
-//         int serverID2 = state2.getServerID();
-//         
-//         int serverID3 = state3.getServerID();
-//         
-//         int serverID4 = state4.getServerID();
-//         
-//         int serverID5 = state5.getServerID();
-//         
-//         log.info("server id 1: " + serverID1);
-//         
-//         log.info("server id 2: " + serverID2);
-//         
-//         log.info("server id 3: " + serverID3);
-//         
-//         log.info("server id 4: " + serverID4);
-//         
-//         log.info("server id 5: " + serverID5);
-//         
-//         assertEquals(0, serverID1);
-//         
-//         assertEquals(1, serverID2);
-//         
-//         assertEquals(2, serverID3);
-//         
-//         assertEquals(0, serverID4);
-//         
-//         assertEquals(1, serverID5);
-//      }
-//      finally
-//      {
-//         if (conn1 != null)
-//         {
-//            conn1.close();
-//         }
-//         
-//         if (conn2 != null)
-//         {
-//            conn2.close();
-//         }
-//         
-//         if (conn3 != null)
-//         {
-//            conn3.close();
-//         }
-//         
-//         if (conn4 != null)
-//         {
-//            conn4.close();
-//         }
-//         
-//         if (conn5 != null)
-//         {
-//            conn5.close();
-//         }
-//      }
-//      
-//   }
-// 
-//   /*
-//    * Test that the failover mapping is created correctly and updated properly when nodes leave
-//    * or join
-//    */
-//   public void testDefaultFailoverMap() throws Exception
-//   {     
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//         
-//         //The order here depends on the order the servers were started in
-//         
-//         //If any servers get stopped and then started then the order will change
-//    
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         log.info("cf3 serverid=" + cf3.getServerId());
-//         
-//         
-//         assertEquals(0, cf1.getServerId());
-//         
-//         assertEquals(1, cf2.getServerId());
-//         
-//         assertEquals(2, cf3.getServerId());
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         assertEquals(3, delegates.length);
-//         
-//         assertEquals(3, failoverMap.size());
-//         
-//         // Default failover policy just chooses the node to the right
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-//      }
-//      
-//      //Now cleanly stop one of the servers
-//            
-//      log.info("************** STOPPING SERVER 0");
-//      ServerManagement.stop(0);
-//      
-//      log.info("server stopped");
-//      
-//      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-//      
-//      {         
-//         //Lookup another connection factory
-//         
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(2, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         //Order here depends on order servers were started in
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         assertEquals(1, cf1.getServerId());
-//         
-//         assertEquals(2, cf2.getServerId());
-//         
-//         
-//         assertEquals(2, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//      }
-//      
-//      //Cleanly stop another server
-//      
-//      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-//      
-//      ServerManagement.stop(1);
-//      
-//      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-//      
-//      {         
-//         //Lookup another connection factory
-//         
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         assertEquals(1, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         
-//         assertEquals(1, failoverMap.size());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//      }
-//            
-//      //Restart server 0
-//      
-//      ServerManagement.start("all", 0);
-//      
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(2, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         assertEquals(0, cf2.getServerId());
-//         
-//         
-//         assertEquals(2, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//      }
-//      
-//      
-//      //Restart server 1
-//      
-//      ServerManagement.start("all", 1);
-//      
-//      {
-//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-//         
-//         log.info("Got connection factory");
-//         
-//         ClusteredClientConnectionFactoryDelegate delegate =
-//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//         
-//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//         
-//         Map failoverMap = delegate.getFailoverMap();
-//         
-//         log.info("Got failover map");
-//         
-//         assertEquals(3, delegates.length);
-//         
-//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-//         
-//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-//         
-//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-//         
-//         log.info("cf1 serverid=" + cf1.getServerId());
-//         
-//         log.info("cf2 serverid=" + cf2.getServerId());
-//         
-//         log.info("cf3 serverid=" + cf3.getServerId());
-//         
-//         assertEquals(2, cf1.getServerId());
-//         
-//         assertEquals(0, cf2.getServerId());
-//         
-//         assertEquals(1, cf3.getServerId());
-//         
-//         
-//         assertEquals(3, failoverMap.size());
-//         
-//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-//         
-//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-//         
-//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-//      }            
-//   }
-//   
-//   public void testSimpleFailover() throws Exception
-//   {
-//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//      
-//      ClusteredClientConnectionFactoryDelegate delegate =
-//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-//      assertEquals(3, nodeIDView.size());
-//      
-//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//      
-//      ClientConnectionFactoryDelegate cf1 = delegates[0];
-//      
-//      ClientConnectionFactoryDelegate cf2 = delegates[1];
-//      
-//      ClientConnectionFactoryDelegate cf3 = delegates[2];
-//      
-//      int server0Id = cf1.getServerId();
-//      
-//      int server1Id = cf2.getServerId();
-//      
-//      int server2Id = cf3.getServerId();
-//      
-//      log.info("server 0 id: " + server0Id);
-//      
-//      log.info("server 1 id: " + server1Id);
-//      
-//      log.info("server 2 id: " + server2Id);
-//                  
-//      Map failoverMap = delegate.getFailoverMap();
-//      
-//      log.info(failoverMap.get(new Integer(server0Id)));
-//      log.info(failoverMap.get(new Integer(server1Id)));
-//      log.info(failoverMap.get(new Integer(server2Id)));
-//      
-//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//      
-//      // server 1 should failover onto server 2
-//      
-//      assertEquals(server2Id, server1FailoverId);
-//      
-//      Connection conn = null;
-//      
-//      try
-//      {
-//      
-//         //Get a connection on server 1
-//         conn = factory.createConnection(); //connection on server 0
-//         
-//         conn.close();
-//         
-//         conn = factory.createConnection(); //connection on server 1
-//         
-//         JBossConnection jbc = (JBossConnection)conn;
-//         
-//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//         
-//         ConnectionState state = (ConnectionState)del.getState();
-//         
-//         int initialServerID = state.getServerID();
-//         
-//         assertEquals(1, initialServerID);
-//                           
-//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         MessageProducer prod = sess.createProducer(queue1);
-//         
-//         MessageConsumer cons = sess.createConsumer(queue1);
-//         
-//         final int NUM_MESSAGES = 100;
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            TextMessage tm = sess.createTextMessage("message:" + i);
-//            
-//            prod.send(tm);
-//         }
-//         
-//         //So now, messages should be in queue1 on server 1
-//         //So we now kill server 1
-//         //Which should cause transparent failover of connection conn onto server 1
-//         
-//         log.info("************ KILLING (CRASHING) SERVER 1");
-//         
-//         ServerManagement.getServer(1).destroy();
-//         
-//         log.info("killed server, now waiting");
-//         
-//         Thread.sleep(5000);
-//         
-//         log.info("done wait");
-//         
-//         state = (ConnectionState)del.getState();
-//         
-//         int finalServerID = state.getServerID();
-//         
-//         log.info("final server id= " + finalServerID);
-//         
-//         //server id should now be 2
-//         
-//         assertEquals(2, finalServerID);
-//         
-//         conn.start();
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            TextMessage tm = (TextMessage)cons.receive(1000);
-//            
-//            log.info("message is " + tm);
-//            
-//            assertNotNull(tm);
-//            
-//            assertEquals("message:" + i, tm.getText());
-//         }
-//         log.info("done");
-//      }
-//      finally
-//      {         
-//         if (conn != null)
-//         {
-//            try
-//            {
-//               conn.close();
-//            }
-//            catch (Exception e)
-//            {
-//               e.printStackTrace();
-//            }
-//         }
-//      }
-//      
-//   }
    
+   /*
+    * Test that connections created using a clustered connection factory are created round robin on
+    * different servers: with three nodes, five connections should land on servers 0, 1, 2, 0, 1
+    */
+   public void testRoundRobinConnectionCreation() throws Exception
+   {
+      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+      
+      ClusteredClientConnectionFactoryDelegate delegate =
+         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+      
+      log.info ("number of delegates = " + delegate.getDelegates().length);
+      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+      // Precondition: one delegate per node, in server id order 0, 1, 2
+      assertEquals(3, delegate.getDelegates().length);
+      
+      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+      
+      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+      
+      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+      
+      assertEquals(0, cf1.getServerId());
+      
+      assertEquals(1, cf2.getServerId());
+      
+      assertEquals(2, cf3.getServerId());
+      
+      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+      // Declared outside the try block so that the finally block can close whatever was opened
+      Connection conn1 = null;
+      
+      Connection conn2 = null;
+      
+      Connection conn3 = null;
+      
+      Connection conn4 = null;
+      
+      Connection conn5 = null;
+      
+      try
+      {         
+         conn1 = factory.createConnection();
+         
+         conn2 = factory.createConnection();
+         
+         conn3 = factory.createConnection();
+         
+         conn4 = factory.createConnection();
+         
+         conn5 = factory.createConnection();
+         // Read the server id each connection is attached to from its client-side ConnectionState
+         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+         
+         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+         
+         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+         
+         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+         
+         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+         
+         int serverID1 = state1.getServerID();
+         
+         int serverID2 = state2.getServerID();
+         
+         int serverID3 = state3.getServerID();
+         
+         int serverID4 = state4.getServerID();
+         
+         int serverID5 = state5.getServerID();
+         
+         log.info("server id 1: " + serverID1);
+         
+         log.info("server id 2: " + serverID2);
+         
+         log.info("server id 3: " + serverID3);
+         
+         log.info("server id 4: " + serverID4);
+         
+         log.info("server id 5: " + serverID5);
+         // Five connections over three nodes: the expected assignment wraps around after server 2
+         assertEquals(0, serverID1);
+         
+         assertEquals(1, serverID2);
+         
+         assertEquals(2, serverID3);
+         
+         assertEquals(0, serverID4);
+         
+         assertEquals(1, serverID5);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         // NOTE(review): close() is unguarded, so an exception here skips the remaining closes
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+         
+         if (conn4 != null)
+         {
+            conn4.close();
+         }
+         
+         if (conn5 != null)
+         {
+            conn5.close();
+         }
+      }
+      
+   }
+ 
+   /*
+    * Test that the failover mapping is created correctly and updated properly when nodes leave
+    * or join
+    */
+   public void testDefaultFailoverMap() throws Exception
+   {     
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         // Phase 1: all three nodes are expected to be in the cluster view
+         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+         
+         //The order here depends on the order the servers were started in
+         
+         //If any servers get stopped and then started then the order will change
+    
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         log.info("cf3 serverid=" + cf3.getServerId());
+         
+         
+         assertEquals(0, cf1.getServerId());
+         
+         assertEquals(1, cf2.getServerId());
+         
+         assertEquals(2, cf3.getServerId());
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         assertEquals(3, delegates.length);
+         
+         assertEquals(3, failoverMap.size());
+         
+         // Default failover policy just chooses the node to the right
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+      }
+      
+      //Now cleanly stop one of the servers
+            
+      log.info("************** STOPPING SERVER 0");
+      ServerManagement.stop(0);
+      
+      log.info("server stopped");
+      // Phase 2: with server 0 stopped, the view seen from server 1 should hold two nodes
+      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+      
+      {         
+         //Lookup another connection factory
+         
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(2, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         //Order here depends on order servers were started in
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         assertEquals(1, cf1.getServerId());
+         
+         assertEquals(2, cf2.getServerId());
+         
+         // The two remaining nodes are expected to fail over onto each other
+         assertEquals(2, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+      }
+      
+      //Cleanly stop another server
+      
+      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+      
+      ServerManagement.stop(1);
+      // Phase 3: only server 2 should remain in the view
+      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+      
+      {         
+         //Lookup another connection factory
+         
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         assertEquals(1, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         assertEquals(2, cf1.getServerId());
+         
+         // With a single node left, its failover entry is expected to map to itself
+         assertEquals(1, failoverMap.size());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+      }
+            
+      //Restart server 0
+      
+      ServerManagement.start("all", 0);
+      // Phase 4: the restarted server 0 is expected to come after server 2 in the delegate order
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(2, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         assertEquals(2, cf1.getServerId());
+         
+         assertEquals(0, cf2.getServerId());
+         
+         // Servers 2 and 0 are expected to fail over onto each other
+         assertEquals(2, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+      }
+      
+      
+      //Restart server 1
+      
+      ServerManagement.start("all", 1);
+      // Phase 5: all three nodes are back; the expected delegate order is now 2, 0, 1
+      {
+         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+         
+         log.info("Got connection factory");
+         
+         ClusteredClientConnectionFactoryDelegate delegate =
+            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+         
+         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+         
+         Map failoverMap = delegate.getFailoverMap();
+         
+         log.info("Got failover map");
+         
+         assertEquals(3, delegates.length);
+         
+         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+         
+         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+         
+         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+         
+         log.info("cf1 serverid=" + cf1.getServerId());
+         
+         log.info("cf2 serverid=" + cf2.getServerId());
+         
+         log.info("cf3 serverid=" + cf3.getServerId());
+         
+         assertEquals(2, cf1.getServerId());
+         
+         assertEquals(0, cf2.getServerId());
+         
+         assertEquals(1, cf3.getServerId());
+         
+         // Each node should fail over to the next delegate, wrapping from the last to the first
+         assertEquals(3, failoverMap.size());
+         
+         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+         
+         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+         
+         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+      }            
+   }
    
+   public void testSimpleFailover() throws Exception
+   {
+      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+      // Scenario: send on a connection to server 1, kill that server, then consume after failover
+      ClusteredClientConnectionFactoryDelegate delegate =
+         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+      assertEquals(3, nodeIDView.size());
+      
+      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+      
+      ClientConnectionFactoryDelegate cf1 = delegates[0];
+      
+      ClientConnectionFactoryDelegate cf2 = delegates[1];
+      
+      ClientConnectionFactoryDelegate cf3 = delegates[2];
+      
+      int server0Id = cf1.getServerId();
+      
+      int server1Id = cf2.getServerId();
+      
+      int server2Id = cf3.getServerId();
+      
+      log.info("server 0 id: " + server0Id);
+      
+      log.info("server 1 id: " + server1Id);
+      
+      log.info("server 2 id: " + server2Id);
+                  
+      Map failoverMap = delegate.getFailoverMap();
+      // Log the failover target recorded in the map for each of the three nodes
+      log.info(failoverMap.get(new Integer(server0Id)));
+      log.info(failoverMap.get(new Integer(server1Id)));
+      log.info(failoverMap.get(new Integer(server2Id)));
+      
+      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+      
+      // server 1 should failover onto server 2
+      
+      assertEquals(server2Id, server1FailoverId);
+      
+      Connection conn = null;
+      
+      try
+      {
+      
+         //Get a connection on server 1: the first connection goes to server 0, so it is discarded
+         conn = factory.createConnection(); //connection on server 0
+         
+         conn.close();
+         
+         conn = factory.createConnection(); //connection on server 1
+         
+         JBossConnection jbc = (JBossConnection)conn;
+         
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+         
+         ConnectionState state = (ConnectionState)del.getState();
+         
+         int initialServerID = state.getServerID();
+         
+         assertEquals(1, initialServerID);
+                           
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess.createProducer(queue1);
+         
+         MessageConsumer cons = sess.createConsumer(queue1);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("message:" + i);
+            
+            prod.send(tm);
+         }
+         
+         //So now, messages should be in queue1 on server 1
+         //So we now kill server 1
+         //Which should cause transparent failover of connection conn onto server 2
+         
+         log.info("************ KILLING (CRASHING) SERVER 1");
+         
+         ServerManagement.getServer(1).kill();
+         
+         log.info("killed server, now waiting");
+         // NOTE(review): fixed 5s wait, presumably to let failover finish - confirm it is sufficient
+         Thread.sleep(5000);
+         
+         log.info("done wait");
+         
+         state = (ConnectionState)del.getState();
+         
+         int finalServerID = state.getServerID();
+         
+         log.info("final server id= " + finalServerID);
+         
+         //server id should now be 2
+         
+         assertEquals(2, finalServerID);
+         
+         conn.start();
+         // The consumer was created before the kill; all messages must still arrive in send order
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(1000);
+            
+            log.info("message is " + tm);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message:" + i, tm.getText());
+         }
+         log.info("done");
+      }
+      finally
+      {         
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+      
+   }
+   
+   
    public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
    {
       JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
@@ -723,178 +724,178 @@
       
    }
    
-//   public void testFailoverWithUnackedMessagesTransactional() throws Exception
-//   {
-//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-//      
-//      ClusteredClientConnectionFactoryDelegate delegate =
-//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-//
-//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
-//      assertEquals(3, nodeIDView.size());
-//      
-//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-//      
-//      ClientConnectionFactoryDelegate cf1 = delegates[0];
-//      
-//      ClientConnectionFactoryDelegate cf2 = delegates[1];
-//      
-//      ClientConnectionFactoryDelegate cf3 = delegates[2];
-//      
-//      int server0Id = cf1.getServerId();
-//      
-//      int server1Id = cf2.getServerId();
-//      
-//      int server2Id = cf3.getServerId();
-//      
-//      log.info("server 0 id: " + server0Id);
-//      
-//      log.info("server 1 id: " + server1Id);
-//      
-//      log.info("server 2 id: " + server2Id);
-//                  
-//      Map failoverMap = delegate.getFailoverMap();
-//      
-//      log.info(failoverMap.get(new Integer(server0Id)));
-//      log.info(failoverMap.get(new Integer(server1Id)));
-//      log.info(failoverMap.get(new Integer(server2Id)));
-//      
-//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
-//      
-//      // server 1 should failover onto server 2
-//      
-//      assertEquals(server2Id, server1FailoverId);
-//      
-//      Connection conn = null;
-//      
-//      try
-//      {      
-//         //Get a connection on server 1
-//         conn = factory.createConnection(); //connection on server 0
-//         
-//         conn.close();
-//         
-//         conn = factory.createConnection(); //connection on server 1
-//         
-//         JBossConnection jbc = (JBossConnection)conn;
-//         
-//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
-//         
-//         ConnectionState state = (ConnectionState)del.getState();
-//         
-//         int initialServerID = state.getServerID();
-//         
-//         assertEquals(1, initialServerID);
-//                           
-//         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
-//         
-//         MessageProducer prod = sess.createProducer(queue1);
-//         
-//         MessageConsumer cons = sess.createConsumer(queue1);
-//         
-//         final int NUM_MESSAGES = 100;
-//         
-//         for (int i = 0; i < NUM_MESSAGES; i++)
-//         {
-//            TextMessage tm = sess.createTextMessage("message:" + i);
-//            
-//            prod.send(tm);
-//         }
-//         
-//         sess.commit();
-//         
-//         conn.start();
-//         
-//         //Now consume half of the messages but don't commit them these will end up in 
-//         //client side resource manager
-//         
-//         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-//         {
-//            TextMessage tm = (TextMessage)cons.receive(500);
-//            
-//            assertNotNull(tm);
-//            
-//            assertEquals("message:" + i, tm.getText());
-//         }
-//         
-//         //So now, messages should be in queue1 on server 1
-//         //So we now kill server 1
-//         //Which should cause transparent failover of connection conn onto server 1
-//         
-//         log.info("************ KILLING (CRASHING) SERVER 1");
-//         
-//         ServerManagement.getServer(1).kill();
-//
-//         log.info("killed server, now waiting");
-//         
-//         Thread.sleep(5000);
-//         
-//         log.info("done wait");
-//         
-//         state = (ConnectionState)del.getState();
-//         
-//         int finalServerID = state.getServerID();
-//         
-//         log.info("final server id= " + finalServerID);
-//         
-//         //server id should now be 2
-//         
-//         assertEquals(2, finalServerID);
-//         
-//         conn.start();
-//         
-//         //Now should be able to consume the rest of the messages
-//         
-//         log.info("here1");
-//         
-//         TextMessage tm = null;
-//         
-//         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-//         {
-//            tm = (TextMessage)cons.receive(500);
-//                                    
-//            log.info("message is " + tm.getText());
-//            
-//            assertNotNull(tm);
-//            
-//            assertEquals("message:" + i, tm.getText());
-//         }
-//         
-//         log.info("here2");
-//         
-//         //Now should be able to commit them
-//         
-//         sess.commit();
-//         
-//         //Now check there are no more messages there
-//         sess.close();
-//         
-//         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-//         
-//         cons = sess.createConsumer(queue1);
-//         
-//         Message m = cons.receive(500);
-//         
-//         assertNull(m);
-//         
-//         log.info("got to end of test");
-//      }
-//      finally
-//      {         
-//         if (conn != null)
-//         {
-//            try
-//            {
-//               conn.close();
-//            }
-//            catch (Exception e)
-//            {
-//               e.printStackTrace();
-//            }
-//         }
-//      }
-//      
-//   }
+   public void testFailoverWithUnackedMessagesTransactional() throws Exception
+   {
+      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+      // Scenario: failover of a transacted session that holds consumed but uncommitted messages
+      ClusteredClientConnectionFactoryDelegate delegate =
+         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+
+      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+      assertEquals(3, nodeIDView.size());
+      
+      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+      
+      ClientConnectionFactoryDelegate cf1 = delegates[0];
+      
+      ClientConnectionFactoryDelegate cf2 = delegates[1];
+      
+      ClientConnectionFactoryDelegate cf3 = delegates[2];
+      
+      int server0Id = cf1.getServerId();
+      
+      int server1Id = cf2.getServerId();
+      
+      int server2Id = cf3.getServerId();
+      
+      log.info("server 0 id: " + server0Id);
+      
+      log.info("server 1 id: " + server1Id);
+      
+      log.info("server 2 id: " + server2Id);
+                  
+      Map failoverMap = delegate.getFailoverMap();
+      
+      log.info(failoverMap.get(new Integer(server0Id)));
+      log.info(failoverMap.get(new Integer(server1Id)));
+      log.info(failoverMap.get(new Integer(server2Id)));
+      
+      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+      
+      // server 1 should failover onto server 2
+      
+      assertEquals(server2Id, server1FailoverId);
+      
+      Connection conn = null;
+      
+      try
+      {      
+         //Get a connection on server 1: the first connection goes to server 0, so it is discarded
+         conn = factory.createConnection(); //connection on server 0
+         
+         conn.close();
+         
+         conn = factory.createConnection(); //connection on server 1
+         
+         JBossConnection jbc = (JBossConnection)conn;
+         
+         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+         
+         ConnectionState state = (ConnectionState)del.getState();
+         
+         int initialServerID = state.getServerID();
+         
+         assertEquals(1, initialServerID);
+                           
+         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageProducer prod = sess.createProducer(queue1);
+         
+         MessageConsumer cons = sess.createConsumer(queue1);
+         
+         final int NUM_MESSAGES = 100;
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess.createTextMessage("message:" + i);
+            
+            prod.send(tm);
+         }
+         // The session is transacted, so the sends only take effect once committed
+         sess.commit();
+         
+         conn.start();
+         
+         //Now consume half of the messages but don't commit them; these will end up in
+         //client side resource manager
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(500);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message:" + i, tm.getText());
+         }
+         
+         //So now, messages should be in queue1 on server 1
+         //So we now kill server 1
+         //Which should cause transparent failover of connection conn onto server 2
+         
+         log.info("************ KILLING (CRASHING) SERVER 1");
+         
+         ServerManagement.getServer(1).kill();
+
+         log.info("killed server, now waiting");
+         
+         Thread.sleep(5000);
+         
+         log.info("done wait");
+         
+         state = (ConnectionState)del.getState();
+         
+         int finalServerID = state.getServerID();
+         
+         log.info("final server id= " + finalServerID);
+         
+         //server id should now be 2
+         
+         assertEquals(2, finalServerID);
+         // NOTE(review): the connection was already started above; this second start() looks redundant
+         conn.start();
+         
+         //Now should be able to consume the rest of the messages
+         
+         log.info("here1");
+         
+         TextMessage tm = null;
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            tm = (TextMessage)cons.receive(500);
+            // Assert before logging: receive() returns null on timeout and getText() would then NPE
+            assertNotNull(tm);
+            
+            log.info("message is " + tm.getText());
+            
+            assertEquals("message:" + i, tm.getText());
+         }
+         
+         log.info("here2");
+         
+         //Now should be able to commit them
+         
+         sess.commit();
+         
+         //Now check there are no more messages there
+         sess.close();
+         
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons = sess.createConsumer(queue1);
+         
+         Message m = cons.receive(500);
+         
+         assertNull(m);
+         
+         log.info("got to end of test");
+      }
+      finally
+      {         
+         if (conn != null)
+         {
+            try
+            {
+               conn.close();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+      
+   }
    
    
    




More information about the jboss-cvs-commits mailing list