[jboss-cvs] JBoss Messaging SVN: r3337 - in branches/Branch_Stable: src/etc/server/default/deploy and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Nov 16 07:22:59 EST 2007


Author: timfox
Date: 2007-11-16 07:22:58 -0500 (Fri, 16 Nov 2007)
New Revision: 3337

Added:
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
Removed:
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java
Modified:
   branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml
   branches/Branch_Stable/docs/userguide/en/modules/configuration.xml
   branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
   branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
   branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml
   branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
   branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1153


Modified: branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/docs/userguide/en/modules/c_configuration.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,53 +1,93 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="UTF8"?>
 <chapter id="c_configuration">
    <title>JBoss Messaging Clustering Notes</title>
-   <para>JBoss Messaging clustering should work out of the box in most cases
-   with no configuration changes. It is however crucial that every node is
-   assigned a unique server id, as specified in the installation guide.</para>
-   <para>Every node deployed must have a unique id, including those in a
-   particular LAN cluster, and also those only linked by mesage
-   bridges.</para>
-   <para>JBoss Messaging clusters JMS queues and topics transparently across
-   the cluster. Messages sent to a distributed queue or topic on one node are
-   consumable on other nodes. To designate that a particular destination is
-   clustered simply set the clustered attribute in the destination deployment
-   descriptor to true.</para>
-   <para>JBoss Messaging balances messages between nodes, catering for faster
-   or slower consumers to efficiently balance processing load across the
-   cluster.</para>
-   <para>JBoss Messaging durable subscrtiptions can also be clustered. This
-   means multiple subscribers can consume from the same durable subscription
-   from different nodes of the cluster. A durable subscription will be
-   clustered if it's topic is clustered</para>
-   <para>JBoss Messaging also supports clustered temporary topics and queues.
-   All temporary topics and queues will be clustered if the post office is
-   clustered</para>
-   <para>If you don't want your nodes to participate in a cluster, or only
-   have one non clustered server you can set the clustered attribute on the
-   postoffice to false</para>
-   <para>If you wish to apply strict JMS ordering to messages, such that a
-   particular JMS consumer consumes messages in the same order as they were
-   produced by a particular producer, you can set the DefaultPreserveOrdering
-   attribute in the server peer to true. By default this is false. The
-   side-effect of setting this to true is that messages cannot be distributed
-   as freely around the cluster</para>
-   <para>When pulling reliable messages from one node to another, JBoss
-   Messaging can use client acnowledgement or an XA transaction. The default
-   is client acknowledgement. Using XA transactions is a fairly heavyweight
-   operation but ensures absolute once and only once delivery.</para>
-   <para>If the call to send a persistent message to a persistent destination
-   returns successfully with no exception, then you can be sure that the
-   message was persisted. However if the call doesn't return successfully e.g.
-   if an exception is thrown, then you *can't be sure the message wasn't
-   persisted*. Since the failure might have occurred after persisting the
-   message but before writing the response to the caller. This is a common
-   attribute of any RPC type call: You can't tell by the call not returning
-   that the call didn't actually succeed. Whether it's a web services call, an
-   HTTP get request, an ejb invocation the same applies. The trick is to code
-   your application so your operations are *idempotent* - i.e. they can be
-   repeated without getting the system into an inconsistent state. With a
-   message system you can do this on the application level, by checking for
-   duplicate messages, and discarding them if they arrive. Duplicate checking
-   is a very powerful technique that can remove the need for XA transactions
-   in many cases.</para>
+  
+   <section id="c_conf.serverpeerid">
+	   <title>Unique server peer id</title>
+	   <para>JBoss Messaging clustering should work out of the box in most cases
+		 with no configuration changes. It is however crucial that every node is
+		 assigned a unique server id, as specified in the installation guide.</para>
+	   <para>Every node deployed must have a unique id, including those in a
+		 particular LAN cluster, and also those only linked by mesage
+		 bridges.</para>
+   </section>
+     
+   <section id="c_conf.clustereddests">   
+	<title>Clustered destinations</title>
+	<para>JBoss Messaging clusters JMS queues and topics transparently across
+		the cluster. Messages sent to a distributed queue or topic on one node are
+		consumable on other nodes. To designate that a particular destination is
+		clustered simply set the clustered attribute in the destination deployment
+		descriptor to true.</para>
+	<para>JBoss Messaging balances messages between nodes, catering for faster
+		or slower consumers to efficiently balance processing load across the
+		cluster.</para>
+   </section>
+   
+      <section id="c_conf.clustereddursubs">
+	   	<title>Clustered durable subs</title>
+	   	<para>JBoss Messaging durable subscriptions can also be clustered. This
+		   	means multiple subscribers can consume from the same durable subscription
+		   	from different nodes of the cluster. A durable subscription will be
+		   	clustered if it's topic is clustered</para>
+	      </section>
+      
+      <section id="c_conf.clusteredtempdest">
+	   	<title>Clustered temporary destinations</title>
+	   	<para>JBoss Messaging also supports clustered temporary topics and queues.
+		   	All temporary topics and queues will be clustered if the post office is
+		   	clustered</para>
+	      </section>
+   
+      <section id="c_conf.nonclusteredserver">  
+	   	<title>Non clustered servers</title>
+	   	<para>If you don't want your nodes to participate in a cluster, or only
+		   	have one non clustered server you can set the clustered attribute on the
+		   	postoffice to false</para>
+	      </section>
+      
+      
+      <section id="c_conf.orderingincluster">
+	   	<title>Message ordering in the cluster</title>
+	   	<para>If you wish to apply strict JMS ordering to messages, such that a
+		   	particular JMS consumer consumes messages in the same order as they were
+		   	produced by a particular producer, you can set the DefaultPreserveOrdering
+		   	attribute in the server peer to true. By default this is false. The
+		   	sideeffect of setting this to true is that messages cannot be distributed
+		   	as freely around the cluster</para>
+	      </section>
+   
+   	   
+      <section id="c_conf.idempotentops">	
+	   	<title>Idempotent operations</title>
+	   	<para>If the call to send a persistent message to a persistent destination
+		   	returns successfully with no exception, then you can be sure that the
+		   	message was persisted. However if the call doesn't return successfully e.g.
+		   	if an exception is thrown, then you *can't be sure the message wasn't
+		   	persisted*. Since the failure might have occurred after persisting the
+		   	message but before writing the response to the caller. This is a common
+		   	attribute of any RPC type call: You can't tell by the call not returning
+		   	that the call didn't actually succeed. Whether it's a web services call, an
+		   	HTTP get request, an ejb invocation the same applies. The trick is to code
+		   	your application so your operations are *idempotent*  i.e. they can be
+		   	repeated without getting the system into an inconsistent state. With a
+		   	message system you can do this on the application level, by checking for
+		   	duplicate messages, and discarding them if they arrive. Duplicate checking
+		   	is a very powerful technique that can remove the need for XA transactions
+		   	in many cases.</para>
+	      </section>
+      
+      
+      <section id="c_conf.clusteredcfs">
+	   	<title>Clustered connection factories</title>
+	   	<para>If the supportsLoadBalancing attribute of the connection factory is set to true then consecutive create connection attempts will round robin between available servers. The first node to try is chosen randomly</para>
+	   	<para>If the supportsFailover attribute of the connection factory is set to true then automatic failover is enabled.
+		   	This will automatically failover from one server to another, transparently to the user, in case of failure.</para>
+	           <para>If automatic failover is not required or you wish to do manual failover (JBoss MQ style) this can be set to false, and you can supply a standard JMS ExceptionListener on the connection which will be called in case of
+		   	connection failure. You would then need to manually close the connection, lookup a new connection factory from
+		   	HA JNDI and recreate the connection.</para>
+	   	
+   </section>
+   
+   
 </chapter>
\ No newline at end of file

Modified: branches/Branch_Stable/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_Stable/docs/userguide/en/modules/configuration.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/docs/userguide/en/modules/configuration.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -105,10 +105,6 @@
       
       &lt;attribute name="ClusterPullConnectionFactoryName"&gt;jboss.messaging.connectionfactory:service=ClusterPullConnectionFactory&lt;/attribute&gt;
       
-      &lt;!-- Use XA when pulling persistent messages from a remote node to this one. --&gt;
-      
-      &lt;attribute name="UseXAForMessagePull"&gt;false&lt;/attribute&gt;
-      
       &lt;!-- When redistributing messages in the cluster. Do we need to preserve the order of messages received
             by a particular consumer from a particular producer? --&gt;
             
@@ -271,13 +267,6 @@
             messages between nodes. You will not normally need to change
             this.</para>
          </section>
-         <section id="conf.serverpeer.attributes.usexaformessagepull">
-            <title>UseXAForMessagePull</title>
-            <para>If true, then move a reliable message from one node to
-            another in an XA transaction. Relaxing this gives better
-            performance at the expense of some reliability. See the cluster
-            configurations section for more details. Default is false.</para>
-         </section>
          <section id="conf.serverpeer.attributes.defaultpreserveordering">
             <title>DefaultPreserveOrdering</title>
             <para>If true, then strict JMS ordering is preserved in the

Modified: branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/messaging-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -81,10 +81,6 @@
       
       <attribute name="ClusterPullConnectionFactoryName">jboss.messaging.connectionfactory:service=ClusterPullConnectionFactory</attribute>
       
-      <!-- Use XA when pulling persistent messages from a remote node to this one. -->
-      
-      <attribute name="UseXAForMessagePull">false</attribute>
-      
       <!-- When redistributing messages in the cluster. Do we need to preserve the order of messages received
             by a particular consumer from a particular producer? -->
             

Modified: branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mssql-persistence-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -61,7 +61,8 @@
    SELECT_MIN_MAX_PAGE_ORD=SELECT MIN(PAGE_ORD), MAX(PAGE_ORD) FROM JBM_MSG_REF WHERE CHANNEL_ID = ?
    SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
-   UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?   
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?   
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)

Modified: branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/mysql-persistence-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -62,6 +62,7 @@
    SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? 
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)

Modified: branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/oracle-persistence-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -66,6 +66,7 @@
    SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? 
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)

Modified: branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/postgresql-persistence-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -62,6 +62,7 @@
    SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? 
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)

Modified: branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/server/default/deploy/sybase-persistence-service.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -67,6 +67,7 @@
    SELECT_EXISTS_REF_MESSAGE_ID=SELECT MESSAGE_ID FROM JBM_MSG_REF WHERE MESSAGE_ID = ?
    UPDATE_DELIVERY_COUNT=UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?
    UPDATE_CHANNEL_ID=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?
+   MOVE_REFERENCE=UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ? 
    LOAD_MESSAGES=SELECT MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, HEADERS, PAYLOAD, TYPE FROM JBM_MSG
    INSERT_MESSAGE=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE, HEADERS, PAYLOAD) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
    INSERT_MESSAGE_CONDITIONAL=INSERT INTO JBM_MSG (MESSAGE_ID, RELIABLE, EXPIRATION, TIMESTAMP, PRIORITY, TYPE) SELECT ?, ?, ?, ?, ?, ? FROM JBM_DUAL WHERE NOT EXISTS (SELECT MESSAGE_ID FROM JBM_MSG WHERE MESSAGE_ID = ?)

Modified: branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml
===================================================================
--- branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/etc/xmdesc/ServerPeer-xmbean.xml	2007-11-16 12:22:58 UTC (rev 3337)
@@ -181,13 +181,7 @@
       <name>ClusterPullConnectionFactoryName</name>
       <type>java.lang.String</type>
    </attribute> 
-   
-   <attribute access="read-write" getMethod="isUseXAForMessagePull" setMethod="setUseXAForMessagePull">
-      <description>When pulling persistent messages from a remote durable queue to a local one, should XA be used?</description>
-      <name>UseXAForMessagePull</name>
-      <type>boolean</type>
-   </attribute>  
-   
+
    <attribute access="read-write" getMethod="isDefaultPreserveOrdering" setMethod="setDefaultPreserveOrdering">
       <description>When pulling messages do we need to preserve the ordering of messages consumed from a particular producer, for a particular consumer?</description>
       <name>DefaultPreserveOrdering</name>

Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/ProducerAspect.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -89,9 +89,7 @@
       long timeToLive = ((Long)args[4]).longValue();
 
       boolean keepID = args.length>5? ((Boolean)args[5]).booleanValue() : false;
-
-      String keptId = null;
-
+      
       // configure the message for sending, using attributes stored as metadata
 
       ProducerState producerState = getProducerState(mi);
@@ -175,7 +173,7 @@
       // Generate the message id
       ConnectionState connectionState = (ConnectionState)sessionState.getParent();
 
-      long id = 0;
+      long id = -1;
 
       JBossMessage messageToSend;
       boolean foreign = false;
@@ -229,14 +227,13 @@
          // get the actual message
          MessageProxy proxy = (MessageProxy)m;
 
+         m.setJMSDestination(destination);
+         
          if (keepID)
          {
-            keptId = m.getJMSMessageID();
+            id = proxy.getMessage().getMessageID();
          }
 
-
-         m.setJMSDestination(destination);
-
          //The following line executed on the proxy should cause a copy to occur
          //if it is necessary
          proxy.setJMSMessageID(null);
@@ -249,15 +246,13 @@
 
       // Set the new id
 
-      id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
-      messageToSend.setMessageId(id);
-
-      if (keptId != null)
+      if (!keepID)
       {
-         messageToSend.setJMSMessageID(keptId);
+         id = connectionState.getIdGenerator().getId((ConnectionDelegate)connectionState.getDelegate());
       }
-
-
+      
+      messageToSend.setMessageId(id);
+            
       // This only really used for BytesMessages and StreamMessages to reset their state
       messageToSend.doBeforeSend(); 
       

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -342,7 +342,13 @@
                      {
                         ((ClientClusteredConnectionFactoryDelegate)del).setFailoverMap(failoverMap);
                      }
-                  }
+                  }                  
+                                 
+                  //Note we don't rebind at this point - we just update the maps.
+                  //When a node joins or leaves, we first get the join/leave notification
+                  //Then we'll get a subsequent connection factory deploy/undeploy
+                  //Even when a node crashes we'll get this since the postoffice ensures replication removes 
+                  //are called in this event                  
                }
                else if ((notification.type == ClusterNotification.TYPE_REPLICATOR_PUT || notification.type == ClusterNotification.TYPE_REPLICATOR_REMOVE) &&
                         (notification.data instanceof String) && ((String)notification.data).startsWith(Replicator.CF_PREFIX))

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -57,6 +57,7 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
 import org.jboss.messaging.core.contract.Delivery;
+import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
 import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PostOffice;
@@ -698,7 +699,7 @@
       if (dest.isDirect())
       {
       	//Route directly to queue - temp kludge for clustering
-      	
+         
       	Binding binding = postOffice.getBindingForQueueName(dest.getName());
       	
       	if (binding == null)
@@ -708,7 +709,9 @@
       	
       	Queue queue = binding.queue;
       	
-      	Delivery del = queue.handle(null, ref, tx);
+         Long scid = (Long)ref.getMessage().removeHeader(Message.SOURCE_CHANNEL_ID);
+         
+         Delivery del = queue.handleMove(ref, scid.longValue());
       	
       	if (del == null)
       	{

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -438,6 +438,11 @@
 
    // Package protected ----------------------------------------------------------------------------
    
+   boolean isRemote()
+   {
+      return this.remote;
+   }       
+   
    boolean isReplicating()
    {
    	return replicating;

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1768,8 +1768,16 @@
          return false;
       }
       
-      rec.del.acknowledge(null);  
-      
+      if (rec.getConsumer().isRemote())
+      {
+         //Optimisation for shared DB - we don't ack in DB - we move
+         rec.del.getObserver().acknowledgeNoPersist(rec.del);
+      }
+      else
+      {      
+         rec.del.acknowledge(null);
+      }
+           
       //Now replicate the ack
       
       if (rec.replicating && replicating)

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/DeliveryObserver.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -32,6 +32,8 @@
 public interface DeliveryObserver
 {
    void acknowledge(Delivery d, Transaction tx) throws Throwable;
+   
+   void acknowledgeNoPersist(Delivery d) throws Throwable;
 
    void cancel(Delivery d) throws Throwable;
 }

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Message.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -43,8 +43,15 @@
 	 * The header is checked when sucking messages and if order preservation is true then the message is not accepted.
 	 * This is a basic way of ensuring message order is preserved.
 	 */
-	public static final String CLUSTER_SUCKED = "CLUSTER_SUCKED";
+	public static final String CLUSTER_SUCKED = "SUCKED";
 	
+	/**
+	 * This header is set on a message when it is sucked from one node to another.
+	 * If the header exists on the destination node, and the message is persistent, the message
+	 * will be moved from one channel to the other by doing a simple database update
+	 */
+	public static final String SOURCE_CHANNEL_ID = "SCID";
+		
    /**    
     * @return The unique id of the message
     */

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -40,6 +40,8 @@
 public interface PersistenceManager extends MessagingComponent
 {
    void addReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
+   
+   void moveReference(long sourceChannelID, long destChannelID, MessageReference ref) throws Exception;
 
    void removeReference(long channelID, MessageReference ref, Transaction tx) throws Exception;
    

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/contract/Queue.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -91,4 +91,7 @@
    Map getRecoveryArea();
    
    int getRecoveryMapSize();
+   
+   //Optimisation for shared database
+   Delivery handleMove(MessageReference ref, long sourceChannelID);   
 }

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/ChannelSupport.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -35,8 +35,8 @@
 import org.jboss.messaging.core.contract.DeliveryObserver;
 import org.jboss.messaging.core.contract.Distributor;
 import org.jboss.messaging.core.contract.Filter;
+import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.core.contract.MessageReference;
-import org.jboss.messaging.core.contract.MessageStore;
 import org.jboss.messaging.core.contract.PersistenceManager;
 import org.jboss.messaging.core.impl.tx.Transaction;
 import org.jboss.messaging.core.impl.tx.TransactionException;
@@ -129,7 +129,62 @@
    }
    
    // Receiver implementation ----------------------------------------------------------------------
+   
+   //Optimisation
+   public Delivery handleMove(MessageReference ref, long sourceChannelID)
+   {
+      if (!isActive())
+      {  
+         if (trace) { log.trace(this + " is not active, returning null delivery for " + ref); }
+         
+         return null;
+      }
 
+      checkClosed();
+           
+      if (trace) { log.trace(this + " moving ref " + ref + " from channel " + sourceChannelID); }
+      
+      if (maxSize != -1 && getMessageCount() >= maxSize)
+      {
+         //Have reached maximum size - will drop message
+         
+         log.warn(this + " has reached maximum size, " + ref + " will be dropped");
+         
+         return null;
+      }
+   
+      // Each channel has its own copy of the reference
+      ref = ref.copy();
+
+      try
+      {           
+         if (ref.getMessage().isReliable() && recoverable)
+         {
+            // Reliable message in a recoverable state - also add to db
+            if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+            
+            pm.moveReference(sourceChannelID, channelID, ref);        
+         }
+         
+         synchronized (lock)
+         {
+            addReferenceInMemory(ref);
+            
+            deliverInternal();
+         }
+            
+         messagesAdded.increment();
+      }
+      catch (Throwable t)
+      {
+         log.error("Failed to handle message", t);
+
+         return null;
+      }
+
+      return new SimpleDelivery(this, ref, true, false);  
+   }
+   
    public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
    {
       if (!isActive())
@@ -141,7 +196,78 @@
 
       checkClosed();
       
-      return handleInternal(sender, ref, tx, true);      
+      if (ref == null)
+      {
+         return null;
+      }
+
+      if (trace) { log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx)); }
+      
+      if (maxSize != -1 && getMessageCount() >= maxSize)
+      {
+         //Have reached maximum size - will drop message
+         
+         log.warn(this + " has reached maximum size, " + ref + " will be dropped");
+         
+         return null;
+      }
+   
+      // Each channel has its own copy of the reference
+      ref = ref.copy();
+
+      try
+      {           
+         if (tx == null)
+         {
+            if (ref.getMessage().isReliable() && recoverable)
+            {
+               // Reliable message in a recoverable state - also add to db
+               if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
+               
+               pm.addReference(channelID, ref, null);        
+            }
+            
+            // If the ref has a scheduled delivery time then we don't add to the in memory queue
+            // instead we create a timeout, so when that time comes delivery will attempted directly
+            
+            if (!checkAndSchedule(ref))
+            {               
+               synchronized (lock)
+               {
+                  addReferenceInMemory(ref);
+                  
+                  deliverInternal();
+               }            
+            }
+         }
+         else
+         {
+            if (trace) { log.trace(this + " adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
+
+            // add to post commit callback
+            getCallback(tx).addRef(ref);
+            
+            if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
+            
+            if (ref.getMessage().isReliable() && recoverable)
+            {
+               // Reliable message in a recoverable state - also add to db
+               if (trace) { log.trace(this + " adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
+
+               pm.addReference(channelID, ref, tx);
+            }
+         }
+         
+         messagesAdded.increment();
+      }
+      catch (Throwable t)
+      {
+         log.error("Failed to handle message", t);
+
+         return null;
+      }
+
+      return new SimpleDelivery(this, ref, true, false);     
    }
 
    // DeliveryObserver implementation --------------------------------------------------------------
@@ -152,6 +278,11 @@
 
       acknowledgeInternal(d, tx, true);
    }
+   
+   public void acknowledgeNoPersist(Delivery d) throws Throwable
+   {
+      acknowledgeInternal(d, null, false);
+   }
 
    public void cancel(Delivery del) throws Throwable
    {
@@ -580,86 +711,7 @@
       
       return false;
    }
-      
-   protected Delivery handleInternal(DeliveryObserver sender, MessageReference ref,
-                                     Transaction tx, boolean persist)
-   {
-      if (ref == null)
-      {
-         return null;
-      }
-
-      if (trace) { log.trace(this + " handles " + ref + (tx == null ? " non-transactionally" : " in transaction: " + tx)); }
-      
-      if (maxSize != -1 && getMessageCount() >= maxSize)
-      {
-         //Have reached maximum size - will drop message
          
-         log.warn(this + " has reached maximum size, " + ref + " will be dropped");
-         
-         return null;
-      }
-   
-      // Each channel has its own copy of the reference
-      ref = ref.copy();
-
-      try
-      {        	
-         if (tx == null)
-         {
-            if (persist && ref.getMessage().isReliable() && recoverable)
-            {
-               // Reliable message in a recoverable state - also add to db
-               if (trace) { log.trace(this + " adding " + ref + " to database non-transactionally"); }
-               
-               // TODO - this db access could safely be done outside the event loop
-               pm.addReference(channelID, ref, null);        
-            }
-            
-            // If the ref has a scheduled delivery time then we don't add to the in memory queue
-            // instead we create a timeout, so when that time comes delivery will attempted directly
-            
-            if (!checkAndSchedule(ref))
-            {               
-               synchronized (lock)
-               {
-                  addReferenceInMemory(ref);
-                  
-                  deliverInternal();
-               }            
-            }
-         }
-         else
-         {
-            if (trace) { log.trace(this + " adding " + ref + " to state " + (tx == null ? "non-transactionally" : "in transaction: " + tx)); }
-
-            // add to post commit callback
-            getCallback(tx).addRef(ref);
-            
-            if (trace) { log.trace(this + " added transactionally " + ref + " in memory"); }
-            
-            if (persist && ref.getMessage().isReliable() && recoverable)
-            {
-               // Reliable message in a recoverable state - also add to db
-               if (trace) { log.trace(this + " adding " + ref + (tx == null ? " to database non-transactionally" : " in transaction: " + tx)); }
-
-               pm.addReference(channelID, ref, tx);
-            }
-         }
-         
-         messagesAdded.increment();
-      }
-      catch (Throwable t)
-      {
-         log.error("Failed to handle message", t);
-
-         return null;
-      }
-
-      return new SimpleDelivery(this, ref, true, false);
-   }
-
-   
    protected boolean checkAndSchedule(MessageReference ref)
    {
       if (ref.getScheduledDeliveryTime() > System.currentTimeMillis())

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1282,6 +1282,8 @@
    public void addReference(final long channelID, final MessageReference ref,
          final Transaction tx) throws Exception
    {
+      if (trace) { log.trace("Adding reference " + ref + " in channel " + channelID + " tx " + tx); }
+      
       class AddReferenceRunner extends JDBCTxRunner2
       {
          public Object doTransaction() throws Exception
@@ -1351,7 +1353,45 @@
          new AddReferenceRunner().executeWithRetry();
       }
    }
+   
+   public void moveReference(final long sourceChannelID, final long destChannelID, final MessageReference ref)
+      throws Exception
+   {
+      if (trace) { log.trace("Moving reference " + ref + " from " + sourceChannelID + " to " + destChannelID); }
+      
+      class MoveReferenceRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement psReference = null;
+            
+            try
+            {
+               psReference = conn.prepareStatement(getSQLStatement("MOVE_REFERENCE"));
+               
+               psReference.setLong(1, destChannelID);
+               psReference.setLong(2, sourceChannelID);
+               psReference.setLong(3, ref.getMessage().getMessageID());
 
+               int rows = psReference.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Updated " + rows + " rows");
+               }
+
+               return null;
+            }
+            finally
+            {
+               closeStatement(psReference);
+            }
+         }
+      }
+
+      new MoveReferenceRunner().executeWithRetry();      
+   }
+
    public void updateDeliveryCount(final long channelID,
          final MessageReference ref) throws Exception
    {
@@ -1394,6 +1434,8 @@
    public void removeReference(final long channelID,
          final MessageReference ref, final Transaction tx) throws Exception
    {
+      if (trace) { log.trace("Removing reference " + ref + " in channel " + channelID + " tx " + tx); }
+      
       class RemoveReferenceRunner extends JDBCTxRunner2
       {
          public Object doTransaction() throws Exception
@@ -2376,7 +2418,6 @@
       map.put("LOAD_REFS",
               "SELECT MESSAGE_ID, DELIVERY_COUNT, SCHED_DELIVERY FROM JBM_MSG_REF WHERE STATE = 'C' "
             + "AND CHANNEL_ID = ? ORDER BY ORD");
-
       map.put("UPDATE_REFS_NOT_PAGED",
               "UPDATE JBM_MSG_REF SET PAGE_ORD = NULL WHERE PAGE_ORD BETWEEN ? AND ? AND CHANNEL_ID=?");
       map.put("SELECT_MIN_MAX_PAGE_ORD",
@@ -2387,6 +2428,8 @@
               "UPDATE JBM_MSG_REF SET DELIVERY_COUNT = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
       map.put("UPDATE_CHANNEL_ID",
               "UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ?");
+      map.put("MOVE_REFERENCE",
+              "UPDATE JBM_MSG_REF SET CHANNEL_ID = ? WHERE CHANNEL_ID = ? AND MESSAGE_ID = ?");
 
       // Message
       map.put("LOAD_MESSAGES",

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/ClusterConnectionManager.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -470,7 +470,25 @@
 			
 			if (localQueue.isClustered())
 			{				
-				MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, xa, preserveOrdering);
+			   //Find channel id for remote queue - we need this for doing shared DB optimisation
+			   Collection coll = this.postOffice.getAllBindingsForQueueName(queueName);
+			   Iterator iter = coll.iterator();
+			   long sourceChannelID = -1;
+			   while (iter.hasNext())
+			   {
+			      Binding b = (Binding)iter.next();
+			      if (b.queue.getNodeID() == nodeID)
+			      {
+			         sourceChannelID = b.queue.getChannelID();
+			      }
+			   }
+			   if (sourceChannelID == -1)
+			   {
+			      throw new IllegalArgumentException("Cannot find source channel id");
+			   }
+			   			   
+				MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection,
+				                                         xa, preserveOrdering, sourceChannelID);
 	
 				info.addSucker(sucker);
 				

Modified: branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/src/main/org/jboss/messaging/core/impl/clusterconnection/MessageSucker.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -80,13 +80,15 @@
 	
 	private boolean preserveOrdering;
 	
+	private long sourceChannelID;
+	
 	public String toString()
 	{
 		return "MessageSucker:" + System.identityHashCode(this) + " queue:" + localQueue.getName();
 	}
 			
 	MessageSucker(Queue localQueue, JBossConnection sourceConnection, JBossConnection localConnection,
-			        boolean xa, boolean preserveOrdering)
+			        boolean xa, boolean preserveOrdering, long sourceChannelID)
 	{	
 		if (trace) { log.trace("Creating message sucker, localQueue:" + localQueue + " xa:" + xa + " preserveOrdering:" + preserveOrdering); }
 		
@@ -96,10 +98,16 @@
 		
 		this.localConnection = localConnection;
 		
-		this.xa = xa;
+		//this.xa = xa;
 		
+		//XA is currently disabled for message sucking - this is because JBM 1.4.0 uses shared database so XA is
+		//unnecesary - we can move the ref from one channel to another with a database update
+		this.xa = false;
+		
 		this.preserveOrdering = preserveOrdering;
 		
+		this.sourceChannelID = sourceChannelID;
+		
 		if (xa)
 		{
 			tm = TransactionManagerLocator.getInstance().locate();
@@ -125,8 +133,7 @@
 			JBossSession sess = (JBossSession)sourceConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
 		
 			sourceSession = (SessionDelegate)sess.getDelegate();
-			
-			
+						
 			sess = (JBossSession)localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 			
 			localSession = (SessionDelegate)sess.getDelegate();
@@ -244,6 +251,9 @@
 		
 		try
 		{
+		   /*
+		   Commented out until JBM 2.0 
+		    
 			boolean startTx = xa && msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT;
 			
 			if (startTx)
@@ -263,12 +273,18 @@
 				if (trace) { log.trace("Started JTA transaction"); }
 			}
 			
+         org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+         			
 			if (preserveOrdering)
 			{
 				//Add a header saying we have sucked the message
-				((MessageProxy)msg).getMessage().putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
+				coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
 			}
 			
+			//Add a header with the node id of the node we sucked from - this is used on the sending end to do
+			//the move optimisation
+			coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
+
 			long timeToLive = msg.getJMSExpiration();
 			if (timeToLive != 0)
 			{
@@ -301,6 +317,39 @@
 				
 				if (trace) { log.trace("Acknowledged message"); }
 			}
+			*/
+
+         org.jboss.messaging.core.contract.Message coreMessage = ((MessageProxy)msg).getMessage();
+                  
+         if (preserveOrdering)
+         {
+            //Add a header saying we have sucked the message
+            coreMessage.putHeader(org.jboss.messaging.core.contract.Message.CLUSTER_SUCKED, "x");
+         }
+         
+         //Add a header with the node id of the node we sucked from - this is used on the sending end to do
+         //the move optimisation
+         coreMessage.putHeader(org.jboss.messaging.core.contract.Message.SOURCE_CHANNEL_ID, sourceChannelID);
+
+         long timeToLive = msg.getJMSExpiration();
+         if (timeToLive != 0)
+         {
+            timeToLive -=  System.currentTimeMillis();
+            if (timeToLive <= 0)
+            {
+               timeToLive = 1; //Should have already expired - set to 1 so it expires when it is consumed or delivered
+            }
+         }
+         
+         //First we ack it - this ack only occurs in memory even if it is a persistent message
+         msg.acknowledge();
+         
+         if (trace) { log.trace("Acknowledged message"); }     
+         
+         //Then we send - this causes the ref to be moved (SQL UPDATE) in the database        
+         producer.send(null, msg, msg.getJMSDeliveryMode(), msg.getJMSPriority(), timeToLive, true);
+         
+         if (trace) { log.trace(this + " forwarded message to queue"); }                      
 		}
 		catch (Exception e)
 		{

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleChannel.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -113,7 +113,14 @@
    {
       throw new NotYetImplementedException();
    }
+   
 
+   public void acknowledgeNoPersist(Delivery d) throws Throwable
+   {
+      throw new NotYetImplementedException();
+   }
+
+
    public void cancel(Delivery d) throws Exception
    {
       throw new NotYetImplementedException();

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/core/SimpleDeliveryObserver.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -56,6 +56,11 @@
          notifyAll();
       }
    }
+   
+   public void acknowledgeNoPersist(Delivery d) throws Throwable
+   {
+      // TODO Auto-generated method stub      
+   }
 
    public synchronized void cancel(Delivery d)
    {

Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueDontUseXATest.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-
-
-/**
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Jul 2007
- *
- * $Id: $
- *
- */
-public class DistributedQueueDontUseXATest extends DistributedQueueTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public DistributedQueueDontUseXATest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-   
-   // Package private ---------------------------------------------
-   
-   // protected ----------------------------------------------------
-   
-   protected void setUp() throws Exception
-   {
-      this.overrides = new ServiceAttributeOverrides();
-      
-      overrides.put(ServiceContainer.SERVER_PEER_OBJECT_NAME, "UseXAForMessagePull", "false");
-   	
-      super.setUp();
-   }
-   
-   // private -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-   
-}

Copied: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java (from rev 3301, branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java)
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	                        (rev 0)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -0,0 +1,1199 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * 
+ * A DistributedQueueTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2796 $</tt>
+ *
+ * $Id: DistributedDestinationsTest.java 2796 2007-06-25 22:24:41Z timfox $
+ *
+ */
+public class DistributedQueueTest extends ClusteringTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public DistributedQueueTest(String name)
+   {
+      super(name);
+   }
+
+   // Public --------------------------------------------------------
+   
+   public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
+   {
+   	this.messagePropertiesPreservedOnSuck(true);
+   }
+   
+   public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
+   {
+   	this.messagePropertiesPreservedOnSuck(false);
+   }
+
+   public void testClusteredQueueNonPersistent() throws Exception
+   {
+      clusteredQueue(false);
+   }
+
+   public void testClusteredQueuePersistent() throws Exception
+   {
+      clusteredQueue(true);
+   }
+   
+   public void testLocalNonPersistent() throws Exception
+   {
+      localQueue(false);
+   }
+
+   public void testLocalPersistent() throws Exception
+   {
+      localQueue(true);
+   }   
+   
+   public void testWithConnectionsOnAllNodesClientAck() throws Exception
+   {
+      Connection conn0 = createConnectionOnServer(cf, 0);
+      
+      Connection conn1 = createConnectionOnServer(cf, 1);
+      
+      Connection conn2 = createConnectionOnServer(cf, 2);
+      
+      try
+      {
+      	conn0.start();
+      	
+      	conn1.start();
+      	
+      	conn2.start();
+      	
+      	//Send a load of messages on node 0
+      	      	         
+      	Session sess0_1 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons0_1 = sess0_1.createConsumer(queue[0]);
+      	
+      	MessageProducer prod0 = sess0_1.createProducer(queue[0]);
+      	
+      	Set msgIds = new HashSet();
+      	
+      	final int numMessages = 60;
+      	      	      	 
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		TextMessage tm = sess0_1.createTextMessage("message-" + i);
+      		
+      		prod0.send(tm);      		
+      	}
+      		
+      	TextMessage tm0_1 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm0_1 = (TextMessage)cons0_1.receive(5000);
+      		
+      		assertNotNull(tm0_1);
+      		
+      		msgIds.add(tm0_1.getText());
+      	}
+      	
+      	tm0_1.acknowledge();
+      	
+      	cons0_1.close();
+      	
+      	Session sess0_2 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons0_2 = sess0_2.createConsumer(queue[0]);
+      	
+      	TextMessage tm0_2 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm0_2 = (TextMessage)cons0_2.receive(5000);
+      		
+      		assertNotNull(tm0_2);
+      		
+      		msgIds.add(tm0_2.getText());
+      	}
+      	
+      	tm0_2.acknowledge();
+      	
+      	cons0_2.close();
+      	
+      	
+      	//Two on node 1
+      	
+      	Session sess1_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons1_1 = sess1_1.createConsumer(queue[1]);      	
+      	
+      	TextMessage tm1_1 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm1_1 = (TextMessage)cons1_1.receive(5000);
+      		
+      		assertNotNull(tm1_1);
+      		
+      		msgIds.add(tm1_1.getText());
+      	}
+      	
+      	tm1_1.acknowledge();
+      	
+      	cons1_1.close();
+     
+      	Session sess1_2 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons1_2 = sess1_2.createConsumer(queue[1]);
+      	
+      	TextMessage tm1_2 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm1_2 = (TextMessage)cons1_2.receive(5000);
+      		      		      		
+      		assertNotNull(tm1_2);
+      		
+      		msgIds.add(tm1_2.getText());
+      	}
+      	
+      	tm1_2.acknowledge();
+      	
+      	cons1_2.close();
+      	
+      	
+      	//Two on node 2
+      	
+      	Session sess2_1 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons2_1 = sess2_1.createConsumer(queue[2]);
+      	
+      	TextMessage tm2_1 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm2_1 = (TextMessage)cons2_1.receive(5000);
+      		
+      		assertNotNull(tm2_1);
+      		
+      		msgIds.add(tm2_1.getText());
+      	}
+      	
+      	tm2_1.acknowledge();
+      	
+      	cons2_1.close();
+      	
+      	Session sess2_2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+      	
+      	MessageConsumer cons2_2 = sess2_2.createConsumer(queue[2]);
+      	
+      	TextMessage tm2_2 = null;
+      	
+      	for (int i = 0; i < numMessages / 6; i++)
+      	{
+      		tm2_2 = (TextMessage)cons2_2.receive(5000);
+      		
+      		assertNotNull(tm2_2);
+      		
+      		msgIds.add(tm2_2.getText());
+      	}
+      	
+      	tm2_2.acknowledge();
+      	
+      	cons2_2.close();
+      	
+      	assertEquals(numMessages, msgIds.size());
+      	
+      	for (int i = 0; i < numMessages; i++)
+      	{
+      		assertTrue(msgIds.contains("message-" + i));
+      	}      	      
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+         
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   public void testMixedSuck() throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+         
+         conn0.start();
+         conn2.start();
+
+         final int NUM_MESSAGES = 300;
+
+         
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+         //Send more messages at node 0 and node 2
+         
+         boolean persistent = false;
+         for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+            prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+            
+            prod0.send(tm);
+            
+            persistent = !persistent;
+         }
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+            prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                        
+            prod2.send(tm);
+            
+            persistent = !persistent;
+         }
+         
+         //consume them on node 2 - we will get messages from both nodes so the order is undefined
+         
+         Set msgs = new HashSet();
+         
+         TextMessage tm = null;
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(5000);
+            
+            if (tm != null)
+            {                     
+	            msgs.add(tm.getText());
+            }
+         }           
+         while (tm != null);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message4-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, msgs.size());
+                
+         cons2.close();
+         
+         sess2.close();
+         
+         sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         cons2 = sess2.createConsumer(queue[2]);
+                  
+         Message msg = cons2.receive(5000);
+         
+         assertNull(msg);                            
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+
+   // Package private ---------------------------------------------
+   
+   // protected ----------------------------------------------------
+   
+   protected void setUp() throws Exception
+   {
+      nodeCount = 3;
+
+      super.setUp();
+   }
+
+   // private -----------------------------------------------------
+
+
+   private void clusteredQueue(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+         
+         conn0.start();
+         conn1.start();
+         conn2.start();
+
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message0-" + i);
+
+            prod0.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message0-" + i, tm.getText());
+         }                 
+
+         Message m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 1
+
+         MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message1-" + i);
+
+            prod1.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message1-" + i, tm.getText());
+         }
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         // Send at node 2
+         
+         MessageProducer prod2 = sess2.createProducer(queue[2]);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message2-" + i);
+
+            prod2.send(tm);
+         }
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message2-" + i, tm.getText());
+         }
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         
+         //Now close the consumers at node 0 and node 1
+         
+         cons0.close();
+         
+         cons1.close();
+         
+         //Send more messages at node 0
+
+         String messageIdCorrelate[] = new String[NUM_MESSAGES];
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message3-" + i);
+
+            prod0.send(tm);
+
+            messageIdCorrelate[i] = tm.getJMSMessageID();
+
+            log.info("SetID[" + i + "]=" + tm.getJMSMessageID());
+
+         }
+              
+         // consume them on node2
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+            assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
+
+            assertEquals("message3-" + i, tm.getText());
+         }                 
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         //Send more messages at node 0 and node 1
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message4-" + i);
+
+            prod0.send(tm);
+         }
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message4-" + i);
+
+            prod2.send(tm);
+         }
+         
+         //consume them on node 2 - we will get messages from both nodes so the order is undefined
+         
+         Set msgs = new HashSet();
+         
+         TextMessage tm = null;
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            if (tm != null)
+            {                     
+	            msgs.add(tm.getText());
+            }
+         }           
+         while (tm != null);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message4-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+         msgs.clear();
+         
+         // Now repeat but this time creating the consumer after send
+         
+         cons2.close();
+         
+         //	Send more messages at node 0 and node 1
+         
+         for (int i = 0; i < NUM_MESSAGES / 2; i++)
+         {
+            tm = sess0.createTextMessage("message5-" + i);
+
+            prod0.send(tm);
+         }
+         
+         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
+         {
+            tm = sess1.createTextMessage("message5-" + i);
+
+            prod2.send(tm);
+         }
+         
+         cons2 = sess2.createConsumer(queue[2]);
+         
+         //consume them on node 2 - we will get messages from both nodes so the order is undefined
+         
+         msgs = new HashSet();
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            if (tm != null)
+            {            
+	            msgs.add(tm.getText());
+            }
+         }     
+         while (tm != null);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message5-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, msgs.size());
+         
+         msgs.clear();
+         
+         
+         //Now send messages at node 0 - but consume from node 1 AND node 2
+         
+         //order is undefined
+         
+         cons2.close();
+         
+         cons1 = sess1.createConsumer(queue[1]);
+         
+         cons2 = sess2.createConsumer(queue[2]);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = sess0.createTextMessage("message6-" + i);
+
+            prod0.send(tm);
+         }
+         
+         msgs = new HashSet();
+         
+         int count = 0;
+         
+         do
+         {
+            tm = (TextMessage)cons1.receive(1000);
+            
+            if (tm != null)
+            {                
+	            msgs.add(tm.getText());
+	            
+	            count++;
+            }
+         }
+         while (tm != null);
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            if (tm != null)
+            {            
+	            msgs.add(tm.getText());
+	            
+	            count++;
+            }
+         } 
+         while (tm != null);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message6-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, count);
+         
+         msgs.clear();
+         
+         //as above but start consumers AFTER sending
+         
+         cons1.close();
+         
+         cons2.close();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = sess0.createTextMessage("message7-" + i);
+
+            prod0.send(tm);
+         }
+         
+         cons1 = sess1.createConsumer(queue[1]);
+         
+         cons2 = sess2.createConsumer(queue[2]);
+         
+         
+         msgs = new HashSet();
+         
+         count = 0;
+         
+         do
+         {
+            tm = (TextMessage)cons1.receive(1000);
+            
+            if (tm != null)
+            {            
+	            msgs.add(tm.getText());
+	            
+	            count++;
+            }
+         }
+         while (tm != null);
+         
+         do
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            if (tm != null)
+            {
+	            msgs.add(tm.getText());
+	            
+	            count++;
+            }
+         } 
+         while (tm != null);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+         	assertTrue(msgs.contains("message7-" + i));
+         }
+         
+         assertEquals(NUM_MESSAGES, count);         
+         
+         msgs.clear();
+         
+         
+         // Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
+         
+         cons1.close();
+         
+         cons2.close();
+         
+         sess2.close();
+         
+         sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         cons2 = sess2.createConsumer(queue[2]);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = sess0.createTextMessage("message8-" + i);
+
+            prod0.send(tm);
+         }
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = (TextMessage)cons2.receive(1000);
+            
+            assertNotNull(tm);
+               
+            assertEquals("message8-" + i, tm.getText());
+         } 
+         
+         sess2.close(); // messages should go back on queue
+         
+         //Now try on node 1
+         
+         sess1.close();
+         
+         sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         
+         cons1 = sess1.createConsumer(queue[1]);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = (TextMessage)cons1.receive(1000);
+            
+            assertNotNull(tm);
+               
+            assertEquals("message8-" + i, tm.getText());
+         } 
+         
+         sess1.close(); // messages should go back on queue
+         
+         //Now try on node 0
+         
+         cons0 = sess0.createConsumer(queue[0]);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            tm = (TextMessage)cons0.receive(1000);
+            
+            assertNotNull(tm);
+               
+            assertEquals("message8-" + i, tm.getText());
+         }     
+         
+         Message msg = cons0.receive(5000);
+         
+         assertNull(msg);                          
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      try
+      {
+
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
+         
+         conn0.start();
+         conn2.start();
+
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+         
+
+
+         TextMessage tm = sess0.createTextMessage("blahmessage");
+            
+         prod0.setPriority(7);
+         
+         prod0.setTimeToLive(1 * 60 * 60 * 1000);
+
+         prod0.send(tm);
+         
+         long expiration = tm.getJMSExpiration();
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+                         
+
+         tm = (TextMessage)cons2.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
+                  
+         Message m = cons2.receive(5000);
+         
+         assertNull(m);
+         
+         
+         //Now do one with expiration = 0
+         
+         
+         tm = sess0.createTextMessage("blahmessage2");
+         
+         prod0.setPriority(7);
+         
+         prod0.setTimeToLive(0);
+
+         prod0.send(tm);
+         
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+                         
+
+         tm = (TextMessage)cons2.receive(1000);
+         
+         assertNotNull(tm);
+         
+         assertEquals("blahmessage2", tm.getText());
+
+         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
+         
+         assertEquals(7, tm.getJMSPriority());
+        
+         assertEquals(0, tm.getJMSExpiration());
+                  
+         m = cons2.receive(5000);
+         
+         assertNull(m);                          
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   
+   /* Check that non clustered queues behave properly when deployed on a cluster */
+   private void localQueue(boolean persistent) throws Exception
+   {
+   	Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+      
+      //Deploy three non clustered queues with same name on different nodes
+          
+      try
+      {
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
+         
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
+         
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
+         
+         Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
+         Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
+         Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
+      	
+         //This will create 3 different connection on 3 different nodes, since
+         //the cf is clustered
+         conn0 = this.createConnectionOnServer(cf, 0);
+         conn1 = this.createConnectionOnServer(cf, 1);
+         conn2 = this.createConnectionOnServer(cf, 2);
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn0.start();
+         conn1.start();
+         conn2.start();
+
+         // ==============
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue0);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message" + i);
+
+            prod0.send(tm);
+         }
+         
+         // Try and consume at node 1
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         Message m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         //And at node 2
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+         
+         // Now consume at node 0
+         
+         MessageConsumer cons0 = sess0.createConsumer(queue0);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         // ==============
+         // Send at node 1
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+         
+         // Try and consume at node 0
+         
+         cons0 = sess0.createConsumer(queue0);
+         
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         //And at node 2
+         
+         cons2 = sess2.createConsumer(queue2);
+         
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+         
+         // Now consume at node 1
+         
+         cons1 = sess1.createConsumer(queue1);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         // ==============
+         // Send at node 2
+
+         MessageProducer prod2 = sess2.createProducer(queue2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+         
+         // Try and consume at node 0
+         
+         cons0 = sess0.createConsumer(queue0);
+         
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         //And at node 1
+         
+         cons1 = sess1.createConsumer(queue1);
+         
+         m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         // Now consume at node 2
+         
+         cons2 = sess2.createConsumer(queue2);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+           
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 0);
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 1);
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 2);
+      }
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+}

Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTestBase.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,1202 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import javax.jms.Connection;
-import javax.jms.DeliveryMode;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.jms.message.JBossMessage;
-import org.jboss.jms.message.TextMessageProxy;
-
-
-/**
- * 
- * A DistributedQueueTest
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 2796 $</tt>
- *
- * $Id: DistributedDestinationsTest.java 2796 2007-06-25 22:24:41Z timfox $
- *
- */
-public abstract class DistributedQueueTestBase extends ClusteringTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public DistributedQueueTestBase(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-   
-   public void testMessagePropertiesPreservedOnSuckPersistent() throws Exception
-   {
-   	this.messagePropertiesPreservedOnSuck(true);
-   }
-   
-   public void testMessagePropertiesPreservedOnSuckNonPersistent() throws Exception
-   {
-   	this.messagePropertiesPreservedOnSuck(false);
-   }
-
-   public void testClusteredQueueNonPersistent() throws Exception
-   {
-      clusteredQueue(false);
-   }
-
-   public void testClusteredQueuePersistent() throws Exception
-   {
-      clusteredQueue(true);
-   }
-   
-   public void testLocalNonPersistent() throws Exception
-   {
-      localQueue(false);
-   }
-
-   public void testLocalPersistent() throws Exception
-   {
-      localQueue(true);
-   }   
-   
-   public void testWithConnectionsOnAllNodesClientAck() throws Exception
-   {
-      Connection conn0 = createConnectionOnServer(cf, 0);
-      
-      Connection conn1 = createConnectionOnServer(cf, 1);
-      
-      Connection conn2 = createConnectionOnServer(cf, 2);
-      
-      try
-      {
-      	conn0.start();
-      	
-      	conn1.start();
-      	
-      	conn2.start();
-      	
-      	//Send a load of messages on node 0
-      	      	         
-      	Session sess0_1 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons0_1 = sess0_1.createConsumer(queue[0]);
-      	
-      	MessageProducer prod0 = sess0_1.createProducer(queue[0]);
-      	
-      	Set msgIds = new HashSet();
-      	
-      	final int numMessages = 60;
-      	      	      	 
-      	for (int i = 0; i < numMessages; i++)
-      	{
-      		TextMessage tm = sess0_1.createTextMessage("message-" + i);
-      		
-      		prod0.send(tm);      		
-      	}
-      		
-      	TextMessage tm0_1 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm0_1 = (TextMessage)cons0_1.receive(5000);
-      		
-      		assertNotNull(tm0_1);
-      		
-      		msgIds.add(tm0_1.getText());
-      	}
-      	
-      	tm0_1.acknowledge();
-      	
-      	cons0_1.close();
-      	
-      	Session sess0_2 = conn0.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons0_2 = sess0_2.createConsumer(queue[0]);
-      	
-      	TextMessage tm0_2 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm0_2 = (TextMessage)cons0_2.receive(5000);
-      		
-      		assertNotNull(tm0_2);
-      		
-      		msgIds.add(tm0_2.getText());
-      	}
-      	
-      	tm0_2.acknowledge();
-      	
-      	cons0_2.close();
-      	
-      	
-      	//Two on node 1
-      	
-      	Session sess1_1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons1_1 = sess1_1.createConsumer(queue[1]);      	
-      	
-      	TextMessage tm1_1 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm1_1 = (TextMessage)cons1_1.receive(5000);
-      		
-      		assertNotNull(tm1_1);
-      		
-      		msgIds.add(tm1_1.getText());
-      	}
-      	
-      	tm1_1.acknowledge();
-      	
-      	cons1_1.close();
-     
-      	Session sess1_2 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons1_2 = sess1_2.createConsumer(queue[1]);
-      	
-      	TextMessage tm1_2 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm1_2 = (TextMessage)cons1_2.receive(5000);
-      		      		      		
-      		assertNotNull(tm1_2);
-      		
-      		msgIds.add(tm1_2.getText());
-      	}
-      	
-      	tm1_2.acknowledge();
-      	
-      	cons1_2.close();
-      	
-      	
-      	//Two on node 2
-      	
-      	Session sess2_1 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons2_1 = sess2_1.createConsumer(queue[2]);
-      	
-      	TextMessage tm2_1 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm2_1 = (TextMessage)cons2_1.receive(5000);
-      		
-      		assertNotNull(tm2_1);
-      		
-      		msgIds.add(tm2_1.getText());
-      	}
-      	
-      	tm2_1.acknowledge();
-      	
-      	cons2_1.close();
-      	
-      	Session sess2_2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-      	
-      	MessageConsumer cons2_2 = sess2_2.createConsumer(queue[2]);
-      	
-      	TextMessage tm2_2 = null;
-      	
-      	for (int i = 0; i < numMessages / 6; i++)
-      	{
-      		tm2_2 = (TextMessage)cons2_2.receive(5000);
-      		
-      		assertNotNull(tm2_2);
-      		
-      		msgIds.add(tm2_2.getText());
-      	}
-      	
-      	tm2_2.acknowledge();
-      	
-      	cons2_2.close();
-      	
-      	assertEquals(numMessages, msgIds.size());
-      	
-      	for (int i = 0; i < numMessages; i++)
-      	{
-      		assertTrue(msgIds.contains("message-" + i));
-      	}      	      
-      }
-      finally
-      {
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-         
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-         
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-      }
-   }
-   
-   public void testMixedSuck() throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
-      Connection conn2 = null;
-
-      try
-      {
-
-         conn0 = this.createConnectionOnServer(cf, 0);
-         conn1 = this.createConnectionOnServer(cf, 1);
-         conn2 = this.createConnectionOnServer(cf, 2);
-         
-         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
-         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-         
-         conn0.start();
-         conn2.start();
-
-         final int NUM_MESSAGES = 300;
-
-         
-         // Send at node 0
-
-         MessageProducer prod0 = sess0.createProducer(queue[0]);
-
-         MessageProducer prod2 = sess2.createProducer(queue[2]);
-
-         //Send more messages at node 0 and node 2
-         
-         boolean persistent = false;
-         for (int i = 0; i < NUM_MESSAGES / 2 ; i++)
-         {
-            TextMessage tm = sess0.createTextMessage("message4-" + i);
-
-            prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-            
-            prod0.send(tm);
-            
-            persistent = !persistent;
-         }
-         
-         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message4-" + i);
-
-            prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-                        
-            prod2.send(tm);
-            
-            persistent = !persistent;
-         }
-         
-         //consume them on node 2 - we will get messages from both nodes so the order is undefined
-         
-         Set msgs = new HashSet();
-         
-         TextMessage tm = null;
-         
-         do
-         {
-            tm = (TextMessage)cons2.receive(5000);
-            
-            if (tm != null)
-            {                     
-	            msgs.add(tm.getText());
-            }
-         }           
-         while (tm != null);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-         	assertTrue(msgs.contains("message4-" + i));
-         }
-         
-         assertEquals(NUM_MESSAGES, msgs.size());
-                
-         cons2.close();
-         
-         sess2.close();
-         
-         sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         
-         cons2 = sess2.createConsumer(queue[2]);
-                  
-         Message msg = cons2.receive(5000);
-         
-         assertNull(msg);                            
-      }
-      finally
-      {
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-      }
-   }
-
-   // Package private ---------------------------------------------
-   
-   // protected ----------------------------------------------------
-   
-   protected void setUp() throws Exception
-   {
-      nodeCount = 3;
-
-      super.setUp();
-   }
-
-   // private -----------------------------------------------------
-
-
-   private void clusteredQueue(boolean persistent) throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
-      Connection conn2 = null;
-
-      try
-      {
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn0 = this.createConnectionOnServer(cf, 0);
-         conn1 = this.createConnectionOnServer(cf, 1);
-         conn2 = this.createConnectionOnServer(cf, 2);
-         
-         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
-         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons0 = sess0.createConsumer(queue[0]);
-         MessageConsumer cons1 = sess1.createConsumer(queue[1]);
-         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-         
-         conn0.start();
-         conn1.start();
-         conn2.start();
-
-         // Send at node 0
-
-         MessageProducer prod0 = sess0.createProducer(queue[0]);
-
-         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess0.createTextMessage("message0-" + i);
-
-            prod0.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons0.receive(1000);
-
-            assertNotNull(tm);
-            
-            assertEquals("message0-" + i, tm.getText());
-         }                 
-
-         Message m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 1
-
-         MessageProducer prod1 = sess1.createProducer(queue[1]);
-
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message1-" + i);
-
-            prod1.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message1-" + i, tm.getText());
-         }
-
-         m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-
-         // Send at node 2
-         
-         MessageProducer prod2 = sess2.createProducer(queue[2]);
-
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message2-" + i);
-
-            prod2.send(tm);
-         }
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-
-            assertEquals("message2-" + i, tm.getText());
-         }
-
-         m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         m = cons1.receive(2000);
-
-         assertNull(m);
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-         
-         
-         //Now close the consumers at node 0 and node 1
-         
-         cons0.close();
-         
-         cons1.close();
-         
-         //Send more messages at node 0
-
-         String messageIdCorrelate[] = new String[NUM_MESSAGES];
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess0.createTextMessage("message3-" + i);
-
-            prod0.send(tm);
-
-            messageIdCorrelate[i] = tm.getJMSMessageID();
-
-            log.info("SetID[" + i + "]=" + tm.getJMSMessageID());
-
-         }
-              
-         // consume them on node2
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-            assertEquals(messageIdCorrelate[i], tm.getJMSMessageID());
-
-            assertEquals("message3-" + i, tm.getText());
-         }                 
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-         
-         //Send more messages at node 0 and node 1
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            TextMessage tm = sess0.createTextMessage("message4-" + i);
-
-            prod0.send(tm);
-         }
-         
-         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message4-" + i);
-
-            prod2.send(tm);
-         }
-         
-         //consume them on node 2 - we will get messages from both nodes so the order is undefined
-         
-         Set msgs = new HashSet();
-         
-         TextMessage tm = null;
-         
-         do
-         {
-            tm = (TextMessage)cons2.receive(1000);
-            
-            if (tm != null)
-            {                     
-	            msgs.add(tm.getText());
-            }
-         }           
-         while (tm != null);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-         	assertTrue(msgs.contains("message4-" + i));
-         }
-         
-         assertEquals(NUM_MESSAGES, msgs.size());
-         
-         msgs.clear();
-         
-         // Now repeat but this time creating the consumer after send
-         
-         cons2.close();
-         
-         //	Send more messages at node 0 and node 1
-         
-         for (int i = 0; i < NUM_MESSAGES / 2; i++)
-         {
-            tm = sess0.createTextMessage("message5-" + i);
-
-            prod0.send(tm);
-         }
-         
-         for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++)
-         {
-            tm = sess1.createTextMessage("message5-" + i);
-
-            prod2.send(tm);
-         }
-         
-         cons2 = sess2.createConsumer(queue[2]);
-         
-         //consume them on node 2 - we will get messages from both nodes so the order is undefined
-         
-         msgs = new HashSet();
-         
-         do
-         {
-            tm = (TextMessage)cons2.receive(1000);
-            
-            if (tm != null)
-            {            
-	            msgs.add(tm.getText());
-            }
-         }     
-         while (tm != null);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-         	assertTrue(msgs.contains("message5-" + i));
-         }
-         
-         assertEquals(NUM_MESSAGES, msgs.size());
-         
-         msgs.clear();
-         
-         
-         //Now send messages at node 0 - but consume from node 1 AND node 2
-         
-         //order is undefined
-         
-         cons2.close();
-         
-         cons1 = sess1.createConsumer(queue[1]);
-         
-         cons2 = sess2.createConsumer(queue[2]);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = sess0.createTextMessage("message6-" + i);
-
-            prod0.send(tm);
-         }
-         
-         msgs = new HashSet();
-         
-         int count = 0;
-         
-         do
-         {
-            tm = (TextMessage)cons1.receive(1000);
-            
-            if (tm != null)
-            {                
-	            msgs.add(tm.getText());
-	            
-	            count++;
-            }
-         }
-         while (tm != null);
-         
-         do
-         {
-            tm = (TextMessage)cons2.receive(1000);
-            
-            if (tm != null)
-            {            
-	            msgs.add(tm.getText());
-	            
-	            count++;
-            }
-         } 
-         while (tm != null);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-         	assertTrue(msgs.contains("message6-" + i));
-         }
-         
-         assertEquals(NUM_MESSAGES, count);
-         
-         msgs.clear();
-         
-         //as above but start consumers AFTER sending
-         
-         cons1.close();
-         
-         cons2.close();
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = sess0.createTextMessage("message7-" + i);
-
-            prod0.send(tm);
-         }
-         
-         cons1 = sess1.createConsumer(queue[1]);
-         
-         cons2 = sess2.createConsumer(queue[2]);
-         
-         
-         msgs = new HashSet();
-         
-         count = 0;
-         
-         do
-         {
-            tm = (TextMessage)cons1.receive(1000);
-            
-            if (tm != null)
-            {            
-	            msgs.add(tm.getText());
-	            
-	            count++;
-            }
-         }
-         while (tm != null);
-         
-         do
-         {
-            tm = (TextMessage)cons2.receive(1000);
-            
-            if (tm != null)
-            {
-	            msgs.add(tm.getText());
-	            
-	            count++;
-            }
-         } 
-         while (tm != null);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-         	assertTrue(msgs.contains("message7-" + i));
-         }
-         
-         assertEquals(NUM_MESSAGES, count);         
-         
-         msgs.clear();
-         
-         
-         // Now send message on node 0, consume on node2, then cancel, consume on node1, cancel, consume on node 0
-         
-         cons1.close();
-         
-         cons2.close();
-         
-         sess2.close();
-         
-         sess2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         
-         cons2 = sess2.createConsumer(queue[2]);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = sess0.createTextMessage("message8-" + i);
-
-            prod0.send(tm);
-         }
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = (TextMessage)cons2.receive(1000);
-            
-            assertNotNull(tm);
-               
-            assertEquals("message8-" + i, tm.getText());
-         } 
-         
-         sess2.close(); // messages should go back on queue
-         
-         //Now try on node 1
-         
-         sess1.close();
-         
-         sess1 = conn1.createSession(false, Session.CLIENT_ACKNOWLEDGE);
-         
-         cons1 = sess1.createConsumer(queue[1]);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = (TextMessage)cons1.receive(1000);
-            
-            assertNotNull(tm);
-               
-            assertEquals("message8-" + i, tm.getText());
-         } 
-         
-         sess1.close(); // messages should go back on queue
-         
-         //Now try on node 0
-         
-         cons0 = sess0.createConsumer(queue[0]);
-         
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            tm = (TextMessage)cons0.receive(1000);
-            
-            assertNotNull(tm);
-               
-            assertEquals("message8-" + i, tm.getText());
-         }     
-         
-         Message msg = cons0.receive(5000);
-         
-         assertNull(msg);                          
-      }
-      finally
-      {
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-      }
-   }
-   
-   private void messagePropertiesPreservedOnSuck(boolean persistent) throws Exception
-   {
-      Connection conn0 = null;
-      Connection conn1 = null;
-      Connection conn2 = null;
-
-      try
-      {
-
-         conn0 = this.createConnectionOnServer(cf, 0);
-         conn1 = this.createConnectionOnServer(cf, 1);
-         conn2 = this.createConnectionOnServer(cf, 2);
-         
-         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
-         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue[2]);
-         
-         conn0.start();
-         conn2.start();
-
-         // Send at node 0
-
-         MessageProducer prod0 = sess0.createProducer(queue[0]);
-
-         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-         
-
-
-         TextMessage tm = sess0.createTextMessage("blahmessage");
-            
-         prod0.setPriority(7);
-         
-         prod0.setTimeToLive(1 * 60 * 60 * 1000);
-
-         prod0.send(tm);
-         
-         long expiration = tm.getJMSExpiration();
-         
-         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-         
-                         
-
-         tm = (TextMessage)cons2.receive(1000);
-         
-         assertNotNull(tm);
-         
-         assertEquals("blahmessage", tm.getText());
-
-         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-         
-         assertEquals(7, tm.getJMSPriority());
-        
-         assertTrue(Math.abs(expiration - tm.getJMSExpiration()) < 100);
-                  
-         Message m = cons2.receive(5000);
-         
-         assertNull(m);
-         
-         
-         //Now do one with expiration = 0
-         
-         
-         tm = sess0.createTextMessage("blahmessage2");
-         
-         prod0.setPriority(7);
-         
-         prod0.setTimeToLive(0);
-
-         prod0.send(tm);
-         
-         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-         
-                         
-
-         tm = (TextMessage)cons2.receive(1000);
-         
-         assertNotNull(tm);
-         
-         assertEquals("blahmessage2", tm.getText());
-
-         assertEquals(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT, tm.getJMSDeliveryMode());
-         
-         assertEquals(7, tm.getJMSPriority());
-        
-         assertEquals(0, tm.getJMSExpiration());
-                  
-         m = cons2.receive(5000);
-         
-         assertNull(m);                          
-      }
-      finally
-      {
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-      }
-   }
-   
-   
-   /* Check that non clustered queues behave properly when deployed on a cluster */
-   private void localQueue(boolean persistent) throws Exception
-   {
-   	Connection conn0 = null;
-      Connection conn1 = null;
-      Connection conn2 = null;
-      
-      //Deploy three non clustered queues with same name on different nodes
-          
-      try
-      {
-         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
-         
-         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
-         
-         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
-         
-         Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
-         Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
-         Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
-      	
-         //This will create 3 different connection on 3 different nodes, since
-         //the cf is clustered
-         conn0 = this.createConnectionOnServer(cf, 0);
-         conn1 = this.createConnectionOnServer(cf, 1);
-         conn2 = this.createConnectionOnServer(cf, 2);
-         
-         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
-
-         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         
-         conn0.start();
-         conn1.start();
-         conn2.start();
-
-         // ==============
-         // Send at node 0
-
-         MessageProducer prod0 = sess0.createProducer(queue0);
-
-         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         final int NUM_MESSAGES = 100;
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess0.createTextMessage("message" + i);
-
-            prod0.send(tm);
-         }
-         
-         // Try and consume at node 1
-         
-         MessageConsumer cons1 = sess1.createConsumer(queue1);
-         
-         Message m = cons1.receive(2000);
-
-         assertNull(m);
-         
-         cons1.close();
-         
-         //And at node 2
-         
-         MessageConsumer cons2 = sess2.createConsumer(queue2);
-         
-         m = cons2.receive(2000);
-
-         assertNull(m);
-         
-         cons2.close();
-         
-         // Now consume at node 0
-         
-         MessageConsumer cons0 = sess0.createConsumer(queue0);
-          
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons0.receive(1000);
-
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }                 
-
-         m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         cons0.close();
-         
-         // ==============
-         // Send at node 1
-
-         MessageProducer prod1 = sess1.createProducer(queue1);
-
-         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess1.createTextMessage("message" + i);
-
-            prod1.send(tm);
-         }
-         
-         // Try and consume at node 0
-         
-         cons0 = sess0.createConsumer(queue0);
-         
-         m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         cons0.close();
-         
-         //And at node 2
-         
-         cons2 = sess2.createConsumer(queue2);
-         
-         m = cons2.receive(2000);
-
-         assertNull(m);
-         
-         cons2.close();
-         
-         // Now consume at node 1
-         
-         cons1 = sess1.createConsumer(queue1);
-          
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons1.receive(1000);
-
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }                 
-
-         m = cons1.receive(2000);
-
-         assertNull(m);
-         
-         cons1.close();
-         
-         // ==============
-         // Send at node 2
-
-         MessageProducer prod2 = sess2.createProducer(queue2);
-
-         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = sess2.createTextMessage("message" + i);
-
-            prod2.send(tm);
-         }
-         
-         // Try and consume at node 0
-         
-         cons0 = sess0.createConsumer(queue0);
-         
-         m = cons0.receive(2000);
-
-         assertNull(m);
-         
-         cons0.close();
-         
-         //And at node 1
-         
-         cons1 = sess1.createConsumer(queue1);
-         
-         m = cons1.receive(2000);
-
-         assertNull(m);
-         
-         cons1.close();
-         
-         // Now consume at node 2
-         
-         cons2 = sess2.createConsumer(queue2);
-          
-         for (int i = 0; i < NUM_MESSAGES; i++)
-         {
-            TextMessage tm = (TextMessage)cons2.receive(1000);
-
-            assertNotNull(tm);
-            
-            assertEquals("message" + i, tm.getText());
-         }                 
-
-         m = cons2.receive(2000);
-
-         assertNull(m);
-         
-         cons2.close();
-           
-      }
-      finally
-      {
-         if (conn0 != null)
-         {
-            conn0.close();
-         }
-
-         if (conn1 != null)
-         {
-            conn1.close();
-         }
-
-         if (conn2 != null)
-         {
-            conn2.close();
-         }
-         
-         ServerManagement.undeployQueue("nonClusteredQueue", 0);
-         
-         ServerManagement.undeployQueue("nonClusteredQueue", 1);
-         
-         ServerManagement.undeployQueue("nonClusteredQueue", 2);
-      }
-   }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-   
-}

Deleted: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java	2007-11-16 09:32:34 UTC (rev 3336)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueUseXATest.java	2007-11-16 12:22:58 UTC (rev 3337)
@@ -1,73 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.messaging.jms.clustering;
-
-import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
-
-/**
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: $</tt>10 Jul 2007
- *
- * $Id: $
- *
- */
-public class DistributedQueueUseXATest extends DistributedQueueTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   public DistributedQueueUseXATest(String name)
-   {
-      super(name);
-   }
-
-   // Public --------------------------------------------------------
-   
-
-   // Package private ---------------------------------------------
-   
-   // protected ----------------------------------------------------
-   
-   protected void setUp() throws Exception
-   {
-      this.overrides = new ServiceAttributeOverrides();
-      
-      overrides.put(ServiceContainer.SERVER_PEER_OBJECT_NAME, "UseXAForMessagePull", "true");
-   	   	
-      super.setUp();
-   }
-
-   // private -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-   
-}




More information about the jboss-cvs-commits mailing list