[jboss-cvs] JBoss Messaging SVN: r7883 - in branches/Branch_1_4: docs/userguide/en/modules and 11 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Oct 29 07:15:22 EDT 2009


Author: gaohoward
Date: 2009-10-29 07:15:21 -0400 (Thu, 29 Oct 2009)
New Revision: 7883

Added:
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Modified:
   branches/Branch_1_4/.classpath
   branches/Branch_1_4/build-messaging.xml
   branches/Branch_1_4/build.properties
   branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
   branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml
   branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml
   branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
   branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
JBMESSAGING-1742


Modified: branches/Branch_1_4/.classpath
===================================================================
--- branches/Branch_1_4/.classpath	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/.classpath	2009-10-29 11:15:21 UTC (rev 7883)
@@ -42,7 +42,7 @@
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
 	<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-transformer.jar"/>
 	<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
-	<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar" sourcepath="thirdparty/jboss/remoting/lib/jboss-remoting-src.jar"/>
+	<classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
@@ -62,6 +62,5 @@
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-local-jdbc.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
-	<classpathentry kind="var" path="ANT_HOME"/>
 	<classpathentry kind="output" path="bin"/>
 </classpath>

Modified: branches/Branch_1_4/build-messaging.xml
===================================================================
--- branches/Branch_1_4/build-messaging.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/build-messaging.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -51,11 +51,11 @@
 
    <property name="messaging.version.major" value="1"/>
    <property name="messaging.version.minor" value="4"/>
-   <property name="messaging.version.revision" value="6"/>
-   <property name="messaging.version.incrementing" value="36"/>
-   <property name="messaging.version.tag" value="GA"/>
+   <property name="messaging.version.revision" value="0"/>
+   <property name="messaging.version.incrementing" value="37"/>
+   <property name="messaging.version.tag" value="SP3_CP10"/>
    <property name="messaging.version.name" value="platypus"/>
-   <property name="messaging.version.cvstag" value="JBossMessaging_1_4_6_GA"/>
+   <property name="messaging.version.cvstag" value="JBossMessaging_1_4_0_SP3_CP10"/>
    <property name="module.name" value="messaging"/>
    <property name="module.Name" value="JBoss Messaging"/>
    <property name="module.version" value="${messaging.version.major}.${messaging.version.minor}.${messaging.version.revision}.${messaging.version.tag}"/>

Modified: branches/Branch_1_4/build.properties
===================================================================
--- branches/Branch_1_4/build.properties	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/build.properties	2009-10-29 11:15:21 UTC (rev 7883)
@@ -8,5 +8,8 @@
 # AS4 - integration against JBoss 4.x
 #############################################################################################################
 
-integration.base=AS5
-#integration.base=EAP4
+#integration.base=AS5
+
+integration.base=EAP4
+
+#integration.base=EAP5

Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -1663,6 +1663,22 @@
           <para>The number of consumers currently consuming from the
           queue.</para>
         </section>
+
+        <section id="conf.destination.queue.attributes.dropoldmessageonredeploy">
+          <title>DropOldMessageOnRedeploy</title>
+
+          <para>When you re-deploy a queue service with a clustered attribute different from the one with which it has been 
+          previously deployed, all remaining messages in the queue will be deleted after the re-deployment if you set this
+          parameter to true. Otherwise the messages will be reserved. Default is false.
+          <warning>
+               When you re-deploy a destination, you need to shut
+               down all the nodes in the cluster, make proper configuration change and then restart the nodes. 
+               Redeploying from a non-clustered queue to a clustered one requires you set the Clustered attribute to true, and
+               add the queue service configuration to each node. Redeploying from a clustered queue to a non-clustered requires 
+               you set the Clustered attribute to false in one of the queue configurations and delete all others in the cluster.
+          </warning>
+          </para>
+        </section>
       </section>
 
       <section id="conf.destination.queue.operations">
@@ -1948,6 +1964,22 @@
           <para>The count of all non durable subscriptions on this
           topic</para>
         </section>
+
+        <section id="conf.destination.topic.attributes.dropoldmessageonredeploy">
+          <title>DropOldMessageOnRedeploy</title>
+
+          <para>When you re-deploy a topic service with a clustered attribute different from the one with which it has been 
+          previously deployed, all remaining messages of its durable subscribers will be deleted after the re-deployment if you set this
+          parameter to true. Otherwise the messages will be reserved. Default is false.
+          <warning>
+               When you re-deploy a destination, you need to shut
+               down all the nodes in the cluster, make proper configuration change and then restart the nodes. 
+               Redeploying from a non-clustered topic to a clustered one requires you set the Clustered attribute to true, and
+               add the topic service configuration to each node. Redeploying from a clustered topic to a non-clustered one requires 
+               you set the Clustered attribute to false in one of the topic configurations and delete all others in the cluster.
+          </warning>
+          </para>
+        </section>
       </section>
 
       <section id="conf.destination.topic.operations">

Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -127,8 +127,14 @@
       <description>Is this a clustered destination?</description>
       <name>Clustered</name>
       <type>boolean</type>
+   </attribute>
+   
+   <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+      <description>When you redeploy a queue after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+      <name>DropOldMessageOnRedeploy</name>
+      <type>boolean</type>
    </attribute>   
-   
+
    <attribute access="read-only" getMethod="getMessageCounter">
       <description>Get the message counter for the queue</description>
       <name>MessageCounter</name>

Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -109,6 +109,12 @@
       <type>boolean</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+      <description>When you redeploy a topic after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+      <name>DropOldMessageOnRedeploy</name>
+      <type>boolean</type>
+   </attribute>
+
    <attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
       <description>The day limit for the message counters of this topic</description>
       <name>MessageCounterHistoryDayLimit</name>

Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -83,6 +83,8 @@
    UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
    INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
    LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
+   DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
+   DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
       ]]></attribute>
 
       <!-- The maximum number of parameters to include in a prepared statement -->

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -129,6 +129,12 @@
       <type>boolean</type>
    </attribute>   
    
+   <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+      <description>When you redeploy a queue after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+      <name>DropOldMessageOnRedeploy</name>
+      <type>boolean</type>
+   </attribute>   
+   
    <attribute access="read-only" getMethod="getMessageCounter">
       <description>Get the message counter for the queue</description>
       <name>MessageCounter</name>

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml	2009-10-29 11:15:21 UTC (rev 7883)
@@ -109,6 +109,12 @@
       <type>boolean</type>
    </attribute>
    
+   <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+      <description>When you redeploy a topic after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+      <name>DropOldMessageOnRedeploy</name>
+      <type>boolean</type>
+   </attribute>   
+   
    <attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
       <description>The day limit for the message counters of this topic</description>
       <name>MessageCounterHistoryDayLimit</name>

Modified: branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -737,6 +737,34 @@
       sc.invoke(deston, "start", new Object[0], new String[0]);
    }
 
+   public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      String config = "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") +
+                      "\"" +
+                      "       name=\"jboss.messaging.destination:service=" +
+                      (isQueue ? "Queue" : "Topic") +
+                      ",name=" +
+                      name +
+                      "\"" +
+                      "       xmbean-dd=\"xmdesc/" +
+                      (isQueue ? "Queue" : "Topic") +
+                      "-xmbean.xml\">" +
+                      (jndiName != null ? "    <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
+                      "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+                      "       <attribute name=\"Clustered\">" +
+                      String.valueOf(clustered) +
+                      "</attribute>" +
+                      "       <attribute name=\"DropOldMessageOnRedeploy\">" +
+                      String.valueOf(!keepMessage) +
+                      "</attribute>" +                      
+                      "</mbean>";
+
+      MBeanConfigurationElement mbean = new MBeanConfigurationElement(XMLUtil.stringToElement(config));
+      ObjectName deston = sc.registerAndConfigureService(mbean);
+      sc.invoke(deston, "create", new Object[0], new String[0]);
+      sc.invoke(deston, "start", new Object[0], new String[0]);
+   }
+
    public void deployDestination(boolean isQueue,
                                  String name,
                                  String jndiName,
@@ -1100,6 +1128,17 @@
       }
    }
 
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      deployDestination(true, name, jndiName, clustered, keepMessage);
+   }
+
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      deployDestination(false, name, jndiName, clustered, keepMessage);
+      
+   }
+
    // Inner classes --------------------------------------------------------------------------------
 
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -632,6 +632,16 @@
 
    protected abstract boolean isQueue();
 
+   public void setDropOldMessageOnRedeploy(boolean dropOldMessageOnRedeploy)
+   {
+      destination.setDropOldMessageOnRedeploy(dropOldMessageOnRedeploy);
+   }
+
+   public boolean isDropOldMessageOnRedeploy()
+   {
+      return destination.isDropOldMessageOnRedeploy();
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -90,6 +90,8 @@
    protected int messageCounterHistoryDayLimit = -1;
    
    protected int maxDeliveryAttempts = -1;
+   
+   protected boolean dropOldMessageOnRedeploy;
     
    public ManagedDestination()
    {      
@@ -318,4 +320,14 @@
    {   
       //NOOP
    }
+
+   public void setDropOldMessageOnRedeploy(boolean drop)
+   {
+      dropOldMessageOnRedeploy = drop;
+   }
+
+   public boolean isDropOldMessageOnRedeploy()
+   {
+      return dropOldMessageOnRedeploy;
+   }
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -87,20 +87,23 @@
          	
          	//Sanity check - currently it is not possible to change the clustered attribute of a destination
          	//See http://jira.jboss.org/jira/browse/JBMESSAGING-1235
+            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+            if (po.isClustered())
+            {
+               if (destination.isClustered() != queue.isClustered())
+               {
+                  
+                  log.warn("Queue " + destination.getName() 
+                           + " previous clustered attribute is " + queue.isClustered() 
+                           + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+
+                  queue = po.convertDestination(destination, queue.getName());
+               }
+            }
          	
-         	boolean actuallyClustered = po.isClustered() && destination.isClustered();
-         	
-         	if (actuallyClustered != queue.isClustered())
-         	{
-         		throw new IllegalArgumentException("Queue " + destination.getName() + " is already deployed as clustered = " +
-         				                             queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
-         				                             " . You must delete the destination first before redeploying");
-         		
-         	}
-         	
             queue.setPagingParams(destination.getFullSize(),
-                                  destination.getPageSize(),
-                                  destination.getDownCacheSize());  
+                               destination.getPageSize(),
+                               destination.getDownCacheSize());  
             
             queue.load();
                

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -27,6 +27,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:alex.fu at novell.com">Alex Fu</a>
  * @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  *
  * @version <tt>$Revision$</tt>
  *
@@ -71,64 +72,73 @@
          Collection queues = po.getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
       	
          Iterator iter = queues.iterator();
+
          while (iter.hasNext())
          {
             Queue queue = (Queue)iter.next();
             
-            boolean actuallyClustered = po.isClustered() && destination.isClustered();
-            
-            if (actuallyClustered != queue.isClustered())
-         	{
-         		throw new IllegalArgumentException("Topic " + destination.getName() + " is already deployed as clustered = " +
-         				                             queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
-         				                             " . You must delete the destination first before redeploying");
-         		
-         	}
+            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+            if (po.isClustered())
+            {
+               if (destination.isClustered() != queue.isClustered())
+               {
+                  log.warn("Topic " + destination.getName() 
+                           + " previous clustered attribute is " + queue.isClustered() 
+                           + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+                  
+                  queue = po.convertDestination(destination, queue.getName());
+               }
+            }
                      
             //TODO We need to set the paging params this way since the post office doesn't store them
             //instead we should never create queues inside the postoffice - only do it at deploy time
-            queue.setPagingParams(destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
             
-            queue.load();
-                        
-            queue.activate();  
-               
-            //Must be done after load
-            queue.setMaxSize(destination.getMaxSize());  
-            
-            //Create a counter
-            String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
-            
-            String subName = MessageQueueNameHelper.createHelper(queue.getName()).getSubName();
-            
-            int dayLimitToUse = destination.getMessageCounterHistoryDayLimit();
-            if (dayLimitToUse == -1)
+            //if the queue is redeployed as clustered from a non-clustered state, the queue is already activated.
+            if (!queue.isActive())
             {
-               //Use override on server peer
-               dayLimitToUse = serverPeer.getDefaultMessageCounterHistoryDayLimit();
+               queue.setPagingParams(destination.getFullSize(),
+                                     destination.getPageSize(),
+                                     destination.getDownCacheSize());
+
+               queue.load();
+
+               queue.activate();
+
+               // Must be done after load
+               queue.setMaxSize(destination.getMaxSize());
+
+               // Create a counter
+               String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
+
+               String subName = MessageQueueNameHelper.createHelper(queue.getName()).getSubName();
+
+               int dayLimitToUse = destination.getMessageCounterHistoryDayLimit();
+               if (dayLimitToUse == -1)
+               {
+                  // Use override on server peer
+                  dayLimitToUse = serverPeer.getDefaultMessageCounterHistoryDayLimit();
+               }
+
+               MessageCounter counter = new MessageCounter(counterName, subName, queue, true, true, dayLimitToUse);
+
+               serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
+
+               // Now we need to trigger a delivery - this is because message suckers might have
+               // been create *before* the queue was deployed - this is because message suckers can be
+               // created when the clusterpullconnectionfactory deploy is detected which then causes
+               // the clusterconnectionmanager to inspect the bindings for queues to create suckers
+               // to - but these bindings will exist before the queue or topic is deployed and before
+               // it has had its messages loaded
+               // Therefore we need to trigger a delivery now so remote suckers get messages
+               // See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+               // For JBM we should remove the distinction between activation and deployment to
+               // remove these annoyances and edge cases.
+               // The post office should load(=deploy) all bindings on startup including loading their
+               // state before adding the binding - there should be no separate deployment stage
+               // If the queue can be undeployed there should be a separate flag for this on the
+               // binding
+               queue.deliver();
             }
-            
-            MessageCounter counter =
-               new MessageCounter(counterName, subName, queue, true, true,
-                                  dayLimitToUse);
-            
-            serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);    
-            
-            //Now we need to trigger a delivery - this is because message suckers might have
-            //been create *before* the queue was deployed - this is because message suckers can be
-            //created when the clusterpullconnectionfactory deploy is detected which then causes
-            //the clusterconnectionmanager to inspect the bindings for queues to create suckers
-            //to - but these bindings will exist before the queue or topic is deployed and before
-            //it has had its messages loaded
-            //Therefore we need to trigger a delivery now so remote suckers get messages
-            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
-            //For JBM we should remove the distinction between activation and deployment to
-            //remove these annoyances and edge cases.
-            //The post office should load(=deploy) all bindings on startup including loading their
-            //state before adding the binding - there should be no separate deployment stage
-            //If the queue can be undeployed there should be a separate flag for this on the
-            //binding
-            queue.deliver();
          }
 
          serverPeer.getDestinationManager().registerDestination(destination);
@@ -139,6 +149,8 @@
          started = true;
          
          log.info(this + " started, fullSize=" + destination.getFullSize() + ", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
+         
+         
       }
       catch (Throwable t)
       {

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -87,6 +87,13 @@
 
    void addTransaction(Transaction tx);
 
+   //drop all messages that belong to a channel.
+   void dropChannelMessages(long channelID) throws Exception;
+
+   //merge messages from one channel to another.
+   void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+
    // Interface value classes ----------------------------------------------------------------------
 
    class MessageChannelPair

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.messaging.core.impl.tx.Transaction;
 
 /**
@@ -155,5 +156,10 @@
 	Map getRecoveryArea(String queueName);
    
    int getRecoveryMapSize(String queueName);
+
+   /**
+    * change the destination's clustered state.
+    */
+   Queue convertDestination(ManagedDestination destination, String queueName) throws Throwable;
 }
 

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -94,4 +94,8 @@
 
    //Optimisation for shared database
    Delivery handleMove(MessageReference ref, long sourceChannelID);
+
+   void setClustered(boolean isClustered);
+
+   void staticMerge(Queue queue) throws Exception;
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -1134,6 +1134,169 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+    */
+   public void dropChannelMessages(final long channelID) throws Exception
+   {
+      class ChannelDropRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            PreparedStatement ps2 = null;
+            ResultSet rs = null;
+            
+            try
+            {
+               ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+               ps.setLong(1, channelID);
+               rs = ps.executeQuery();
+               int rows;
+               
+               ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+               while (rs.next())
+               {
+                  long mid = rs.getLong(1);
+                  ps2.setLong(1, mid);
+                  ps2.executeUpdate();
+               }
+               ps2.close();
+               
+               ps.close();
+               
+               ps = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+               ps.setLong(1, channelID);
+               rows = ps.executeUpdate();
+               
+               if (trace)
+               {
+                  log.trace("Update page ord updated " + rows + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+      
+      new ChannelDropRunner().executeWithRetry();
+   }
+
+   /* (non-Javadoc)
+    * load messages from the channel (fromID) in the DB and
+    * add the messages to the channel (toID)
+    * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+    */
+   public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+   {
+
+      if (fromID == toID) { throw new IllegalArgumentException(
+            "Cannot merge queues - they have the same channel id!!"); }
+
+      class ChannelMergeRunner extends JDBCTxRunner2
+      {
+         public Object doTransaction() throws Exception
+         {
+            PreparedStatement ps = null;
+            ResultSet rs = null;
+            PreparedStatement ps2 = null;
+
+            try
+            {
+               //first get max page order of toID channel
+               ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+
+               ps.setLong(1, toID);
+
+               rs = ps.executeQuery();
+
+               rs.next();
+
+               Long maxOrdering = new Long(rs.getLong(2));
+
+               long pageCount = 0;
+
+               if (rs.wasNull())
+               {
+                  maxOrdering = null;
+               }
+               else
+               {
+                  //If maxOrdering is not null, update the page order
+                  pageCount = maxOrdering + 1;
+               }
+
+               rs.close();
+
+               ps.close();
+               
+               if (pageCount > 0)
+               {
+                  //update paging
+                  ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+                  
+                  ps2 = conn
+                     .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+                  ps.setLong(1, fromID);
+
+                  rs = ps.executeQuery();
+
+                  while (rs.next())
+                  {
+                     long msgId = rs.getLong(1);
+
+                     ps2.setLong(1, pageCount);
+
+                     ps2.setLong(2, msgId);
+
+                     ps2.setLong(3, fromID);
+                     
+                     int rows = ps2.executeUpdate();
+
+                     if (trace)
+                     {
+                        log.trace("Update page ord updated " + rows + " rows");
+                     }
+
+                     pageCount++;
+                  }
+                  ps2.close();
+                  ps.close();
+               }
+               
+               //update channel id
+               ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+               ps.setLong(1, toID);
+
+               ps.setLong(2, fromID);
+
+               int rows = ps.executeUpdate();
+
+               if (trace)
+               {
+                  log.trace("Update channel id updated " + rows + " rows");
+               }
+            }
+            finally
+            {
+               closeResultSet(rs);
+               closeStatement(ps);
+               closeStatement(ps2);
+            }
+            return null;
+         }
+      }
+      
+      new ChannelMergeRunner().executeWithRetry();
+   }
+
    public InitialLoadInfo mergeAndLoad(final long fromChannelID,
          final long toChannelID, final int numberToLoad,
          final long firstPagingOrder, final long nextPagingOrder)
@@ -2668,6 +2831,10 @@
       map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
       map.put("DELETE_MESSAGE",
               "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+      map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+      map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?");
+
+      
       // Transaction
       map.put("INSERT_TRANSACTION",
               "INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -201,6 +201,13 @@
    	return remoteDistributor;
    }
    
+   public void staticMerge(Queue fromQueue) throws Exception
+   {
+      if (trace) { log.trace("Merging queue " + channelID + " into " + this + 
+                             " statically."); }
+      pm.mergeChannelMessage(fromQueue.getChannelID(), channelID);
+   }
+   
    /**
     * Merge the contents of one queue with another - this happens at failover when a queue is failed
     * over to another node, but a queue with the same name already exists. In this case we merge the
@@ -708,4 +715,9 @@
 			}
 		}   	
    }
+
+   public void setClustered(boolean isClustered)
+   {
+      clustered = isClustered;
+   }
 }

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -211,6 +211,14 @@
       return timeMark;
    }
 
+   public void dropChannelMessages(long channelID) throws Exception
+   {
+   }
+
+   public void mergeChannelMessage(long fromID, long toID) throws Exception
+   {
+   }
+
 }
 
 class IDCounter

Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -49,7 +49,9 @@
 import javax.transaction.TransactionManager;
 
 import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.server.JMSCondition;
 import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.destination.ManagedDestination;
 import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Binding;
@@ -93,6 +95,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  * @version <tt>$Revision: 2782 $</tt>
  *
  * $Id: DefaultClusteredPostOffice.java 2782 2007-06-14 12:16:17Z timfox $
@@ -456,17 +459,17 @@
    
    public String getOfficeName()
    {
-   	return officeName;
+   	return officeName + thisNodeID;
    }
-    
    
    public boolean addBinding(Binding binding, boolean allNodes) throws Exception
    {
+      
    	if (allNodes && !binding.queue.isClustered())
    	{
    		throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
    	}
-   		
+      
    	boolean added = internalAddBinding(binding, allNodes, true);
    	
    	if (added && allNodes && clustered && binding.queue.isClustered())
@@ -484,7 +487,7 @@
    	
    	return added;
    }
-                
+
    public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
    {
    	Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -755,7 +758,95 @@
 	   }
 	}
 	
-	public void injectServerPeer(ServerPeer serverPeer)
+	/*
+	 * Convert an existing destination: 
+	 * if it is clustered, change it to be non-clustered.
+	 * if it is non-clustered, change it to be clustered.
+	 * 
+	 * Note: this method should be called during the destination loading time, i.e.
+	 * the queue is not activated yet.
+	 */
+   public Queue convertDestination(ManagedDestination newDest, String queueName) throws Throwable
+   {
+      Binding b = getBindingForQueueName(queueName);
+
+      if (b.queue.isActive())
+      {
+         throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
+      }
+      
+      //if a destination changes from clustered to standalone, collect the messages from all channels
+      Collection allBindings = getAllBindingsForQueueName(queueName);
+      if (newDest.isDropOldMessageOnRedeploy())
+      {
+         removeDBChannelMessages(allBindings);
+      }
+      else
+      {
+         if ( !newDest.isClustered() )
+         {
+            mergeDBChannelMessages(allBindings, b.queue);
+         }
+      }
+
+      //if the old destination is clustered, we need to remove from all nodes (queue and durable subs)
+      b = removeBinding(queueName, !newDest.isClustered());
+
+      b.queue.setClustered(newDest.isClustered());
+   
+      if (newDest.isQueue())
+      {
+         JMSCondition queueCond = new JMSCondition(true, queueName);
+         addBinding(new Binding(queueCond, b.queue, false), false);
+      }
+      else
+      {
+         b.queue.setClustered(newDest.isClustered());
+         JMSCondition queueCond = new JMSCondition(false, newDest.getName());
+         addBinding(new Binding(queueCond, b.queue, true), newDest.isClustered());
+      }
+      
+      return b.queue;
+   }
+   
+   private void removeDBChannelMessages(final Collection allBindings) throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+      Iterator itBindings = allBindings.iterator();
+
+      while (itBindings.hasNext())
+      {
+         Binding bd = (Binding)itBindings.next();
+         long channelID = bd.queue.getChannelID();
+         pm.dropChannelMessages(channelID);
+      }      
+   }
+   
+   private void mergeDBChannelMessages(final Collection allBindings, Queue localQueue) throws Exception
+   {
+      if (ds == null)
+      {
+         return;
+      }
+      
+      //do a staticMerge
+      Iterator itBindings = allBindings.iterator();
+
+      while (itBindings.hasNext())
+      {
+         Binding bd = (Binding)itBindings.next();
+         long fromChannelID = bd.queue.getChannelID();
+         if (fromChannelID != localQueue.getChannelID())
+         {
+            localQueue.staticMerge(bd.queue);
+         }
+      }
+   }
+
+   public void injectServerPeer(ServerPeer serverPeer)
 	{
 		this.serverPeer = serverPeer;
 	}
@@ -2620,7 +2711,6 @@
       groupMember.unicastData(request, address);
    }
    
-   
    private Map getBindingsFromStorage() throws Exception
    {
    	if (ds == null)
@@ -2676,7 +2766,7 @@
                                                    true, filter, bindingClustered && clustered);
 
                   if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
-
+                  
                   Condition condition = conditionFactory.createCondition(conditionText);
 
                   Binding binding = new Binding(condition, queue, allNodes);
@@ -2707,7 +2797,7 @@
       while (iter.hasNext())
       {
       	Binding binding = (Binding)iter.next();
-      	
+
       	addBindingInMemory(binding);    
       	
       	Queue queue = binding.queue;

Copied: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java (from rev 7882, branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java)
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	                        (rev 0)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -0,0 +1,1091 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DestinationRedeployTest extends ClusteringTestBase
+{
+   //clustered2NonclusteredQueue
+   private Queue cQueue;
+   
+   //nonclustered2ClusteredQueue
+   private Queue nQueue;
+   
+   //clustered2NonclusteredTopic
+   private Topic cTopic;
+   
+   //nonclustered2ClusteredTopic
+   private Topic nTopic;
+   
+   public DestinationRedeployTest(String name)
+   {
+      super(name);
+   }
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   //do a redeploy and test the queues work normally by sending some messages and receiving them.
+   public void testRedeployQueue() throws Exception
+   {
+      String msgBase = "testRedeployQueue";
+      int numMsg = 50;
+
+      deployDestinations();
+      redeployDestinations(true);
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //do a redeploy and test the topics work normally by sending some messages and receiving them.
+   public void testRedeployTopic() throws Exception
+   {
+      String msgBase = "testRedeployTopic";
+      int numMsg = 50;
+
+      deployDestinations();
+      redeployDestinations(true);
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createConsumer(cTopic);
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.start();
+
+         sendMessages(0, cTopic, msgBase, numMsg);
+         sendMessages(1, nTopic, msgBase, numMsg);
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub2.close();
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   public void testRedeployTopicNoMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.close();
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   public void testRedeployTopicWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployTopicWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         sess.createDurableSubscriber(cTopic, "sub1");
+         sess.createDurableSubscriber(nTopic, "sub2");
+         
+         conn.close();
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         redeployDestinations(false);
+         
+         checkEmpty(cTopic);
+         checkEmpty(nTopic);
+
+         sendMessages(0, cTopic, msgBase, numMsg);
+         sendMessages(2, nTopic, msgBase, numMsg);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   //send some messages to topics and receive a few of them. Then do redeploy and try to receive the rest.
+   public void testRedeployTopicNoMessageLoss2() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLoss2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         //receive 10
+         conn.start();
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         conn.close();
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess.unsubscribe("sub1");
+         sess.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   //send some messages to queues and receive a few within a tx, then redeply and receive them all.
+   public void testRedeployTopicNoMessageLossTX() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLossTX";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      Connection conn = null;
+      Session sess1 = null;
+      Session sess2 = null;
+      
+      try
+      {
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess1 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+         
+         MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+         
+         //receive 10
+         conn.start();
+         
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sess1.commit();
+         sess2.rollback();
+         
+         conn.close();
+         
+         redeployDestinations(true);
+
+         conn = createConnectionOnServer(cf, 0);
+         conn.setClientID("client-id-0");
+         sess1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn.start();
+
+         sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess1.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess1.unsubscribe("sub1");
+         sess1.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+   //send some messages to topics and receive a few within a tx, then redeply and receive them all.
+   public void testRedeployTopicNoMessageLossXA() throws Exception
+   {
+      String msgBase = "testRedeployTopicNoMessageLossXA";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      XAConnection xaconn1 = null;
+      XASession xasess1 = null;
+      XAResource xres1 = null;
+      Session sess1 = null;
+
+      XAConnection xaconn2 = null;
+      XASession xasess2 = null;
+      XAResource xres2 = null;
+      Session sess2 = null;
+      
+      Xid xid1 = null;
+      Xid xid2 = null;
+      
+      Connection conn1 = null;
+      Connection conn2 = null;
+      
+      try
+      {
+         xaconn1 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 1);
+         xaconn1.setClientID("client-id-0");
+         xasess1 = xaconn1.createXASession();
+         xres1 = xasess1.getXAResource();
+         xaconn1.start();
+         sess1 = xasess1.getSession();
+
+         xaconn2 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 0);
+         xaconn2.setClientID("client-id-1");
+         xasess2 = xaconn2.createXASession();
+         xres2 = xasess2.getXAResource();
+         xaconn2.start();
+         sess2 = xasess2.getSession();
+
+         xid1 = new MessagingXid(("bq1" + cTopic).getBytes(), 42, cTopic.toString().getBytes());
+         xid2 = new MessagingXid(("bq1" + nTopic).getBytes(), 42, nTopic.toString().getBytes());
+
+         xres1.start(xid1, XAResource.TMNOFLAGS);
+         xres2.start(xid2, XAResource.TMNOFLAGS);
+         
+         MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+         MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+         sendMessages(2, cTopic, msgBase, numMsg);
+         sendMessages(0, nTopic, msgBase, numMsg);
+                  
+         for (int i = 0; i < 10; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            log.info("--Message received: " + rm);
+            assertEquals(msgBase + i, rm.getText());
+            rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         xres1.end(xid1, XAResource.TMSUCCESS);
+         xres2.end(xid2, XAResource.TMSUCCESS);
+
+         xres1.commit(xid1, true);
+         xres2.rollback(xid2);
+         
+         sub1.close();
+         sub2.close();
+         
+         xaconn1.close();
+         xaconn2.close();
+         
+         redeployDestinations(true);
+
+         conn1 = createConnectionOnServer(cf, 0);
+         conn1.setClientID("client-id-0");
+         sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn2 = createConnectionOnServer(cf, 0);
+         conn2.setClientID("client-id-1");
+         sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn1.start();
+         conn2.start();
+
+         sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+         sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+         
+         for (int i = 10; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub1.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage rm = (TextMessage)sub2.receive(5000);
+            assertEquals(msgBase + i, rm.getText());
+         }
+         
+         sub1.close();
+         sub2.close();
+         sess1.unsubscribe("sub1");
+         sess2.unsubscribe("sub2");
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+      }
+   }
+   
+   //send some messages to queues and do redeploy, then receiving them.
+   public void testRedeployQueueNoMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+   
+   //send some messages to queues and do redeploy, dropping all messages
+   public void testRedeployQueueWithMessageLoss() throws Exception
+   {
+      String msgBase = "testRedeployQueueWithMessageLoss";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      redeployDestinations(false);
+      
+      checkEmpty(cQueue);
+      checkEmpty(nQueue);
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      sendMessages(1, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
+   public void testRedeployQueueNoMessageLoss2() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLoss2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessages(0, cQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+      receiveMessages(0, nQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+      
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+   
+   //send some messages to queues and receive a few within a tx, then redeply and receive them all.
+   public void testRedeployQueueNoMessageLossTX() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossTX";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, false, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, false, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+   public void testRedeployQueueNoMessageLossXA() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "commit", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "rollback", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+   public void testRedeployQueueNoMessageLossXA2() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA2";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, cQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, false);
+      
+      recoverMessages(0, cQueue, msgBase, 0, 10);
+   }
+
+   //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+   public void testRedeployQueueNoMessageLossXA3() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA3";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, nQueue, msgBase, 0, 15, true, "prepared", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(2, nQueue, msgBase, 15, numMsg - 15, Session.CLIENT_ACKNOWLEDGE, true);
+      
+      //have to recover from node 0.
+      recoverMessages(0, nQueue, msgBase, 0, 15);
+   }
+
+   //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+   public void testRedeployQueueNoMessageLossXA4() throws Exception
+   {
+      String msgBase = "testRedeployQueueNoMessageLossXA4";
+      int numMsg = 50;
+
+      deployDestinations();
+      
+      sendMessages(1, cQueue, msgBase, numMsg);
+      sendMessages(0, nQueue, msgBase, numMsg);
+      
+      receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "noaction", false);
+      receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "noaction", false);
+
+      redeployDestinations(true);
+      
+      receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+      receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+   }
+
+   /*
+    * Deploy the following destinations:
+    * 
+    * 1. clustered2NonclusteredQueue : a clustered queue used to be re-deployed as non-clustered.
+    * 2. nonclustered2ClusteredQueue : a non-clustered queue (at node0) to be re-deployed as clustered.
+    * 3. clustered2NonclusteredTopic : a clustered topic used to be re-deployed as non-clustered.
+    * 4. nonclustered2ClusteredTopic : a non-clustered topic (at node0) to be re-deployed as clustered.
+    * 
+    */
+   private void deployDestinations() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue("clustered2NonclusteredQueue", i);
+         ServerManagement.deployTopic("clustered2NonclusteredTopic", i);
+      }
+      ServerManagement.deployQueue("nonclustered2ClusteredQueue");
+      ServerManagement.deployTopic("nonclustered2ClusteredTopic");
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+      Queue anotherQueue = null;
+      try
+      {
+         anotherQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         assertNotNull(anotherQueue);
+      }
+      catch (NamingException e)
+      {
+         fail("The queue " + anotherQueue + " should not exist after redeploy");
+      }
+      
+      Topic anotherTopic = null;
+      try
+      {
+         anotherTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         assertNotNull(anotherTopic);
+      }
+      catch (NamingException e)
+      {
+         fail("The topic " + anotherTopic + " should not exist after redeploy");
+      }
+   }
+
+   private void redeployDestinations(boolean keepMessage) throws Exception
+   {
+      if (keepMessage)
+      {
+         redeployDestinationsWithMessage();
+      }
+      else
+      {
+         redeployDestinationsNoMessage();
+      }
+   }
+   
+   private void redeployDestinationsNoMessage() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.stop(i);
+      }
+      
+      //Restart nodes
+      for (int i = 0; i < nodeCount; i++)
+      {
+         startDefaultServer(i, overrides, false);
+         ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+      }
+      
+      //redeploy
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue("nonclustered2ClusteredQueue", i, false);
+         ServerManagement.deployTopic("nonclustered2ClusteredTopic", i, false);
+      }
+      ServerManagement.deployQueue("clustered2NonclusteredQueue", false);
+      ServerManagement.deployTopic("clustered2NonclusteredTopic", false);      
+
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");  
+      
+      cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      try
+      {
+         Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         fail("The queue " + nonExistQueue + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+      
+      try
+      {
+         Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         fail("The topic " + nonExistTopic + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+
+
+   }
+   
+   private void redeployDestinationsWithMessage() throws Exception
+   {
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.stop(i);
+      }
+      
+      //Restart nodes
+      for (int i = 0; i < nodeCount; i++)
+      {
+         startDefaultServer(i, overrides, false);
+         ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+      }
+      
+      //redeploy
+      for (int i = 0; i < nodeCount; i++)
+      {
+         ServerManagement.deployQueue("nonclustered2ClusteredQueue", i);
+         ServerManagement.deployTopic("nonclustered2ClusteredTopic", i);
+      }
+      ServerManagement.deployQueue("clustered2NonclusteredQueue");
+      ServerManagement.deployTopic("clustered2NonclusteredTopic");      
+
+
+      cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+      nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+      cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+      nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");  
+      
+      cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+      try
+      {
+         Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+         fail("The queue " + nonExistQueue + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+      
+      try
+      {
+         Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+         fail("The topic " + nonExistTopic + " should not exist after redeploy");
+      }
+      catch (NamingException e)
+      {
+         //ok
+      }
+
+
+   }
+
+   private void sendMessages(int serverIndex, Destination dest, String msgBase, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      try
+      {
+         conn = createConnectionOnServer(cf, serverIndex);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = sess.createProducer(dest);
+         log.info("-----Sending messages to: " + dest);
+         for (int i = 0; i < numMsg; i++)
+         {
+            TextMessage msg = sess.createTextMessage(msgBase + i);
+            producer.send(msg);
+         }
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         throw e;
+      }
+      finally
+      {
+         conn.close();
+      }
+   }
+
+   private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
+                                int numMsg, int ack, boolean checkEmpty) throws Exception
+   {
+      Connection conn = null;
+      
+      try
+      {
+         Session sess = null;
+
+         conn = createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(false, ack);
+         conn.start();
+         
+         MessageConsumer receiver = sess.createConsumer(dest);
+         TextMessage msg = null;
+         
+         for (int i = 0; i < numMsg; i++)
+         {
+            msg = (TextMessage)receiver.receive(5000);
+            assertEquals(msgBase + (startIndex + i), msg.getText());
+         }
+
+         if (ack == Session.CLIENT_ACKNOWLEDGE)
+         {
+            msg.acknowledge();
+         }
+         
+         if (checkEmpty)
+         {
+            if (dest instanceof Queue)
+            {
+               checkEmpty((Queue)dest);
+            }
+            else
+            {
+               checkEmpty((Topic)dest);
+            }
+         }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+   
+   /*
+    * receive messages transactionally.
+    * 
+    * outcome values:
+    * 
+    * commit -- commit the transaction
+    * rollback -- rollback the transaction
+    * prepared -- parepared the transaction but not commit.
+    * 
+    */
+   private void receiveMessagesTX(int serverIndex, Destination dest, String msgBase, int startIndex, 
+                                  int numMsg, boolean isXA, String outcome, boolean checkEmpty) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+      Session sess = null;
+      XASession xasess = null;
+      XAResource xres = null;
+      Xid xid = null;
+      try
+      {
+      if (isXA)
+      {
+         xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+         xasess = xaconn.createXASession();
+         xres = xasess.getXAResource();
+         xaconn.start();
+         sess = xasess.getSession();
+
+         xid = new MessagingXid(("bq1" + dest).getBytes(), 42, dest.toString().getBytes());
+
+         xres.start(xid, XAResource.TMNOFLAGS);
+      }
+      else
+      {
+         //local tx
+         conn = createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+         conn.start();
+      }
+      
+      //starting receiving
+      MessageConsumer cons = sess.createConsumer(dest);
+      for (int i = 0; i < numMsg; i++)
+      {
+         TextMessage rm = (TextMessage)cons.receive(5000);
+         assertEquals(msgBase + (i + startIndex), rm.getText());
+      }
+      
+      //ending
+      if (isXA)
+      {
+         xres.end(xid, XAResource.TMSUCCESS);
+         
+         if ("commit".equals(outcome))
+         {
+            //just one-phase is enough for the test
+            xres.commit(xid, true);
+         }
+         else if ("rollback".equals(outcome))
+         {
+            xres.rollback(xid);
+         }
+         else if ("prepared".equals(outcome))
+         {
+            xres.prepare(xid);
+         }
+      }
+      else
+      {
+         //local
+         if ("commit".equals(outcome))
+         {
+            sess.commit();
+         }
+         else if ("rollback".equals(outcome))
+         {
+            sess.rollback();
+         }
+      }
+      }
+      finally
+      {
+         if (conn != null)
+         {
+            conn.close();
+         }
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+      }
+   }
+
+   //recover the messages in transactions by rollback.
+   private void recoverMessages(int serverIndex, Destination dest, 
+                                String msgBase, int startIndex, int numMsg) throws Exception
+   {
+      Connection conn = null;
+      XAConnection xaconn = null;
+      Session sess = null;
+      XASession xasess = null;
+      XAResource xres = null;
+
+      try
+      {
+         xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+         xasess = xaconn.createXASession();
+         xres = xasess.getXAResource();
+         xaconn.start();
+         
+         Xid[] xids = xres.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, xids.length);
+
+         Xid[] xids2 = xres.recover(XAResource.TMENDRSCAN);
+         assertEquals(0, xids2.length);
+         
+         conn = this.createConnectionOnServer(cf, serverIndex);
+         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+         
+         MessageConsumer cons = sess.createConsumer(dest);
+         TextMessage rm = (TextMessage)cons.receive(5000);
+         assertNull(rm);
+                  
+         xres.rollback(xids[0]);
+
+         conn.stop();
+         conn.start();
+         for (int i = 0; i < numMsg; i++)
+         {
+            rm = (TextMessage)cons.receive(5000);
+            assertEquals(msgBase + (startIndex + i), rm.getText());
+         }
+
+         if (dest instanceof Queue)
+         {
+            checkEmpty((Queue)dest);
+         }
+         else
+         {
+            checkEmpty((Topic)dest);
+         }
+      }
+      finally
+      {
+         if (xaconn != null)
+         {
+            xaconn.close();
+         }
+         if (conn != null)
+         {
+            conn.close();
+         }
+      }
+   }
+
+
+}

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -392,6 +392,14 @@
       {
          return null;
       }
+
+      public void setClustered(boolean isClustered)
+      {
+      }
+
+      public void staticMerge(org.jboss.messaging.core.contract.Queue queue) throws Exception
+      {
+      }
    }
 
    // Inner classes -------------------------------------------------

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -1357,4 +1357,28 @@
       }
    }
 
+   public static void deployQueue(String name, int serverIndex, boolean keepMessage) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployQueue(name, null, true, keepMessage);
+   }
+
+   public static void deployTopic(String name, int serverIndex, boolean keepMessage) throws Exception
+   {
+      insureStarted(serverIndex);
+      servers[serverIndex].getServer().deployTopic(name, null, true, keepMessage);
+   }
+
+   public static void deployQueue(String name, boolean keepMessage) throws Exception
+   {
+      insureStarted();
+      servers[0].getServer().deployQueue(name, null, false, keepMessage);
+   }
+
+   public static void deployTopic(String name, boolean keepMessage) throws Exception
+   {
+      insureStarted();
+      servers[0].getServer().deployTopic(name, null, false, keepMessage);
+   }
+
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -561,5 +561,21 @@
       return namingDelegate;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.test.messaging.tools.container.Server#deployQueue(java.lang.String, java.lang.String, boolean, boolean)
+    */
+   public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      server.deployQueue(name, jndiName, clustered, keepMessage);
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.test.messaging.tools.container.Server#deployTopic(java.lang.String, java.lang.String, boolean, boolean)
+    */
+   public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+   {
+      server.deployTopic(name, jndiName, clustered, keepMessage);
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java	2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java	2009-10-29 11:15:21 UTC (rev 7883)
@@ -298,4 +298,8 @@
 
    ObjectName getPostOfficeObjectName() throws Exception;
 
+   void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
+   void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
 }




More information about the jboss-cvs-commits mailing list