[jboss-cvs] JBoss Messaging SVN: r1770 - in trunk: . src/etc/server/default/deploy src/etc/xmdesc src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests tests/src/org/jboss/test/messaging/core tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/jms/clustering

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 12 05:50:19 EST 2006


Author: timfox
Date: 2006-12-12 05:49:42 -0500 (Tue, 12 Dec 2006)
New Revision: 1770

Added:
   trunk/jboss-remoting-npe-temp-fix.jar
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java
Removed:
   trunk/jboss-remoting-npe-fix.jar
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
Modified:
   trunk/.classpath
   trunk/src/etc/server/default/deploy/messaging-service.xml
   trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
   trunk/src/main/org/jboss/jms/client/container/HAAspect.java
   trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
   trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
   trunk/src/main/org/jboss/jms/client/state/SessionState.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
   trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
   trunk/src/main/org/jboss/jms/tx/AckInfo.java
   trunk/src/main/org/jboss/messaging/core/Channel.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
   trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
Log:
More HA work



Modified: trunk/.classpath
===================================================================
--- trunk/.classpath	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/.classpath	2006-12-12 10:49:42 UTC (rev 1770)
@@ -11,7 +11,6 @@
 	<classpathentry kind="lib" path="lib/jboss-system.jar"/>
 	<classpathentry kind="lib" path="lib/jboss-transaction.jar"/>
 	<classpathentry kind="lib" path="lib/jnp-client.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
 	<classpathentry kind="lib" path="thirdparty/oswego-concurrent/lib/concurrent.jar"/>
 	<classpathentry kind="lib" path="tests/lib/jboss-common-jdbc-wrapper.jar"/>
 	<classpathentry kind="lib" path="tests/lib/jboss-jca.jar"/>
@@ -41,9 +40,6 @@
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jdk14-pluggable-instrumentor.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jrockit-pluggable-instrumentor.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/pluggable-instrumentor.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-archive-browsing.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common-client.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/namespace.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/jbossxb/lib/jboss-xml-binding.jar"/>
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/backport-util-concurrent.jar"/>
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
@@ -51,5 +47,8 @@
 	<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/common-logging-log4j/lib/jboss-logging-log4j.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/common-logging-spi/lib/jboss-logging-spi.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/common-core/lib/jboss-common-core.jar"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Deleted: trunk/jboss-remoting-npe-fix.jar
===================================================================
(Binary files differ)

Added: trunk/jboss-remoting-npe-temp-fix.jar
===================================================================
(Binary files differ)


Property changes on: trunk/jboss-remoting-npe-temp-fix.jar
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Modified: trunk/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/messaging-service.xml	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/etc/server/default/deploy/messaging-service.xml	2006-12-12 10:49:42 UTC (rev 1770)
@@ -35,6 +35,8 @@
       </attribute>
       <attribute name="MaxDeliveryAttempts">10</attribute>
       <attribute name="DLQName">DLQ</attribute>
+      <attribute name="FailoverStartTimeout">3000</attribute>
+      <attribute name="FailoverCompleteTimeout">12000</attribute>
    </mbean>
 
 </server>
\ No newline at end of file

Modified: trunk/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- trunk/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/etc/xmdesc/ServerPeer-xmbean.xml	2006-12-12 10:49:42 UTC (rev 1770)
@@ -150,7 +150,19 @@
       <description>The JNDI name of the DLQ</description>
       <name>DLQName</name>
       <type>java.lang.String</type>
-   </attribute>     
+   </attribute>  
+   
+   <attribute access="read-write" getMethod="getFailoverStartTimeout" setMethod="setFailoverStartTimeout">
+      <description>The maximum amount of time to wait for failover to begin</description>
+      <name>FailoverStartTimeout</name>
+      <type>long</type>
+   </attribute> 
+   
+   <attribute access="read-write" getMethod="getFailoverCompleteTimeout" setMethod="setFailoverCompleteTimeout">
+      <description>The maximum amount of time to wait for failover to complete</description>
+      <name>FailoverCompleteTimeout</name>
+      <type>long</type>
+   </attribute>      
 
    <!-- Managed operations -->
 

Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -49,6 +49,7 @@
 import org.jboss.jms.client.state.SessionState;
 import org.jboss.jms.destination.JBossDestination;
 import org.jboss.jms.server.endpoint.CreateConnectionResult;
+import org.jboss.jms.tx.AckInfo;
 import org.jboss.logging.Logger;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ConnectionListener;
@@ -328,8 +329,23 @@
          failedSessionDelegate.copyState(newSessionDelegate);
          
          log.info("copied state");
+         
+         //Now we remove any unacked non-persistent (np) messages - this is because we don't want to ack them
+         //since the server won't know about them and will barf
+         Iterator iter = failedSessionState.getToAck().iterator();
+         
+         while (iter.hasNext())
+         {
+            AckInfo info = (AckInfo)iter.next();
+            
+            if (!info.getMessage().getMessage().isReliable())
+            {
+               iter.remove();
+            }            
+         }
+         
+         //TODO remove any unacked from the resource manager
 
-
          if (trace) { log.trace("replacing session (" + failedSessionDelegate + ") with a new failover session " + newSessionDelegate); }
 
          //TODO Clebert please add comment as to why this clone is necessary
@@ -359,8 +375,32 @@
                 handleFailoverOnBrowser((BrowserState)sessionChild, newSessionDelegate);
             }
          }
+         
+         /* Now we must send the list of unacked AckInfos to the server - so the consumers
+          * delivery lists can be repopulated
+          */
+         List ackInfos = null;
+         
+         if (!failedSessionState.isTransacted())
+         {
+            //Get the ack infos from the list in the session state
+            ackInfos = failedSessionState.getToAck();
+         }
+         else
+         {
+            //Transacted session - we need to get the acks
+            //TODO
+         }
+         
+         //TODO for a transacted session the ackinfos will be in the resource manager!!
+         
+         if (!ackInfos.isEmpty())
+         {
+            newSessionDelegate.sendUnackedAckInfos(ackInfos);
+         }
+                  
       }
-      
+            
       //We must not start the connection until the end
       if (failedState.isStarted())
       {
@@ -369,6 +409,8 @@
       
       log.info("Failover done");
    }
+   
+   
 
    private void handleFailoverOnConsumer(ClientConnectionDelegate connectionDelegate,
                                          ConnectionState failedConnectionState,
@@ -387,11 +429,11 @@
 
       ClientConsumerDelegate newConsumerDelegate = (ClientConsumerDelegate)failedSessionDelegate.
          failOverConsumer((JBossDestination)failedConsumerState.getDestination(),
-                                failedConsumerState.getSelector(),
-                                failedConsumerState.isNoLocal(),
-                                failedConsumerState.getSubscriptionName(),
-                                failedConsumerState.isConnectionConsumer(),
-                                failedConsumerDelegate.getChannelId());
+                           failedConsumerState.getSelector(),
+                           failedConsumerState.isNoLocal(),
+                           failedConsumerState.getSubscriptionName(),
+                           failedConsumerState.isConnectionConsumer(),
+                           failedConsumerDelegate.getChannelId());
 
       if (trace) { log.trace("handleFailoverOnConsumer: alternate consumer created"); }
 
@@ -418,6 +460,9 @@
 
       MessageCallbackHandler handler = cm.unregisterHandler(oldServerID, oldConsumerID);
       handler.setConsumerId(failedConsumerState.getConsumerID());
+      
+      //Clear the buffer of the handler
+      handler.clearBuffer();
 
       cm.registerHandler(failedConnectionState.getServerID(),
                          failedConsumerState.getConsumerID(),
@@ -426,8 +471,8 @@
       failedSessionState.addCallbackHandler(handler);
       
       log.info("failed over consumer");
-
    }
+   
 
    private void handleFailoverOnProducer(ProducerState failedProducerState,
                                          ClientSessionDelegate failedSessionDelegate)

Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -441,6 +441,15 @@
       throw new IllegalStateException("This invocation should not be handled here!");
    }
    
+   /**
+    * This invocation should either be handled by the client-side interceptor chain or by the
+    * server-side endpoint.
+    */
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+   
 
    // Public --------------------------------------------------------
 
@@ -457,6 +466,7 @@
       return ((ConnectionState)state.getParent()).getRemotingConnection().getInvokingClient();
    }
 
+
    // Package Private -----------------------------------------------
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/remoting/CallbackManager.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -94,6 +94,8 @@
 
    public void registerHandler(int serverID, int consumerID, MessageCallbackHandler handler)
    {
+      log.info(this + " registeringHandler, serverID:" + serverID + " consumerID:" + consumerID);
+      
       Long lookup = computeLookup(serverID, consumerID);
 
       callbackHandlers.put(lookup, handler);
@@ -101,6 +103,8 @@
 
    public MessageCallbackHandler unregisterHandler(int serverID, int consumerID)
    {
+      log.info(this + " unregisteringHandler, serverID:" + serverID + " consumerID:" + consumerID);
+      
       Long lookup = computeLookup(serverID, consumerID);
 
       return (MessageCallbackHandler)callbackHandlers.remove(lookup);

Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -316,6 +316,9 @@
          // otherwise the messages wouldn't get cancelled until the corresponding session died.
          // So if another consumer in another session tried to consume from the channel before that
          // session died it wouldn't receive those messages.
+         // We can't just cancel all the messages in the SCE since some of those messages might
+         // have actually been delivered (unlike these) and we may want to acknowledge them
+         // later, after this consumer has been closed
 
          List ackInfos = new ArrayList();
 
@@ -514,6 +517,11 @@
       }
    }
    
+   public void clearBuffer()
+   {
+      buffer.clear();
+   }
+   
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -48,7 +48,6 @@
  */
 public class SessionState extends HierarchicalStateSupport
 {
-
    protected static Logger log = Logger.getLogger(SessionState.class);
 
    private int acknowledgeMode;
@@ -205,16 +204,5 @@
    {
       return new ArrayList(callbackHandlers.values());
    }
-
-   /*** used for HA Handling */
-   public void cleanCallBackHandlers()
-   {
-       if (log.isTraceEnabled())
-       {
-           log.trace("Clearing callBackHandlers size=" + callbackHandlers.size());
-       }
-       callbackHandlers.clear();
-   }
-
 }
 

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -111,6 +111,10 @@
    private String dlqName;
 
    private Object failoverStatusLock;
+   
+   private long failoverStartTimeout = 3000;
+   
+   private long failoverCompleteTimeout = 12000;
       
    // wired components
 
@@ -135,9 +139,6 @@
    protected ObjectName postOfficeObjectName;
    protected PostOffice postOffice;
 
-//   protected ObjectName topicPostOfficeObjectName;
-//   protected PostOffice topicPostOffice;
-
    protected ObjectName jmsUserManagerObjectName;
    protected JMSUserManager jmsUserManager;
 
@@ -352,16 +353,6 @@
       postOfficeObjectName = on;
    }
 
-//   public ObjectName getTopicPostOffice()
-//   {
-//      return topicPostOfficeObjectName;
-//   }
-//
-//   public void setTopicPostOffice(ObjectName on)
-//   {
-//      topicPostOfficeObjectName = on;
-//   }
-
    public ObjectName getJmsUserManager()
    {
       return jmsUserManagerObjectName;
@@ -478,6 +469,28 @@
    {
       this.queuedExecutorPoolSize = poolSize;
    }
+   
+   public long getFailoverStartTimeout()
+   {
+      return this.failoverStartTimeout;
+   }
+   
+   public void setFailoverStartTimeout(long timeout)
+   {
+      this.failoverStartTimeout = timeout;
+   }
+   
+   public long getFailoverCompleteTimeout()
+   {
+      return this.failoverCompleteTimeout;
+   }
+   
+   public void setFailoverCompleteTimeout(long timeout)
+   {
+      this.failoverCompleteTimeout = timeout;
+   }
+   
+   
 
    // JMX Operations ------------------------------------------------
 
@@ -729,14 +742,11 @@
       
       Replicator replicator = getReplicator();
       
-      //TODO - these must be configurable
-      final long FAILOVER_START_TIMEOUT = 15000;
+      //Failover
+
+      long startToWait = failoverStartTimeout;
       
-      final long FAILOVER_COMPLETE_TIMEOUT = 25000;
-      
-      long startToWait = FAILOVER_START_TIMEOUT;
-      
-      long completeToWait = FAILOVER_COMPLETE_TIMEOUT;
+      long completeToWait = failoverCompleteTimeout;
                      
       //Must lock here
       synchronized (failoverStatusLock)
@@ -1104,5 +1114,4 @@
          }
       }      
    }
-
 }

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerBrowserEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -31,9 +31,9 @@
 import org.jboss.jms.selector.Selector;
 import org.jboss.jms.server.remoting.JMSDispatcher;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
 import org.jboss.messaging.core.Filter;
 import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
 
 /**
  * Concrete implementation of BrowserEndpoint.
@@ -66,7 +66,7 @@
    // Constructors --------------------------------------------------
 
    protected ServerBrowserEndpoint(ServerSessionEndpoint session, int id,
-                                   PagingFilteredQueue destination, String messageSelector)
+                                   Channel destination, String messageSelector)
       throws JMSException
    {     
       this.session = session;

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -42,6 +42,7 @@
 import org.jboss.jms.server.remoting.MessagingMarshallable;
 import org.jboss.jms.util.ExceptionUtil;
 import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Channel;
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.MessageReference;
@@ -49,7 +50,6 @@
 import org.jboss.messaging.core.Receiver;
 import org.jboss.messaging.core.Routable;
 import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.local.PagingFilteredQueue;
 import org.jboss.messaging.core.plugin.contract.PostOffice;
 import org.jboss.messaging.core.plugin.postoffice.Binding;
 import org.jboss.messaging.core.tx.Transaction;
@@ -87,7 +87,7 @@
 
    private int id;
 
-   private PagingFilteredQueue messageQueue;
+   private Channel messageQueue;
    
    private String queueName;
 
@@ -131,7 +131,7 @@
    
    // Constructors --------------------------------------------------
 
-   protected ServerConsumerEndpoint(int id, PagingFilteredQueue messageQueue, String queueName,
+   protected ServerConsumerEndpoint(int id, Channel messageQueue, String queueName,
                                     ServerSessionEndpoint sessionEndpoint,
                                     String selector, boolean noLocal, JBossDestination dest,
                                     int prefetchSize, Queue dlq)
@@ -633,7 +633,24 @@
       }
       
    }
-   
+
+   protected void createDeliveries(List messageIds) throws Throwable
+   {
+      List dels = messageQueue.createDeliveries(messageIds);
+            
+      synchronized (lock)
+      {      
+         Iterator iter = dels.iterator();
+         
+         while (iter.hasNext())
+         {
+            Delivery del = (Delivery)iter.next();
+            
+            deliveries.put(new Long(del.getReference().getMessageID()), del);
+         }
+      }
+   }
+
    protected void cancelDelivery(Long messageID, int deliveryCount) throws Throwable
    {
       Delivery del = (Delivery)deliveries.remove(messageID);
@@ -653,7 +670,7 @@
          throw new IllegalStateException("Cannot find delivery to cancel:" + id);
       }
    }
-               
+           
    protected void start()
    {             
       synchronized (lock)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -27,6 +27,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -186,11 +187,11 @@
          int prefetchSize = connectionEndpoint.getPrefetchSize();
          
          ServerConsumerEndpoint ep =
-            new ServerConsumerEndpoint(consumerID, (PagingFilteredQueue)binding.getQueue(),
+
+            new ServerConsumerEndpoint(consumerID, binding.getQueue(),
                                        binding.getQueue().getName(), this, selectorString, noLocal,
                                        jmsDestination, prefetchSize, dlq);
 
-         
          JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
          
          ClientConsumerDelegate stub =
@@ -755,6 +756,56 @@
          throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDeliveries");
       }
    }
+   
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      try
+      {
+         //Sort into different list for each consumer
+         Map ackMap = new HashMap();
+                  
+         for (int i = ackInfos.size() - 1; i >= 0; i--)
+         {
+            AckInfo ack = (AckInfo)ackInfos.get(i);
+            
+            ServerConsumerEndpoint consumer =
+               this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+   
+            if (consumer == null)
+            {
+               throw new IllegalArgumentException("Cannot find consumer id: " + ack.getConsumerID());
+            }
+            
+            LinkedList acks = (LinkedList)ackMap.get(consumer);
+            
+            if (acks == null)
+            {
+               acks = new LinkedList();
+               
+               ackMap.put(consumer, acks);
+            }
+            
+            acks.addFirst(new Long(ack.getMessageID()));
+         }  
+         
+         Iterator iter = ackMap.entrySet().iterator();
+         
+         while (iter.hasNext())
+         {
+            Map.Entry entry = (Map.Entry)iter.next();
+            
+            ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)entry.getKey();
+            
+            List acks = (List)entry.getValue();
+            
+            consumer.createDeliveries(acks);
+         }
+      }
+      catch (Throwable t)
+      {
+         throw ExceptionUtil.handleJMSInvocation(t, this + " sendUnackedAckInfos");
+      }
+   }
 
    public void addTemporaryDestination(JBossDestination dest) throws JMSException
    {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -120,5 +120,14 @@
     * @param ackInfos
     */
    void cancelDeliveries(List ackInfos) throws JMSException;
+   
+   
+   /**
+    * Send a list of unacked ackInfos to the server so the delivery lists can be repopulated.
+    * Used at failover.
+    * @param ackInfos
+    * @throws JMSException
+    */
+   void sendUnackedAckInfos(List ackInfos) throws JMSException;
 }
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -147,6 +147,11 @@
    {
       endpoint.cancelDeliveries(ackInfos);
    }
+   
+   public void sendUnackedAckInfos(List ackInfos) throws JMSException
+   {
+      endpoint.sendUnackedAckInfos(ackInfos);
+   }
 
 
    // AdvisedSupport overrides --------------------------------------
@@ -161,6 +166,7 @@
       return "SessionAdvised->" + endpoint;
    }
 
+
    // Public --------------------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -37,8 +37,8 @@
 
 import org.jboss.aop.Dispatcher;
 import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.client.remoting.HandleMessageResponse;
-import org.jboss.jms.client.remoting.CallbackManager;
 import org.jboss.jms.message.JBossMessage;
 import org.jboss.jms.server.ServerPeer;
 import org.jboss.jms.server.Version;
@@ -95,6 +95,7 @@
    protected static final byte MORE = 5;
    protected static final byte SEND_TRANSACTION = 6;
    protected static final byte GET_ID_BLOCK = 7;
+   protected static final byte UNACKED_ACKINFOS = 8;
  
 
    // The response codes - start from 100
@@ -312,6 +313,28 @@
    
                   if (trace) { log.trace("wrote cancelDeliveries()"); }
                }
+               else if ("sendUnackedAckInfos".equals(methodName) && mi.getArguments() != null)
+               {
+                  dos.writeByte(UNACKED_ACKINFOS);
+   
+                  writeHeader(mi, dos);
+   
+                  List ids = (List)mi.getArguments()[0];
+   
+                  dos.writeInt(ids.size());
+   
+                  Iterator iter = ids.iterator();
+   
+                  while (iter.hasNext())
+                  {
+                     AckInfo ack = (AckInfo)iter.next();
+                     ack.write(dos);
+                  }
+   
+                  dos.flush();
+   
+                  if (trace) { log.trace("wrote sendUnackedAckInfos()"); }
+               }
                else
                {
                   dos.write(SERIALIZED);
@@ -700,6 +723,35 @@
    
                return request;
             }
+            case UNACKED_ACKINFOS:
+            {
+               MethodInvocation mi = readHeader(dis);
+   
+               int size = dis.readInt();
+   
+               List acks = new ArrayList(size);
+   
+               for (int i = 0; i < size; i++)
+               {
+                  AckInfo ack = new AckInfo();
+                  
+                  ack.read(dis);
+                  
+                  acks.add(ack);
+               }
+   
+               Object[] args = new Object[] {acks};
+   
+               mi.setArguments(args);
+   
+               InvocationRequest request =
+                  new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+                                        new MessagingMarshallable(version, mi), null, null, null);
+   
+               if (trace) { log.trace("read unackedAckInfos()"); }
+   
+               return request;
+            }
             case ID_BLOCK_RESPONSE:
             {
                IdBlock block = new IdBlock();

Modified: trunk/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/AckInfo.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/jms/tx/AckInfo.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -86,7 +86,7 @@
    /** Used to change ack's id during failover */
    public void setConsumerID(int consumerID)
    {
-       this.consumerID=consumerID;
+       this.consumerID = consumerID;
    }
    
    public MessageProxy getMessage()

Modified: trunk/src/main/org/jboss/messaging/core/Channel.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/Channel.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/Channel.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -145,6 +145,8 @@
    void deactivate();
    
    boolean isActive();
+   
+   List createDeliveries(List messageIds);
 
 }
 

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -491,6 +491,66 @@
          }
       }
    }
+   
+   public List createDeliveries(List messageIds)
+   {
+      //debug
+      Iterator iter = messageIds.iterator();
+      
+      log.info("***** createdeliveries");
+      while (iter.hasNext())
+      {
+         Long l = (Long)iter.next();
+         
+         log.info("Creating delivery for " + l);
+      }
+      log.info("**** end dump");
+      
+      iter = messageIds.iterator();
+      
+      List dels = new ArrayList();
+      
+      synchronized (refLock)
+      {
+         synchronized (deliveryLock)
+         {
+            ListIterator liter = messageRefs.iterator();
+                              
+            while (iter.hasNext())
+            {
+               Long id = (Long)iter.next();
+               
+               //Scan the queue
+               while (true)
+               {               
+                  if (!liter.hasNext())
+                  {
+                     // TODO we need to look in paging state too - currently not supported
+                     
+                     throw new IllegalStateException("Cannot find ref in queue! (Might be paged!)");
+                  }
+                  
+                  MessageReference ref = (MessageReference)liter.next();
+                  
+                  if (ref.getMessageID() == id.longValue())
+                  {
+                     liter.remove();
+                     
+                     Delivery del = new SimpleDelivery(this, ref);
+                     
+                     dels.add(del);
+                                    
+                     this.deliveries.add(del);
+                     
+                     break;
+                  }
+               }
+            }  
+         }
+      }
+            
+      return dels;
+   }
 
    // Public --------------------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/RemoteQueueStub.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -272,4 +272,9 @@
    {
       return "RemoteQueueStub(node=" + this.nodeId + " name=" + this.name + " channelId=" + this.id + ")";
    }
+
+   public List createDeliveries(List messageIds)
+   {
+      throw new UnsupportedOperationException();
+   }
 }

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/build.xml	2006-12-12 10:49:42 UTC (rev 1770)
@@ -745,7 +745,7 @@
                     haltonerror="${junit.batchtest.haltonerror}">
             <formatter type="plain" usefile="${junit.formatter.usefile}"/>
             <fileset dir="${build.tests.classes}">
-               <include name="**/jms/clustering/*Test.class"/>
+               <include name="**/jms/clustering/HATest.class"/>
                <!--
                <include name="**/jms/clustering/SimpleClusteringTest.class"/>
                -->

Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -216,6 +216,11 @@
       throw new UnsupportedOperationException();
    }
 
+   public List createDeliveries(List messageIds)
+   {
+      throw new UnsupportedOperationException();
+   }
+
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultMessagePullPolicyTest.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -369,6 +369,12 @@
          // TODO Auto-generated method stub
          return false;
       }
+
+      public List createDeliveries(List messageIds)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
       
    }
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultRouterTest.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+
 import org.jboss.messaging.core.Delivery;
 import org.jboss.messaging.core.DeliveryObserver;
 import org.jboss.messaging.core.Filter;
@@ -608,6 +609,12 @@
          // TODO Auto-generated method stub
          return false;
       }
+
+      public List createDeliveries(List messageIds)
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
       
    }
    

Copied: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java (from rev 1767, trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java)
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-12-12 02:16:50 UTC (rev 1767)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedDestinationsTest.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -0,0 +1,1023 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+
+/**
+ * 
+ * A DistributedDestinationsTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DistributedDestinationsTest extends ClusteringTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DistributedDestinationsTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+
+   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(false);
+   }
+
+   public void testClusteredQueueLocalConsumerPersistent() throws Exception
+   {
+      clusteredQueueLocalConsumer(true);
+   }
+
+   public void testClusteredTopicNonDurableNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurable(false);
+   }
+
+   public void testClusteredTopicNonDurablePersistent() throws Exception
+   {
+      clusteredTopicNonDurable(true);
+   }
+
+   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(false);
+   }
+
+   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
+   {
+      clusteredTopicNonDurableWithSelectors(true);
+   }
+
+   public void testClusteredTopicDurableNonPersistent() throws Exception
+   {
+      clusteredTopicDurable(false);
+   }
+
+   public void testClusteredTopicDurablePersistent() throws Exception
+   {
+      clusteredTopicDurable(true);
+   }
+
+   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(false);
+   }
+
+   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableLocalConsumer(true);
+   }
+
+   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(false);
+   }
+
+   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
+   {
+      clusteredTopicSharedDurableNoLocalSub(true);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+  
+
+   /*
+    * Create a consumer on each queue on each node.
+    * Send messages in turn from all nodes.
+    * Ensure that the local consumer gets the message
+    */
+   protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+
+      try
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         log.info("Created sessions");
+
+         MessageConsumer cons1 = sess1.createConsumer(queue0);
+         MessageConsumer cons2 = sess2.createConsumer(queue1);
+         MessageConsumer cons3 = sess3.createConsumer(queue2);
+         
+         log.info("Created consumers");
+
+         conn1.start();
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         MessageProducer prod = sess1.createProducer(queue0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+         
+         log.info("Sent messages");
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         Message m = cons2.receive(2000);
+
+         assertNull(m);
+
+         m = cons3.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 1
+
+         MessageProducer prod1 = sess2.createProducer(queue1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons3.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 2
+
+         MessageProducer prod2 = sess3.createProducer(queue2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Ensure all messages are received as appropriate
+    */
+   private void clusteredTopicNonDurable(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+      try
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(topic0);
+         MessageConsumer cons2 = sess2.createConsumer(topic1);
+         MessageConsumer cons3 = sess3.createConsumer(topic2);
+
+         MessageConsumer cons4 = sess1.createConsumer(topic0);
+
+         MessageConsumer cons5 = sess2.createConsumer(topic1);
+
+         conn1.start();
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         MessageProducer prod = sess1.createProducer(topic0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+   /*
+    * Create non durable subscriptions on all nodes of the cluster.
+    * Include some with selectors
+    * Ensure all messages are received as appropriate
+    */
+   private void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+
+      try
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         MessageConsumer cons1 = sess1.createConsumer(topic0);
+         MessageConsumer cons2 = sess2.createConsumer(topic1);
+         MessageConsumer cons3 = sess3.createConsumer(topic2);
+
+         MessageConsumer cons4 = sess1.createConsumer(topic0, "COLOUR='red'");
+
+         MessageConsumer cons5 = sess2.createConsumer(topic1, "COLOUR='blue'");
+
+         conn1.start();
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         MessageProducer prod = sess1.createProducer(topic0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            int c = i % 3;
+            if (c == 0)
+            {
+               tm.setStringProperty("COLOUR", "red");
+            }
+            else if (c == 1)
+            {
+               tm.setStringProperty("COLOUR", "blue");
+            }
+
+            prod.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            int c = i % 3;
+
+            if (c == 0)
+            {
+               TextMessage tm = (TextMessage)cons4.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("message" + i, tm.getText());
+            }
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            int c = i % 3;
+
+            if (c == 1)
+            {
+               TextMessage tm = (TextMessage)cons5.receive(1000);
+
+               assertNotNull(tm);
+
+               assertEquals("message" + i, tm.getText());
+            }
+         }
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+
+
+   /*
+    * Create durable subscriptions on all nodes of the cluster.
+    * Include a couple with selectors (NOTE: no subscription created in this method currently uses a selector)
+    * Ensure all messages are received as appropriate
+    * None of the durable subs are shared
+    */
+   private void clusteredTopicDurable(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+      try
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+
+         conn1.setClientID("wib1");
+         conn2.setClientID("wib1");
+         conn3.setClientID("wib1");
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try
+         {
+            sess1.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess2.unsubscribe("sub1");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess3.unsubscribe("sub2");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess1.unsubscribe("sub3");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess2.unsubscribe("sub4");
+         }
+         catch (Exception ignore) {}
+
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub1");
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub2");
+         MessageConsumer cons4 = sess1.createDurableSubscriber(topic0, "sub3");
+         MessageConsumer cons5 = sess2.createDurableSubscriber(topic1, "sub4");
+
+         conn1.start();
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         MessageProducer prod = sess1.createProducer(topic0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons4.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons5.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         cons1.close();
+         cons2.close();
+         cons3.close();
+         cons4.close();
+         cons5.close();
+
+         sess1.unsubscribe("sub");
+         sess2.unsubscribe("sub1");
+         sess3.unsubscribe("sub2");
+         sess1.unsubscribe("sub3");
+         sess2.unsubscribe("sub4");
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+
+
+
+   /*
+    * Create shared durable subs on multiple nodes, the local instance should always get the message
+    */
+   protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+      try
+
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+         conn1.setClientID("wib1");
+         conn2.setClientID("wib1");
+         conn3.setClientID("wib1");
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try
+         {
+            sess1.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess2.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess3.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub");
+         MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub");
+
+         conn1.start();
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         MessageProducer prod = sess1.createProducer(topic0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         Message m = cons2.receive(2000);
+
+         assertNull(m);
+
+         m = cons3.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 1
+
+         MessageProducer prod1 = sess2.createProducer(topic1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons3.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 2
+
+         MessageProducer prod2 = sess3.createProducer(topic2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess3.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons3.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         cons1.close();
+         cons2.close();
+         cons3.close();
+
+         // Need to unsubscribe on any node that the durable sub was created on
+
+         sess1.unsubscribe("sub");
+         sess2.unsubscribe("sub");
+         sess3.unsubscribe("sub");
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+
+
+   /*
+    * Create shared durable subs on multiple nodes, but without sub on local node
+    * should round robin
+    * note that this test assumes round robin
+    */
+   protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
+   {
+      Connection conn1 = null;
+      Connection conn2 = null;
+      Connection conn3 = null;
+
+      try
+      {
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         conn3 = cf.createConnection();
+         
+         log.info("Created connections");
+         
+         checkConnectionsDifferentServers(conn1, conn2, conn3);
+         
+         conn2.setClientID("wib1");
+         conn3.setClientID("wib1");
+
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         try
+         {
+            sess2.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+         try
+         {
+            sess3.unsubscribe("sub");
+         }
+         catch (Exception ignore) {}
+
+         MessageConsumer cons1 = sess2.createDurableSubscriber(topic1, "sub");
+         MessageConsumer cons2 = sess3.createDurableSubscriber(topic2, "sub");
+
+         conn2.start();
+         conn3.start();
+
+         // Send at node 0
+
+         //Should round robin between the other 2 since there is no active consumer on sub on node 0
+
+         MessageProducer prod = sess1.createProducer(topic0);
+
+         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i * 2, tm.getText());
+         }
+
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + (i * 2 + 1), tm.getText());
+         }
+
+         cons1.close();
+         cons2.close();
+
+         sess2.unsubscribe("sub");
+         sess3.unsubscribe("sub");
+
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+
+         if (conn3 != null)
+         {
+            conn3.close();
+         }
+      }
+   }
+
+   class MyListener implements MessageListener
+   {
+      private int i;
+
+      MyListener(int i)
+      {
+         this.i = i;
+      }
+
+      public void onMessage(Message m)
+      {
+         try
+         {
+            int count = m.getIntProperty("count");
+
+            log.info("Listener " + i + " received message " + count);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+      }
+
+   }
+
+
+   // Inner classes -------------------------------------------------
+
+   
+
+}

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/HATest.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -26,6 +26,7 @@
 import java.util.Set;
 
 import javax.jms.Connection;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -36,7 +37,6 @@
 import org.jboss.jms.client.delegate.ClientConnectionDelegate;
 import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
 import org.jboss.jms.client.delegate.ClusteredClientConnectionFactoryDelegate;
-import org.jboss.jms.client.delegate.DelegateSupport;
 import org.jboss.jms.client.state.ConnectionState;
 import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
 import org.jboss.test.messaging.tools.ServerManagement;
@@ -69,355 +69,490 @@
    }
    
    // Public --------------------------------------------------------
+//   
+//   /*
+//    * Test that connections created using a clustered connection factory are created round robin on
+//    * different servers
+//    */
+//   public void testRoundRobinConnectionCreation() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//      
+//      log.info ("number of delegates = " + delegate.getDelegates().length);
+//      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
+//      
+//      assertEquals(3, delegate.getDelegates().length);
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//      
+//      assertEquals(0, cf1.getServerId());
+//      
+//      assertEquals(1, cf2.getServerId());
+//      
+//      assertEquals(2, cf3.getServerId());
+//      
+//      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//      
+//      Connection conn1 = null;
+//      
+//      Connection conn2 = null;
+//      
+//      Connection conn3 = null;
+//      
+//      Connection conn4 = null;
+//      
+//      Connection conn5 = null;
+//      
+//      try
+//      {         
+//         conn1 = factory.createConnection();
+//         
+//         conn2 = factory.createConnection();
+//         
+//         conn3 = factory.createConnection();
+//         
+//         conn4 = factory.createConnection();
+//         
+//         conn5 = factory.createConnection();
+//         
+//         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
+//         
+//         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
+//         
+//         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
+//         
+//         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
+//         
+//         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
+//         
+//         int serverID1 = state1.getServerID();
+//         
+//         int serverID2 = state2.getServerID();
+//         
+//         int serverID3 = state3.getServerID();
+//         
+//         int serverID4 = state4.getServerID();
+//         
+//         int serverID5 = state5.getServerID();
+//         
+//         log.info("server id 1: " + serverID1);
+//         
+//         log.info("server id 2: " + serverID2);
+//         
+//         log.info("server id 3: " + serverID3);
+//         
+//         log.info("server id 4: " + serverID4);
+//         
+//         log.info("server id 5: " + serverID5);
+//         
+//         assertEquals(0, serverID1);
+//         
+//         assertEquals(1, serverID2);
+//         
+//         assertEquals(2, serverID3);
+//         
+//         assertEquals(0, serverID4);
+//         
+//         assertEquals(1, serverID5);
+//      }
+//      finally
+//      {
+//         if (conn1 != null)
+//         {
+//            conn1.close();
+//         }
+//         
+//         if (conn2 != null)
+//         {
+//            conn2.close();
+//         }
+//         
+//         if (conn3 != null)
+//         {
+//            conn3.close();
+//         }
+//         
+//         if (conn4 != null)
+//         {
+//            conn4.close();
+//         }
+//         
+//         if (conn5 != null)
+//         {
+//            conn5.close();
+//         }
+//      }
+//      
+//   }
+// 
+//   /*
+//    * Test that the failover mapping is created correctly and updated properly when nodes leave
+//    * or join
+//    */
+//   public void testDefaultFailoverMap() throws Exception
+//   {     
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//         
+//         //The order here depends on the order the servers were started in
+//         
+//         //If any servers get stopped and then started then the order will change
+//    
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         log.info("cf3 serverid=" + cf3.getServerId());
+//         
+//         
+//         assertEquals(0, cf1.getServerId());
+//         
+//         assertEquals(1, cf2.getServerId());
+//         
+//         assertEquals(2, cf3.getServerId());
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         assertEquals(3, delegates.length);
+//         
+//         assertEquals(3, failoverMap.size());
+//         
+//         // Default failover policy just chooses the node to the right
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+//      }
+//      
+//      //Now cleanly stop one of the servers
+//            
+//      log.info("************** STOPPING SERVER 0");
+//      ServerManagement.stop(0);
+//      
+//      log.info("server stopped");
+//      
+//      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
+//      
+//      {         
+//         //Lookup another connection factory
+//         
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(2, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         //Order here depends on order servers were started in
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         assertEquals(1, cf1.getServerId());
+//         
+//         assertEquals(2, cf2.getServerId());
+//         
+//         
+//         assertEquals(2, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//      }
+//      
+//      //Cleanly stop another server
+//      
+//      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
+//      
+//      ServerManagement.stop(1);
+//      
+//      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
+//      
+//      {         
+//         //Lookup another connection factory
+//         
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         assertEquals(1, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         
+//         assertEquals(1, failoverMap.size());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//      }
+//            
+//      //Restart server 0
+//      
+//      ServerManagement.start("all", 0);
+//      
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(2, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         assertEquals(0, cf2.getServerId());
+//         
+//         
+//         assertEquals(2, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//      }
+//      
+//      
+//      //Restart server 1
+//      
+//      ServerManagement.start("all", 1);
+//      
+//      {
+//         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
+//         
+//         log.info("Got connection factory");
+//         
+//         ClusteredClientConnectionFactoryDelegate delegate =
+//            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//         
+//         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//         
+//         Map failoverMap = delegate.getFailoverMap();
+//         
+//         log.info("Got failover map");
+//         
+//         assertEquals(3, delegates.length);
+//         
+//         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
+//         
+//         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
+//         
+//         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
+//         
+//         log.info("cf1 serverid=" + cf1.getServerId());
+//         
+//         log.info("cf2 serverid=" + cf2.getServerId());
+//         
+//         log.info("cf3 serverid=" + cf3.getServerId());
+//         
+//         assertEquals(2, cf1.getServerId());
+//         
+//         assertEquals(0, cf2.getServerId());
+//         
+//         assertEquals(1, cf3.getServerId());
+//         
+//         
+//         assertEquals(3, failoverMap.size());
+//         
+//         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
+//         
+//         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
+//         
+//         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
+//      }            
+//   }
+//   
+//   public void testSimpleFailover() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+//      assertEquals(3, nodeIDView.size());
+//      
+//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegates[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegates[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegates[2];
+//      
+//      int server0Id = cf1.getServerId();
+//      
+//      int server1Id = cf2.getServerId();
+//      
+//      int server2Id = cf3.getServerId();
+//      
+//      log.info("server 0 id: " + server0Id);
+//      
+//      log.info("server 1 id: " + server1Id);
+//      
+//      log.info("server 2 id: " + server2Id);
+//                  
+//      Map failoverMap = delegate.getFailoverMap();
+//      
+//      log.info(failoverMap.get(new Integer(server0Id)));
+//      log.info(failoverMap.get(new Integer(server1Id)));
+//      log.info(failoverMap.get(new Integer(server2Id)));
+//      
+//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//      
+//      // server 1 should failover onto server 2
+//      
+//      assertEquals(server2Id, server1FailoverId);
+//      
+//      Connection conn = null;
+//      
+//      try
+//      {
+//      
+//         //Get a connection on server 1
+//         conn = factory.createConnection(); //connection on server 0
+//         
+//         conn.close();
+//         
+//         conn = factory.createConnection(); //connection on server 1
+//         
+//         JBossConnection jbc = (JBossConnection)conn;
+//         
+//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//         
+//         ConnectionState state = (ConnectionState)del.getState();
+//         
+//         int initialServerID = state.getServerID();
+//         
+//         assertEquals(1, initialServerID);
+//                           
+//         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         
+//         MessageProducer prod = sess.createProducer(queue1);
+//         
+//         MessageConsumer cons = sess.createConsumer(queue1);
+//         
+//         final int NUM_MESSAGES = 100;
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = sess.createTextMessage("message:" + i);
+//            
+//            prod.send(tm);
+//         }
+//         
+//         //So now, messages should be in queue1 on server 1
+//         //So we now kill server 1
+//         //Which should cause transparent failover of connection conn onto server 1
+//         
+//         log.info("************ KILLING (CRASHING) SERVER 1");
+//         
+//         ServerManagement.getServer(1).destroy();
+//         
+//         log.info("killed server, now waiting");
+//         
+//         Thread.sleep(5000);
+//         
+//         log.info("done wait");
+//         
+//         state = (ConnectionState)del.getState();
+//         
+//         int finalServerID = state.getServerID();
+//         
+//         log.info("final server id= " + finalServerID);
+//         
+//         //server id should now be 2
+//         
+//         assertEquals(2, finalServerID);
+//         
+//         conn.start();
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = (TextMessage)cons.receive(1000);
+//            
+//            log.info("message is " + tm);
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         log.info("done");
+//      }
+//      finally
+//      {         
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//      }
+//      
+//   }
    
-   /*
-    * Test that connections created using a clustered connection factory are created round robin on
-    * different servers
-    */
-   public void testRoundRobinConnectionCreation() throws Exception
-   {
-      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-      
-      ClusteredClientConnectionFactoryDelegate delegate =
-         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-      
-      log.info ("number of delegates = " + delegate.getDelegates().length);
-      log.info ("number of servers = " + ServerManagement.getServer(0).getNodeIDView().size());
-      
-      assertEquals(3, delegate.getDelegates().length);
-      
-      ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-      
-      ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-      
-      ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-      
-      assertEquals(0, cf1.getServerId());
-      
-      assertEquals(1, cf2.getServerId());
-      
-      assertEquals(2, cf3.getServerId());
-      
-      assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-      
-      Connection conn1 = null;
-      
-      Connection conn2 = null;
-      
-      Connection conn3 = null;
-      
-      Connection conn4 = null;
-      
-      Connection conn5 = null;
-      
-      try
-      {         
-         conn1 = factory.createConnection();
-         
-         conn2 = factory.createConnection();
-         
-         conn3 = factory.createConnection();
-         
-         conn4 = factory.createConnection();
-         
-         conn5 = factory.createConnection();
-         
-         ConnectionState state1 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn1).getDelegate()).getState());
-         
-         ConnectionState state2 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn2).getDelegate()).getState());
-         
-         ConnectionState state3 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn3).getDelegate()).getState());
-         
-         ConnectionState state4 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn4).getDelegate()).getState());
-         
-         ConnectionState state5 = (ConnectionState)(((DelegateSupport)((JBossConnection)conn5).getDelegate()).getState());
-         
-         int serverID1 = state1.getServerID();
-         
-         int serverID2 = state2.getServerID();
-         
-         int serverID3 = state3.getServerID();
-         
-         int serverID4 = state4.getServerID();
-         
-         int serverID5 = state5.getServerID();
-         
-         log.info("server id 1: " + serverID1);
-         
-         log.info("server id 2: " + serverID2);
-         
-         log.info("server id 3: " + serverID3);
-         
-         log.info("server id 4: " + serverID4);
-         
-         log.info("server id 5: " + serverID5);
-         
-         assertEquals(0, serverID1);
-         
-         assertEquals(1, serverID2);
-         
-         assertEquals(2, serverID3);
-         
-         assertEquals(0, serverID4);
-         
-         assertEquals(1, serverID5);
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-         
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-         
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-         
-         if (conn4 != null)
-         {
-            conn4.close();
-         }
-         
-         if (conn5 != null)
-         {
-            conn5.close();
-         }
-      }
-      
-   }
- 
-   /*
-    * Test that the failover mapping is created correctly and updated properly when nodes leave
-    * or join
-    */
-   public void testDefaultFailoverMap() throws Exception
-   {     
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         assertEquals(3, ServerManagement.getServer(0).getNodeIDView().size());
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-         
-         //The order here depends on the order the servers were started in
-         
-         //If any servers get stopped and then started then the order will change
-    
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         log.info("cf3 serverid=" + cf3.getServerId());
-         
-         
-         assertEquals(0, cf1.getServerId());
-         
-         assertEquals(1, cf2.getServerId());
-         
-         assertEquals(2, cf3.getServerId());
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         assertEquals(3, delegates.length);
-         
-         assertEquals(3, failoverMap.size());
-         
-         // Default failover policy just chooses the node to the right
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-      }
-      
-      //Now cleanly stop one of the servers
-            
-      log.info("************** STOPPING SERVER 0");
-      ServerManagement.stop(0);
-      
-      log.info("server stopped");
-      
-      assertEquals(2, ServerManagement.getServer(1).getNodeIDView().size());
-      
-      {         
-         //Lookup another connection factory
-         
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(2, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         //Order here depends on order servers were started in
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         assertEquals(1, cf1.getServerId());
-         
-         assertEquals(2, cf2.getServerId());
-         
-         
-         assertEquals(2, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-      }
-      
-      //Cleanly stop another server
-      
-      log.info("Server 1 is started: " + ServerManagement.getServer(1).isServerPeerStarted());
-      
-      ServerManagement.stop(1);
-      
-      assertEquals(1, ServerManagement.getServer(2).getNodeIDView().size());
-      
-      {         
-         //Lookup another connection factory
-         
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic2.lookup("/ConnectionFactory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         assertEquals(1, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         assertEquals(2, cf1.getServerId());
-         
-         
-         assertEquals(1, failoverMap.size());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-      }
-            
-      //Restart server 0
-      
-      ServerManagement.start("all", 0);
-      
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(2, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         assertEquals(2, cf1.getServerId());
-         
-         assertEquals(0, cf2.getServerId());
-         
-         
-         assertEquals(2, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-      }
-      
-      
-      //Restart server 1
-      
-      ServerManagement.start("all", 1);
-      
-      {
-         JBossConnectionFactory factory =  (JBossConnectionFactory )ic1.lookup("/ConnectionFactory");
-         
-         log.info("Got connection factory");
-         
-         ClusteredClientConnectionFactoryDelegate delegate =
-            (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
-         
-         ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
-         
-         Map failoverMap = delegate.getFailoverMap();
-         
-         log.info("Got failover map");
-         
-         assertEquals(3, delegates.length);
-         
-         ClientConnectionFactoryDelegate cf1 = delegate.getDelegates()[0];
-         
-         ClientConnectionFactoryDelegate cf2 = delegate.getDelegates()[1];
-         
-         ClientConnectionFactoryDelegate cf3 = delegate.getDelegates()[2];
-         
-         log.info("cf1 serverid=" + cf1.getServerId());
-         
-         log.info("cf2 serverid=" + cf2.getServerId());
-         
-         log.info("cf3 serverid=" + cf3.getServerId());
-         
-         assertEquals(2, cf1.getServerId());
-         
-         assertEquals(0, cf2.getServerId());
-         
-         assertEquals(1, cf3.getServerId());
-         
-         
-         assertEquals(3, failoverMap.size());
-         
-         assertEquals(cf2.getServerId(), ((Integer)failoverMap.get(new Integer(cf1.getServerId()))).intValue());
-         
-         assertEquals(cf3.getServerId(), ((Integer)failoverMap.get(new Integer(cf2.getServerId()))).intValue());
-         
-         assertEquals(cf1.getServerId(), ((Integer)failoverMap.get(new Integer(cf3.getServerId()))).intValue());
-      }            
-   }
    
-   public void testSimpleFailover() throws Exception
+   public void testFailoverWithUnackedMessagesClientAcknowledge() throws Exception
    {
       JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
       
@@ -462,8 +597,7 @@
       Connection conn = null;
       
       try
-      {
-      
+      {      
          //Get a connection on server 1
          conn = factory.createConnection(); //connection on server 0
          
@@ -481,7 +615,7 @@
          
          assertEquals(1, initialServerID);
                            
-         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
          
          MessageProducer prod = sess.createProducer(queue1);
          
@@ -496,6 +630,20 @@
             prod.send(tm);
          }
          
+         conn.start();
+         
+         //Now consume half of the messages but don't ack them; these will end up in the
+         //client side toAck list
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = (TextMessage)cons.receive(500);
+            
+            assertNotNull(tm);
+            
+            assertEquals("message:" + i, tm.getText());
+         }
+         
          //So now, messages should be in queue1 on server 1
          //So we now kill server 1
          //Which should cause transparent failover of connection conn onto server 2
@@ -503,7 +651,7 @@
          log.info("************ KILLING (CRASHING) SERVER 1");
          
          ServerManagement.getServer(1).kill();
-
+         
          log.info("killed server, now waiting");
          
          Thread.sleep(5000);
@@ -522,17 +670,41 @@
          
          conn.start();
          
-         for (int i = 0; i < NUM_MESSAGES; i++)
+         //Now should be able to consume the rest of the messages
+         
+         log.info("here1");
+         
+         TextMessage tm = null;
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
          {
-            TextMessage tm = (TextMessage)cons.receive(1000);
+            tm = (TextMessage)cons.receive(500);
+                                    
+            log.info("message is " + tm.getText());
             
-            log.info("message is " + tm);
-            
             assertNotNull(tm);
             
             assertEquals("message:" + i, tm.getText());
          }
-         log.info("done");
+         
+         log.info("here2");
+         
+         //Now should be able to acknowledge them
+         
+         tm.acknowledge();
+         
+         //Now check there are no more messages there
+         sess.close();
+         
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         cons = sess.createConsumer(queue1);
+         
+         Message m = cons.receive(500);
+         
+         assertNull(m);
+         
+         log.info("got to end of test");
       }
       finally
       {         
@@ -551,6 +723,182 @@
       
    }
    
+//   public void testFailoverWithUnackedMessagesTransactional() throws Exception
+//   {
+//      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");
+//      
+//      ClusteredClientConnectionFactoryDelegate delegate =
+//         (ClusteredClientConnectionFactoryDelegate)factory.getDelegate();
+//
+//      Set nodeIDView = ServerManagement.getServer(0).getNodeIDView();
+//      assertEquals(3, nodeIDView.size());
+//      
+//      ClientConnectionFactoryDelegate[] delegates = delegate.getDelegates();
+//      
+//      ClientConnectionFactoryDelegate cf1 = delegates[0];
+//      
+//      ClientConnectionFactoryDelegate cf2 = delegates[1];
+//      
+//      ClientConnectionFactoryDelegate cf3 = delegates[2];
+//      
+//      int server0Id = cf1.getServerId();
+//      
+//      int server1Id = cf2.getServerId();
+//      
+//      int server2Id = cf3.getServerId();
+//      
+//      log.info("server 0 id: " + server0Id);
+//      
+//      log.info("server 1 id: " + server1Id);
+//      
+//      log.info("server 2 id: " + server2Id);
+//                  
+//      Map failoverMap = delegate.getFailoverMap();
+//      
+//      log.info(failoverMap.get(new Integer(server0Id)));
+//      log.info(failoverMap.get(new Integer(server1Id)));
+//      log.info(failoverMap.get(new Integer(server2Id)));
+//      
+//      int server1FailoverId = ((Integer)failoverMap.get(new Integer(server1Id))).intValue();
+//      
+//      // server 1 should failover onto server 2
+//      
+//      assertEquals(server2Id, server1FailoverId);
+//      
+//      Connection conn = null;
+//      
+//      try
+//      {      
+//         //Get a connection on server 1
+//         conn = factory.createConnection(); //connection on server 0
+//         
+//         conn.close();
+//         
+//         conn = factory.createConnection(); //connection on server 1
+//         
+//         JBossConnection jbc = (JBossConnection)conn;
+//         
+//         ClientConnectionDelegate del = (ClientConnectionDelegate)jbc.getDelegate();
+//         
+//         ConnectionState state = (ConnectionState)del.getState();
+//         
+//         int initialServerID = state.getServerID();
+//         
+//         assertEquals(1, initialServerID);
+//                           
+//         Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+//         
+//         MessageProducer prod = sess.createProducer(queue1);
+//         
+//         MessageConsumer cons = sess.createConsumer(queue1);
+//         
+//         final int NUM_MESSAGES = 100;
+//         
+//         for (int i = 0; i < NUM_MESSAGES; i++)
+//         {
+//            TextMessage tm = sess.createTextMessage("message:" + i);
+//            
+//            prod.send(tm);
+//         }
+//         
+//         sess.commit();
+//         
+//         conn.start();
+//         
+//         //Now consume half of the messages but don't commit them; these will end up in
+//         //client side resource manager
+//         
+//         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+//         {
+//            TextMessage tm = (TextMessage)cons.receive(500);
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         
+//         //So now, messages should be in queue1 on server 1
+//         //So we now kill server 1
+//         //Which should cause transparent failover of connection conn onto server 2
+//         
+//         log.info("************ KILLING (CRASHING) SERVER 1");
+//         
+//         ServerManagement.getServer(1).kill();
+//
+//         log.info("killed server, now waiting");
+//         
+//         Thread.sleep(5000);
+//         
+//         log.info("done wait");
+//         
+//         state = (ConnectionState)del.getState();
+//         
+//         int finalServerID = state.getServerID();
+//         
+//         log.info("final server id= " + finalServerID);
+//         
+//         //server id should now be 2
+//         
+//         assertEquals(2, finalServerID);
+//         
+//         conn.start();
+//         
+//         //Now should be able to consume the rest of the messages
+//         
+//         log.info("here1");
+//         
+//         TextMessage tm = null;
+//         
+//         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+//         {
+//            tm = (TextMessage)cons.receive(500);
+//                                    
+//            log.info("message is " + tm.getText());
+//            
+//            assertNotNull(tm);
+//            
+//            assertEquals("message:" + i, tm.getText());
+//         }
+//         
+//         log.info("here2");
+//         
+//         //Now should be able to commit them
+//         
+//         sess.commit();
+//         
+//         //Now check there are no more messages there
+//         sess.close();
+//         
+//         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//         
+//         cons = sess.createConsumer(queue1);
+//         
+//         Message m = cons.receive(500);
+//         
+//         assertNull(m);
+//         
+//         log.info("got to end of test");
+//      }
+//      finally
+//      {         
+//         if (conn != null)
+//         {
+//            try
+//            {
+//               conn.close();
+//            }
+//            catch (Exception e)
+//            {
+//               e.printStackTrace();
+//            }
+//         }
+//      }
+//      
+//   }
+   
+   
+   
+   
 //   public void testEvenSimplerFailover() throws Exception
 //   {
 //      JBossConnectionFactory factory =  (JBossConnectionFactory )ic0.lookup("/ConnectionFactory");

Deleted: trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-12-12 09:11:39 UTC (rev 1769)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/ManualClusteringTest.java	2006-12-12 10:49:42 UTC (rev 1770)
@@ -1,1025 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
-
-/**
- * 
- * A ManualClusteringTest
- * 
- * Nodes must be started up in order node1, node2, node3
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class ManualClusteringTest extends ClusteringTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public ManualClusteringTest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-
-   public void testClusteredQueueLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredQueueLocalConsumer(false);
-   }
-
-   public void testClusteredQueueLocalConsumerPersistent() throws Exception
-   {
-      clusteredQueueLocalConsumer(true);
-   }
-
-   public void testClusteredTopicNonDurableNonPersistent() throws Exception
-   {
-      clusteredTopicNonDurable(false);
-   }
-
-   public void testClusteredTopicNonDurablePersistent() throws Exception
-   {
-      clusteredTopicNonDurable(true);
-   }
-
-   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
-   {
-      clusteredTopicNonDurableWithSelectors(false);
-   }
-
-   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
-   {
-      clusteredTopicNonDurableWithSelectors(true);
-   }
-
-   public void testClusteredTopicDurableNonPersistent() throws Exception
-   {
-      clusteredTopicDurable(false);
-   }
-
-   public void testClusteredTopicDurablePersistent() throws Exception
-   {
-      clusteredTopicDurable(true);
-   }
-
-   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableLocalConsumer(false);
-   }
-
-   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableLocalConsumer(true);
-   }
-
-   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalSub(false);
-   }
-
-   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
-   {
-      clusteredTopicSharedDurableNoLocalSub(true);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      super.tearDown();
-   }
-  
-
-   /*
-    * Create a consumer on each queue on each node.
-    * Send messages in turn from all nodes.
-    * Ensure that the local consumer gets the message
-    */
-   protected void clusteredQueueLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-
-      try
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         log.info("Created sessions");
-
-         MessageConsumer cons1 = sess1.createConsumer(queue0);
-         MessageConsumer cons2 = sess2.createConsumer(queue1);
-         MessageConsumer cons3 = sess3.createConsumer(queue2);
-         
-         log.info("Created consumers");
-
-         conn1.start();
-         conn2.start();
-         conn3.start();
-
-         // Send at node 0
-
-         MessageProducer prod = sess1.createProducer(queue0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            prod.send(tm);
-         }
-         
-         log.info("Sent messages");
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }                 
-
-         Message m = cons2.receive(2000);
-
-         assertNull(m);
-
-         m = cons3.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 1
-
-         MessageProducer prod1 = sess2.createProducer(queue1);
-
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message" + i);
-
-            prod1.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons3.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 2
-
-         MessageProducer prod2 = sess3.createProducer(queue2);
-
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess3.createTextMessage("message" + i);
-
-            prod2.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-   // Private -------------------------------------------------------
-
-   /*
-    * Create non durable subscriptions on all nodes of the cluster.
-    * Ensure all messages are receive as appropriate
-    */
-   private void clusteredTopicNonDurable(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-      try
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageConsumer cons1 = sess1.createConsumer(topic0);
-         MessageConsumer cons2 = sess2.createConsumer(topic1);
-         MessageConsumer cons3 = sess3.createConsumer(topic2);
-
-         MessageConsumer cons4 = sess1.createConsumer(topic0);
-
-         MessageConsumer cons5 = sess2.createConsumer(topic1);
-
-         conn1.start();
-         conn2.start();
-         conn3.start();
-
-         // Send at node 0
-
-         MessageProducer prod = sess1.createProducer(topic0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            prod.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons4.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons5.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-   /*
-    * Create non durable subscriptions on all nodes of the cluster.
-    * Include some with selectors
-    * Ensure all messages are receive as appropriate
-    */
-   private void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-
-      try
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         MessageConsumer cons1 = sess1.createConsumer(topic0);
-         MessageConsumer cons2 = sess2.createConsumer(topic1);
-         MessageConsumer cons3 = sess3.createConsumer(topic2);
-
-         MessageConsumer cons4 = sess1.createConsumer(topic0, "COLOUR='red'");
-
-         MessageConsumer cons5 = sess2.createConsumer(topic1, "COLOUR='blue'");
-
-         conn1.start();
-         conn2.start();
-         conn3.start();
-
-         // Send at node 0
-
-         MessageProducer prod = sess1.createProducer(topic0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            int c = i % 3;
-            if (c == 0)
-            {
-               tm.setStringProperty("COLOUR", "red");
-            }
-            else if (c == 1)
-            {
-               tm.setStringProperty("COLOUR", "blue");
-            }
-
-            prod.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            int c = i % 3;
-
-            if (c == 0)
-            {
-               TextMessage tm = (TextMessage)cons4.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("message" + i, tm.getText());
-            }
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            int c = i % 3;
-
-            if (c == 1)
-            {
-               TextMessage tm = (TextMessage)cons5.receive(1000);
-
-               assertNotNull(tm);
-
-               assertEquals("message" + i, tm.getText());
-            }
-         }
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-
-
-   /*
-    * Create durable subscriptions on all nodes of the cluster.
-    * Include a couple with selectors
-    * Ensure all messages are receive as appropriate
-    * None of the durable subs are shared
-    */
-   private void clusteredTopicDurable(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-      try
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-
-         conn1.setClientID("wib1");
-         conn2.setClientID("wib1");
-         conn3.setClientID("wib1");
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         try
-         {
-            sess1.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess2.unsubscribe("sub1");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess3.unsubscribe("sub2");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess1.unsubscribe("sub3");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess2.unsubscribe("sub4");
-         }
-         catch (Exception ignore) {}
-
-         MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub1");
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub2");
-         MessageConsumer cons4 = sess1.createDurableSubscriber(topic0, "sub3");
-         MessageConsumer cons5 = sess2.createDurableSubscriber(topic1, "sub4");
-
-         conn1.start();
-         conn2.start();
-         conn3.start();
-
-         // Send at node 0
-
-         MessageProducer prod = sess1.createProducer(topic0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message" + i);
-
-            prod.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons4.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons5.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         cons1.close();
-         cons2.close();
-         cons3.close();
-         cons4.close();
-         cons5.close();
-
-         sess1.unsubscribe("sub");
-         sess2.unsubscribe("sub1");
-         sess3.unsubscribe("sub2");
-         sess1.unsubscribe("sub3");
-         sess2.unsubscribe("sub4");
-
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-
-
-
-   /*
-    * Create shared durable subs on multiple nodes, the local instance should always get the message
-    */
-   protected void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-      try
-
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-         conn1.setClientID("wib1");
-         conn2.setClientID("wib1");
-         conn3.setClientID("wib1");
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-         try
-         {
-            sess1.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess2.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess3.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-
-         MessageConsumer cons1 = sess1.createDurableSubscriber(topic0, "sub");
-         MessageConsumer cons2 = sess2.createDurableSubscriber(topic1, "sub");
-         MessageConsumer cons3 = sess3.createDurableSubscriber(topic2, "sub");
-
-         conn1.start();
-         conn2.start();
-         conn3.start();
-
-         // Send at node 0
-
-         MessageProducer prod = sess1.createProducer(topic0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            prod.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         Message m = cons2.receive(2000);
-
-         assertNull(m);
-
-         m = cons3.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 1
-
-         MessageProducer prod1 = sess2.createProducer(topic1);
-
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess3.createTextMessage("message" + i);
-
-            prod1.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons3.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 2
-
-         MessageProducer prod2 = sess3.createProducer(topic2);
-
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess3.createTextMessage("message" + i);
-
-            prod2.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons3.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i, tm.getText());
-         }
-
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-
-         cons1.close();
-         cons2.close();
-         cons3.close();
-
-         // Need to unsubscribe on any node that the durable sub was created on
-
-         sess1.unsubscribe("sub");
-         sess2.unsubscribe("sub");
-         sess3.unsubscribe("sub");
-      }
-      finally
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-
-
-   /*
-    * Creates shared durable sub "sub" on two connections, sends 100 messages from a third
-    * connection that has no subscriber, and asserts the first consumer receives the even-numbered
-    * messages and the second the odd-numbered ones. This hard-codes round-robin distribution.
-    */
-   protected void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
-   {
-      Connection conn1 = null;
-      Connection conn2 = null;
-      Connection conn3 = null;
-
-      try
-      {
-         // This should create 3 different connections on 3 different nodes, since
-         // the cf is clustered (see checkConnectionsDifferentServers below)
-         conn1 = cf.createConnection();
-         conn2 = cf.createConnection();
-         conn3 = cf.createConnection();
-         
-         log.info("Created connections");
-         
-         checkConnectionsDifferentServers(conn1, conn2, conn3);
-         
-         conn2.setClientID("wib1");
-         conn3.setClientID("wib1"); // both subscribing connections use the same client ID
-
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         // Try to remove any existing "sub" subscription first; a failure here is ignored
-         try
-         {
-            sess2.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-         try
-         {
-            sess3.unsubscribe("sub");
-         }
-         catch (Exception ignore) {}
-         // Note the numbering: cons1 is created on sess2 (topic1), cons2 on sess3 (topic2)
-         MessageConsumer cons1 = sess2.createDurableSubscriber(topic1, "sub");
-         MessageConsumer cons2 = sess3.createDurableSubscriber(topic2, "sub");
-
-         conn2.start();
-         conn3.start(); // conn1 is never started; it is only used to produce
-
-         // Send at node 0 (via sess1/topic0), using the delivery mode selected by 'persistent'
-
-         // Should round robin between the other 2 since there is no active consumer on sub on node 0
-
-         MessageProducer prod = sess1.createProducer(topic0);
-
-         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            prod.send(tm);
-         }
-         // cons1 must receive exactly the even-numbered messages, in send order
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + i * 2, tm.getText()); // message0, message2, message4, ...
-         }
-         // cons2 must receive exactly the odd-numbered messages, in send order
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message" + (i * 2 + 1), tm.getText()); // message1, message3, message5, ...
-         }
-
-         cons1.close();
-         cons2.close();
-         // Unsubscribe "sub" through both sessions that created a durable subscriber
-         sess2.unsubscribe("sub");
-         sess3.unsubscribe("sub");
-
-      }
-      finally // close every connection that was created, even if an assertion above failed
-      {
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-
-         if (conn3 != null)
-         {
-            conn3.close();
-         }
-      }
-   }
-
-   class MyListener implements MessageListener // logs the "count" property of each message it receives
-   {
-      private int i; // listener number; only used to label the log output
-
-      MyListener(int i)
-      {
-         this.i = i;
-      }
-      // Reads int property "count" and logs it; any exception is printed, never rethrown
-      public void onMessage(Message m)
-      {
-         try
-         {
-            int count = m.getIntProperty("count");
-
-            log.info("Listener " + i + " received message " + count);
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
-      }
-
-   }
-
-
-   // Inner classes -------------------------------------------------
-
-   
-
-}




More information about the jboss-cvs-commits mailing list