[jboss-cvs] JBoss Messaging SVN: r7883 - in branches/Branch_1_4: docs/userguide/en/modules and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Oct 29 07:15:22 EDT 2009
Author: gaohoward
Date: 2009-10-29 07:15:21 -0400 (Thu, 29 Oct 2009)
New Revision: 7883
Added:
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
Modified:
branches/Branch_1_4/.classpath
branches/Branch_1_4/build-messaging.xml
branches/Branch_1_4/build.properties
branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml
branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml
branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
JBMESSAGING-1742
Modified: branches/Branch_1_4/.classpath
===================================================================
--- branches/Branch_1_4/.classpath 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/.classpath 2009-10-29 11:15:21 UTC (rev 7883)
@@ -42,7 +42,7 @@
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-runtime.jar"/>
<classpathentry kind="lib" path="thirdparty/retrotranslator/lib/retrotranslator-transformer.jar"/>
<classpathentry kind="lib" path="thirdparty/trove/lib/trove.jar"/>
- <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar" sourcepath="thirdparty/jboss/remoting/lib/jboss-remoting-src.jar"/>
+ <classpathentry kind="lib" path="thirdparty/jboss/remoting/lib/jboss-remoting.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/jbossts14/lib/jbossjta.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
@@ -62,6 +62,5 @@
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-local-jdbc.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
- <classpathentry kind="var" path="ANT_HOME"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/Branch_1_4/build-messaging.xml
===================================================================
--- branches/Branch_1_4/build-messaging.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/build-messaging.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -51,11 +51,11 @@
<property name="messaging.version.major" value="1"/>
<property name="messaging.version.minor" value="4"/>
- <property name="messaging.version.revision" value="6"/>
- <property name="messaging.version.incrementing" value="36"/>
- <property name="messaging.version.tag" value="GA"/>
+ <property name="messaging.version.revision" value="0"/>
+ <property name="messaging.version.incrementing" value="37"/>
+ <property name="messaging.version.tag" value="SP3_CP10"/>
<property name="messaging.version.name" value="platypus"/>
- <property name="messaging.version.cvstag" value="JBossMessaging_1_4_6_GA"/>
+ <property name="messaging.version.cvstag" value="JBossMessaging_1_4_0_SP3_CP10"/>
<property name="module.name" value="messaging"/>
<property name="module.Name" value="JBoss Messaging"/>
<property name="module.version" value="${messaging.version.major}.${messaging.version.minor}.${messaging.version.revision}.${messaging.version.tag}"/>
Modified: branches/Branch_1_4/build.properties
===================================================================
--- branches/Branch_1_4/build.properties 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/build.properties 2009-10-29 11:15:21 UTC (rev 7883)
@@ -8,5 +8,8 @@
# AS4 - integration against JBoss 4.x
#############################################################################################################
-integration.base=AS5
-#integration.base=EAP4
+#integration.base=AS5
+
+integration.base=EAP4
+
+#integration.base=EAP5
Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -1663,6 +1663,22 @@
<para>The number of consumers currently consuming from the
queue.</para>
</section>
+
+ <section id="conf.destination.queue.attributes.dropoldmessageonredeploy">
+ <title>DropOldMessageOnRedeploy</title>
+
+ <para>When you re-deploy a queue service with a clustered attribute different from the one with which it has been
+ previously deployed, all remaining messages in the queue will be deleted after the re-deployment if you set this
+ parameter to true. Otherwise the messages will be reserved. Default is false.
+ <warning>
+ When you re-deploy a destination, you need to shut
+ down all the nodes in the cluster, make proper configuration change and then restart the nodes.
+ Redeploying from a non-clustered queue to a clustered one requires you set the Clustered attribute to true, and
+ add the queue service configuration to each node. Redeploying from a clustered queue to a non-clustered requires
+ you set the Clustered attribute to false in one of the queue configurations and delete all others in the cluster.
+ </warning>
+ </para>
+ </section>
</section>
<section id="conf.destination.queue.operations">
@@ -1948,6 +1964,22 @@
<para>The count of all non durable subscriptions on this
topic</para>
</section>
+
+ <section id="conf.destination.topic.attributes.dropoldmessageonredeploy">
+ <title>DropOldMessageOnRedeploy</title>
+
+ <para>When you re-deploy a topic service with a clustered attribute different from the one with which it has been
+ previously deployed, all remaining messages of its durable subscribers will be deleted after the re-deployment if you set this
+ parameter to true. Otherwise the messages will be reserved. Default is false.
+ <warning>
+ When you re-deploy a destination, you need to shut
+ down all the nodes in the cluster, make proper configuration change and then restart the nodes.
+ Redeploying from a non-clustered topic to a clustered one requires you set the Clustered attribute to true, and
+ add the topic service configuration to each node. Redeploying from a clustered topic to a non-clustered one requires
+ you set the Clustered attribute to false in one of the topic configurations and delete all others in the cluster.
+ </warning>
+ </para>
+ </section>
</section>
<section id="conf.destination.topic.operations">
Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Queue-xmbean.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -127,8 +127,14 @@
<description>Is this a clustered destination?</description>
<name>Clustered</name>
<type>boolean</type>
+ </attribute>
+
+ <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+ <description>When you redeploy a queue after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+ <name>DropOldMessageOnRedeploy</name>
+ <type>boolean</type>
</attribute>
-
+
<attribute access="read-only" getMethod="getMessageCounter">
<description>Get the message counter for the queue</description>
<name>MessageCounter</name>
Modified: branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/AS5/etc/xmdesc/Topic-xmbean.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -109,6 +109,12 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+ <description>When you redeploy a topic after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+ <name>DropOldMessageOnRedeploy</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
<description>The day limit for the message counters of this topic</description>
<name>MessageCounterHistoryDayLimit</name>
Modified: branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/server/default/deploy/mysql-persistence-service.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -83,6 +83,8 @@
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
+ DELETE_CHANNEL_MESSAGE_REF=DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?
+ DELETE_CHANNEL_MESSAGE=DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?
]]></attribute>
<!-- The maximum number of parameters to include in a prepared statement -->
Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Queue-xmbean.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -129,6 +129,12 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+ <description>When you redeploy a queue after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+ <name>DropOldMessageOnRedeploy</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-only" getMethod="getMessageCounter">
<description>Get the message counter for the queue</description>
<name>MessageCounter</name>
Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/Topic-xmbean.xml 2009-10-29 11:15:21 UTC (rev 7883)
@@ -109,6 +109,12 @@
<type>boolean</type>
</attribute>
+ <attribute access="read-write" getMethod="isDropOldMessageOnRedeploy" setMethod="setDropOldMessageOnRedeploy">
+ <description>When you redeploy a topic after changing its clustered state, do you want to keep or abandon its existing messages?</description>
+ <name>DropOldMessageOnRedeploy</name>
+ <type>boolean</type>
+ </attribute>
+
<attribute access="read-write" getMethod="getMessageCounterHistoryDayLimit" setMethod="setMessageCounterHistoryDayLimit">
<description>The day limit for the message counters of this topic</description>
<name>MessageCounterHistoryDayLimit</name>
Modified: branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -737,6 +737,34 @@
sc.invoke(deston, "start", new Object[0], new String[0]);
}
+ public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ String config = "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") +
+ "\"" +
+ " name=\"jboss.messaging.destination:service=" +
+ (isQueue ? "Queue" : "Topic") +
+ ",name=" +
+ name +
+ "\"" +
+ " xmbean-dd=\"xmdesc/" +
+ (isQueue ? "Queue" : "Topic") +
+ "-xmbean.xml\">" +
+ (jndiName != null ? " <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
+ " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+ " <attribute name=\"Clustered\">" +
+ String.valueOf(clustered) +
+ "</attribute>" +
+ " <attribute name=\"DropOldMessageOnRedeploy\">" +
+ String.valueOf(!keepMessage) +
+ "</attribute>" +
+ "</mbean>";
+
+ MBeanConfigurationElement mbean = new MBeanConfigurationElement(XMLUtil.stringToElement(config));
+ ObjectName deston = sc.registerAndConfigureService(mbean);
+ sc.invoke(deston, "create", new Object[0], new String[0]);
+ sc.invoke(deston, "start", new Object[0], new String[0]);
+ }
+
public void deployDestination(boolean isQueue,
String name,
String jndiName,
@@ -1100,6 +1128,17 @@
}
}
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ deployDestination(true, name, jndiName, clustered, keepMessage);
+ }
+
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ deployDestination(false, name, jndiName, clustered, keepMessage);
+
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/DestinationServiceSupport.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -632,6 +632,16 @@
protected abstract boolean isQueue();
+ public void setDropOldMessageOnRedeploy(boolean dropOldMessageOnRedeploy)
+ {
+ destination.setDropOldMessageOnRedeploy(dropOldMessageOnRedeploy);
+ }
+
+ public boolean isDropOldMessageOnRedeploy()
+ {
+ return destination.isDropOldMessageOnRedeploy();
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/ManagedDestination.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -90,6 +90,8 @@
protected int messageCounterHistoryDayLimit = -1;
protected int maxDeliveryAttempts = -1;
+
+ protected boolean dropOldMessageOnRedeploy;
public ManagedDestination()
{
@@ -318,4 +320,14 @@
{
//NOOP
}
+
+ public void setDropOldMessageOnRedeploy(boolean drop)
+ {
+ dropOldMessageOnRedeploy = drop;
+ }
+
+ public boolean isDropOldMessageOnRedeploy()
+ {
+ return dropOldMessageOnRedeploy;
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/QueueService.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -87,20 +87,23 @@
//Sanity check - currently it is not possible to change the clustered attribute of a destination
//See http://jira.jboss.org/jira/browse/JBMESSAGING-1235
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ if (po.isClustered())
+ {
+ if (destination.isClustered() != queue.isClustered())
+ {
+
+ log.warn("Queue " + destination.getName()
+ + " previous clustered attribute is " + queue.isClustered()
+ + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+
+ queue = po.convertDestination(destination, queue.getName());
+ }
+ }
- boolean actuallyClustered = po.isClustered() && destination.isClustered();
-
- if (actuallyClustered != queue.isClustered())
- {
- throw new IllegalArgumentException("Queue " + destination.getName() + " is already deployed as clustered = " +
- queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
- " . You must delete the destination first before redeploying");
-
- }
-
queue.setPagingParams(destination.getFullSize(),
- destination.getPageSize(),
- destination.getDownCacheSize());
+ destination.getPageSize(),
+ destination.getDownCacheSize());
queue.load();
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/destination/TopicService.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -27,6 +27,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:alex.fu at novell.com">Alex Fu</a>
* @author <a href="mailto:juha at jboss.org">Juha Lindfors</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
*
* @version <tt>$Revision$</tt>
*
@@ -71,64 +72,73 @@
Collection queues = po.getQueuesForCondition(new JMSCondition(false, destination.getName()), true);
Iterator iter = queues.iterator();
+
while (iter.hasNext())
{
Queue queue = (Queue)iter.next();
- boolean actuallyClustered = po.isClustered() && destination.isClustered();
-
- if (actuallyClustered != queue.isClustered())
- {
- throw new IllegalArgumentException("Topic " + destination.getName() + " is already deployed as clustered = " +
- queue.isClustered() + " so cannot redeploy as clustered=" + destination.isClustered() +
- " . You must delete the destination first before redeploying");
-
- }
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ if (po.isClustered())
+ {
+ if (destination.isClustered() != queue.isClustered())
+ {
+ log.warn("Topic " + destination.getName()
+ + " previous clustered attribute is " + queue.isClustered()
+ + ". Now re-deploying it with clustered attribute: " + destination.isClustered());
+
+ queue = po.convertDestination(destination, queue.getName());
+ }
+ }
//TODO We need to set the paging params this way since the post office doesn't store them
//instead we should never create queues inside the postoffice - only do it at deploy time
- queue.setPagingParams(destination.getFullSize(), destination.getPageSize(), destination.getDownCacheSize());
- queue.load();
-
- queue.activate();
-
- //Must be done after load
- queue.setMaxSize(destination.getMaxSize());
-
- //Create a counter
- String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
-
- String subName = MessageQueueNameHelper.createHelper(queue.getName()).getSubName();
-
- int dayLimitToUse = destination.getMessageCounterHistoryDayLimit();
- if (dayLimitToUse == -1)
+ //if the queue is redeployed as clustered from a non-clustered state, the queue is already activated.
+ if (!queue.isActive())
{
- //Use override on server peer
- dayLimitToUse = serverPeer.getDefaultMessageCounterHistoryDayLimit();
+ queue.setPagingParams(destination.getFullSize(),
+ destination.getPageSize(),
+ destination.getDownCacheSize());
+
+ queue.load();
+
+ queue.activate();
+
+ // Must be done after load
+ queue.setMaxSize(destination.getMaxSize());
+
+ // Create a counter
+ String counterName = SUBSCRIPTION_MESSAGECOUNTER_PREFIX + queue.getName();
+
+ String subName = MessageQueueNameHelper.createHelper(queue.getName()).getSubName();
+
+ int dayLimitToUse = destination.getMessageCounterHistoryDayLimit();
+ if (dayLimitToUse == -1)
+ {
+ // Use override on server peer
+ dayLimitToUse = serverPeer.getDefaultMessageCounterHistoryDayLimit();
+ }
+
+ MessageCounter counter = new MessageCounter(counterName, subName, queue, true, true, dayLimitToUse);
+
+ serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
+
+ // Now we need to trigger a delivery - this is because message suckers might have
+ // been create *before* the queue was deployed - this is because message suckers can be
+ // created when the clusterpullconnectionfactory deploy is detected which then causes
+ // the clusterconnectionmanager to inspect the bindings for queues to create suckers
+ // to - but these bindings will exist before the queue or topic is deployed and before
+ // it has had its messages loaded
+ // Therefore we need to trigger a delivery now so remote suckers get messages
+ // See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+ // For JBM we should remove the distinction between activation and deployment to
+ // remove these annoyances and edge cases.
+ // The post office should load(=deploy) all bindings on startup including loading their
+ // state before adding the binding - there should be no separate deployment stage
+ // If the queue can be undeployed there should be a separate flag for this on the
+ // binding
+ queue.deliver();
}
-
- MessageCounter counter =
- new MessageCounter(counterName, subName, queue, true, true,
- dayLimitToUse);
-
- serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
-
- //Now we need to trigger a delivery - this is because message suckers might have
- //been create *before* the queue was deployed - this is because message suckers can be
- //created when the clusterpullconnectionfactory deploy is detected which then causes
- //the clusterconnectionmanager to inspect the bindings for queues to create suckers
- //to - but these bindings will exist before the queue or topic is deployed and before
- //it has had its messages loaded
- //Therefore we need to trigger a delivery now so remote suckers get messages
- //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
- //For JBM we should remove the distinction between activation and deployment to
- //remove these annoyances and edge cases.
- //The post office should load(=deploy) all bindings on startup including loading their
- //state before adding the binding - there should be no separate deployment stage
- //If the queue can be undeployed there should be a separate flag for this on the
- //binding
- queue.deliver();
}
serverPeer.getDestinationManager().registerDestination(destination);
@@ -139,6 +149,8 @@
started = true;
log.info(this + " started, fullSize=" + destination.getFullSize() + ", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
+
+
}
catch (Throwable t)
{
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PersistenceManager.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -87,6 +87,13 @@
void addTransaction(Transaction tx);
+ //drop all messages that belong to a channel.
+ void dropChannelMessages(long channelID) throws Exception;
+
+ //merge messages from one channel to another.
+ void mergeChannelMessage(long fromID, long toID) throws Exception;
+
+
// Interface value classes ----------------------------------------------------------------------
class MessageChannelPair
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/PostOffice.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Set;
+import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.messaging.core.impl.tx.Transaction;
/**
@@ -155,5 +156,10 @@
Map getRecoveryArea(String queueName);
int getRecoveryMapSize(String queueName);
+
+ /**
+ * change the destination's clustered state.
+ */
+ Queue convertDestination(ManagedDestination destination, String queueName) throws Throwable;
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/contract/Queue.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -94,4 +94,8 @@
//Optimisation for shared database
Delivery handleMove(MessageReference ref, long sourceChannelID);
+
+ void setClustered(boolean isClustered);
+
+ void staticMerge(Queue queue) throws Exception;
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -1134,6 +1134,169 @@
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.contract.PersistenceManager#dropChannelMessages(long)
+ */
+ public void dropChannelMessages(final long channelID) throws Exception
+ {
+ class ChannelDropRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ PreparedStatement ps2 = null;
+ ResultSet rs = null;
+
+ try
+ {
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+ ps.setLong(1, channelID);
+ rs = ps.executeQuery();
+ int rows;
+
+ ps2 = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE"));
+ while (rs.next())
+ {
+ long mid = rs.getLong(1);
+ ps2.setLong(1, mid);
+ ps2.executeUpdate();
+ }
+ ps2.close();
+
+ ps.close();
+
+ ps = conn.prepareStatement(getSQLStatement("DELETE_CHANNEL_MESSAGE_REF"));
+ ps.setLong(1, channelID);
+ rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update page ord updated " + rows + " rows");
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ return null;
+ }
+ }
+
+ new ChannelDropRunner().executeWithRetry();
+ }
+
+ /* (non-Javadoc)
+ * load messages from the channel (fromID) in the DB and
+ * add the messages to the channel (toID)
+ * @see org.jboss.messaging.core.contract.PersistenceManager#mergeChannelMessage(long, long)
+ */
+ public void mergeChannelMessage(final long fromID, final long toID) throws Exception
+ {
+
+ if (fromID == toID) { throw new IllegalArgumentException(
+ "Cannot merge queues - they have the same channel id!!"); }
+
+ class ChannelMergeRunner extends JDBCTxRunner2
+ {
+ public Object doTransaction() throws Exception
+ {
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ PreparedStatement ps2 = null;
+
+ try
+ {
+ //first get max page order of toID channel
+ ps = conn.prepareStatement(getSQLStatement("SELECT_MIN_MAX_PAGE_ORD"));
+
+ ps.setLong(1, toID);
+
+ rs = ps.executeQuery();
+
+ rs.next();
+
+ Long maxOrdering = new Long(rs.getLong(2));
+
+ long pageCount = 0;
+
+ if (rs.wasNull())
+ {
+ maxOrdering = null;
+ }
+ else
+ {
+ //If maxOrdering is not null, update the page order
+ pageCount = maxOrdering + 1;
+ }
+
+ rs.close();
+
+ ps.close();
+
+ if (pageCount > 0)
+ {
+ //update paging
+ ps = conn.prepareStatement(getSQLStatement("LOAD_REFS"));
+
+ ps2 = conn
+ .prepareStatement(getSQLStatement("UPDATE_PAGE_ORDER"));
+
+ ps.setLong(1, fromID);
+
+ rs = ps.executeQuery();
+
+ while (rs.next())
+ {
+ long msgId = rs.getLong(1);
+
+ ps2.setLong(1, pageCount);
+
+ ps2.setLong(2, msgId);
+
+ ps2.setLong(3, fromID);
+
+ int rows = ps2.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update page ord updated " + rows + " rows");
+ }
+
+ pageCount++;
+ }
+ ps2.close();
+ ps.close();
+ }
+
+ //update channel id
+ ps = conn.prepareStatement(getSQLStatement("UPDATE_CHANNEL_ID"));
+
+ ps.setLong(1, toID);
+
+ ps.setLong(2, fromID);
+
+ int rows = ps.executeUpdate();
+
+ if (trace)
+ {
+ log.trace("Update channel id updated " + rows + " rows");
+ }
+ }
+ finally
+ {
+ closeResultSet(rs);
+ closeStatement(ps);
+ closeStatement(ps2);
+ }
+ return null;
+ }
+ }
+
+ new ChannelMergeRunner().executeWithRetry();
+ }
+
public InitialLoadInfo mergeAndLoad(final long fromChannelID,
final long toChannelID, final int numberToLoad,
final long firstPagingOrder, final long nextPagingOrder)
@@ -2668,6 +2831,10 @@
map.put("MESSAGE_ID_COLUMN", "MESSAGE_ID");
map.put("DELETE_MESSAGE",
"DELETE FROM JBM_MSG WHERE MESSAGE_ID = ? AND NOT EXISTS (SELECT JBM_MSG_REF.MESSAGE_ID FROM JBM_MSG_REF WHERE JBM_MSG_REF.MESSAGE_ID = ?)");
+ map.put("DELETE_CHANNEL_MESSAGE_REF", "DELETE FROM JBM_MSG_REF WHERE CHANNEL_ID=?");
+ map.put("DELETE_CHANNEL_MESSAGE", "DELETE FROM JBM_MSG WHERE MESSAGE_ID = ?");
+
+
// Transaction
map.put("INSERT_TRANSACTION",
"INSERT INTO JBM_TX (NODE_ID, TRANSACTION_ID, BRANCH_QUAL, FORMAT_ID, GLOBAL_TXID) "
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/MessagingQueue.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -201,6 +201,13 @@
return remoteDistributor;
}
+ public void staticMerge(Queue fromQueue) throws Exception
+ {
+ if (trace) { log.trace("Merging queue " + channelID + " into " + this +
+ " statically."); }
+ pm.mergeChannelMessage(fromQueue.getChannelID(), channelID);
+ }
+
/**
* Merge the contents of one queue with another - this happens at failover when a queue is failed
* over to another node, but a queue with the same name already exists. In this case we merge the
@@ -708,4 +715,9 @@
}
}
}
+
+ public void setClustered(boolean isClustered)
+ {
+ clustered = isClustered;
+ }
}
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/NullPersistenceManager.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -211,6 +211,14 @@
return timeMark;
}
+ public void dropChannelMessages(long channelID) throws Exception
+ {
+ }
+
+ public void mergeChannelMessage(long fromID, long toID) throws Exception
+ {
+ }
+
}
class IDCounter
Modified: branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -49,7 +49,9 @@
import javax.transaction.TransactionManager;
import org.jboss.jms.client.container.JMSClientVMIdentifier;
+import org.jboss.jms.server.JMSCondition;
import org.jboss.jms.server.ServerPeer;
+import org.jboss.jms.server.destination.ManagedDestination;
import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Binding;
@@ -93,6 +95,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
* @version <tt>$Revision: 2782 $</tt>
*
* $Id: DefaultClusteredPostOffice.java 2782 2007-06-14 12:16:17Z timfox $
@@ -456,17 +459,17 @@
public String getOfficeName()
{
- return officeName;
+ return officeName + thisNodeID;
}
-
public boolean addBinding(Binding binding, boolean allNodes) throws Exception
{
+
if (allNodes && !binding.queue.isClustered())
{
throw new IllegalArgumentException("Cannot bind a non clustered queue on all nodes");
}
-
+
boolean added = internalAddBinding(binding, allNodes, true);
if (added && allNodes && clustered && binding.queue.isClustered())
@@ -484,7 +487,7 @@
return added;
}
-
+
public Binding removeBinding(String queueName, boolean allNodes) throws Throwable
{
Binding binding = internalRemoveBinding(queueName, allNodes, true);
@@ -755,7 +758,95 @@
}
}
- public void injectServerPeer(ServerPeer serverPeer)
+ /*
+ * Convert an existing destination:
+ * if it is clustered, change it to be non-clustered.
+ * if it is non-clustered, change it to be clustered.
+ *
+ * Note: this method should be called during the destination loading time, i.e.
+ * the queue is not activated yet.
+ */
+ public Queue convertDestination(ManagedDestination newDest, String queueName) throws Throwable
+ {
+ Binding b = getBindingForQueueName(queueName);
+
+ if (b.queue.isActive())
+ {
+ throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
+ }
+
+ //if a destination changes from clustered to standalone, collect the messages from all channels
+ Collection allBindings = getAllBindingsForQueueName(queueName);
+ if (newDest.isDropOldMessageOnRedeploy())
+ {
+ removeDBChannelMessages(allBindings);
+ }
+ else
+ {
+ if ( !newDest.isClustered() )
+ {
+ mergeDBChannelMessages(allBindings, b.queue);
+ }
+ }
+
+ //if the old destination is clustered, we need to remove from all nodes (queue and durable subs)
+ b = removeBinding(queueName, !newDest.isClustered());
+
+ b.queue.setClustered(newDest.isClustered());
+
+ if (newDest.isQueue())
+ {
+ JMSCondition queueCond = new JMSCondition(true, queueName);
+ addBinding(new Binding(queueCond, b.queue, false), false);
+ }
+ else
+ {
+ b.queue.setClustered(newDest.isClustered());
+ JMSCondition queueCond = new JMSCondition(false, newDest.getName());
+ addBinding(new Binding(queueCond, b.queue, true), newDest.isClustered());
+ }
+
+ return b.queue;
+ }
+
+ private void removeDBChannelMessages(final Collection allBindings) throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+ Iterator itBindings = allBindings.iterator();
+
+ while (itBindings.hasNext())
+ {
+ Binding bd = (Binding)itBindings.next();
+ long channelID = bd.queue.getChannelID();
+ pm.dropChannelMessages(channelID);
+ }
+ }
+
+ private void mergeDBChannelMessages(final Collection allBindings, Queue localQueue) throws Exception
+ {
+ if (ds == null)
+ {
+ return;
+ }
+
+ //do a staticMerge
+ Iterator itBindings = allBindings.iterator();
+
+ while (itBindings.hasNext())
+ {
+ Binding bd = (Binding)itBindings.next();
+ long fromChannelID = bd.queue.getChannelID();
+ if (fromChannelID != localQueue.getChannelID())
+ {
+ localQueue.staticMerge(bd.queue);
+ }
+ }
+ }
+
+ public void injectServerPeer(ServerPeer serverPeer)
{
this.serverPeer = serverPeer;
}
@@ -2620,7 +2711,6 @@
groupMember.unicastData(request, address);
}
-
private Map getBindingsFromStorage() throws Exception
{
if (ds == null)
@@ -2676,7 +2766,7 @@
true, filter, bindingClustered && clustered);
if (trace) { log.trace(this + " loaded binding from storage: " + queueName); }
-
+
Condition condition = conditionFactory.createCondition(conditionText);
Binding binding = new Binding(condition, queue, allNodes);
@@ -2707,7 +2797,7 @@
while (iter.hasNext())
{
Binding binding = (Binding)iter.next();
-
+
addBindingInMemory(binding);
Queue queue = binding.queue;
Copied: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java (from rev 7882, branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java)
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java (rev 0)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -0,0 +1,1091 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.jms.tx.MessagingXid;
+import org.jboss.test.messaging.tools.ServerManagement;
+
+/**
+ * Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ *
+ */
+public class DestinationRedeployTest extends ClusteringTestBase
+{
+ //clustered2NonclusteredQueue
+ private Queue cQueue;
+
+ //nonclustered2ClusteredQueue
+ private Queue nQueue;
+
+ //clustered2NonclusteredTopic
+ private Topic cTopic;
+
+ //nonclustered2ClusteredTopic
+ private Topic nTopic;
+
+ public DestinationRedeployTest(String name)
+ {
+ super(name);
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ //do a redeploy and test the queues work normally by sending some messages and receiving them.
+ public void testRedeployQueue() throws Exception
+ {
+ String msgBase = "testRedeployQueue";
+ int numMsg = 50;
+
+ deployDestinations();
+ redeployDestinations(true);
+
+ sendMessages(0, cQueue, msgBase, numMsg);
+ sendMessages(1, nQueue, msgBase, numMsg);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //do a redeploy and test the topics work normally by sending some messages and receiving them.
+ public void testRedeployTopic() throws Exception
+ {
+ String msgBase = "testRedeployTopic";
+ int numMsg = 50;
+
+ deployDestinations();
+ redeployDestinations(true);
+
+ Connection conn = null;
+ Session sess = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer sub1 = sess.createConsumer(cTopic);
+ MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ conn.start();
+
+ sendMessages(0, cTopic, msgBase, numMsg);
+ sendMessages(1, nTopic, msgBase, numMsg);
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub2.close();
+ sess.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testRedeployTopicNoMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployTopicNoMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ Connection conn = null;
+ Session sess = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ conn.close();
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ redeployDestinations(true);
+
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+ sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+ sess.unsubscribe("sub1");
+ sess.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testRedeployTopicWithMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployTopicWithMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ Connection conn = null;
+ Session sess = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sess.createDurableSubscriber(cTopic, "sub1");
+ sess.createDurableSubscriber(nTopic, "sub2");
+
+ conn.close();
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ redeployDestinations(false);
+
+ checkEmpty(cTopic);
+ checkEmpty(nTopic);
+
+ sendMessages(0, cTopic, msgBase, numMsg);
+ sendMessages(2, nTopic, msgBase, numMsg);
+
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+
+ sess.unsubscribe("sub1");
+ sess.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ //send some messages to topics and receive a few of them. Then do redeploy and try to receive the rest.
+ public void testRedeployTopicNoMessageLoss2() throws Exception
+ {
+ String msgBase = "testRedeployTopicNoMessageLoss2";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ Connection conn = null;
+ Session sess = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ //receive 10
+ conn.start();
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ conn.close();
+
+ redeployDestinations(true);
+
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+
+ sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 10; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+ sess.unsubscribe("sub1");
+ sess.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ //send some messages to queues and receive a few within a tx, then redeply and receive them all.
+ public void testRedeployTopicNoMessageLossTX() throws Exception
+ {
+ String msgBase = "testRedeployTopicNoMessageLossTX";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ Connection conn = null;
+ Session sess1 = null;
+ Session sess2 = null;
+
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess1 = conn.createSession(true, Session.SESSION_TRANSACTED);
+ sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ //receive 10
+ conn.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sess1.commit();
+ sess2.rollback();
+
+ conn.close();
+
+ redeployDestinations(true);
+
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn.start();
+
+ sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+ sub2 = sess1.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 10; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+ sess1.unsubscribe("sub1");
+ sess1.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ //send some messages to topics and receive a few within a tx, then redeply and receive them all.
+ public void testRedeployTopicNoMessageLossXA() throws Exception
+ {
+ String msgBase = "testRedeployTopicNoMessageLossXA";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ XAConnection xaconn1 = null;
+ XASession xasess1 = null;
+ XAResource xres1 = null;
+ Session sess1 = null;
+
+ XAConnection xaconn2 = null;
+ XASession xasess2 = null;
+ XAResource xres2 = null;
+ Session sess2 = null;
+
+ Xid xid1 = null;
+ Xid xid2 = null;
+
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ try
+ {
+ xaconn1 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 1);
+ xaconn1.setClientID("client-id-0");
+ xasess1 = xaconn1.createXASession();
+ xres1 = xasess1.getXAResource();
+ xaconn1.start();
+ sess1 = xasess1.getSession();
+
+ xaconn2 = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, 0);
+ xaconn2.setClientID("client-id-1");
+ xasess2 = xaconn2.createXASession();
+ xres2 = xasess2.getXAResource();
+ xaconn2.start();
+ sess2 = xasess2.getSession();
+
+ xid1 = new MessagingXid(("bq1" + cTopic).getBytes(), 42, cTopic.toString().getBytes());
+ xid2 = new MessagingXid(("bq1" + nTopic).getBytes(), 42, nTopic.toString().getBytes());
+
+ xres1.start(xid1, XAResource.TMNOFLAGS);
+ xres2.start(xid2, XAResource.TMNOFLAGS);
+
+ MessageConsumer sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ xres1.end(xid1, XAResource.TMSUCCESS);
+ xres2.end(xid2, XAResource.TMSUCCESS);
+
+ xres1.commit(xid1, true);
+ xres2.rollback(xid2);
+
+ sub1.close();
+ sub2.close();
+
+ xaconn1.close();
+ xaconn2.close();
+
+ redeployDestinations(true);
+
+ conn1 = createConnectionOnServer(cf, 0);
+ conn1.setClientID("client-id-0");
+ sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn2 = createConnectionOnServer(cf, 0);
+ conn2.setClientID("client-id-1");
+ sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn1.start();
+ conn2.start();
+
+ sub1 = sess1.createDurableSubscriber(cTopic, "sub1");
+
+ sub2 = sess2.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 10; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+ sess1.unsubscribe("sub1");
+ sess2.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ }
+ }
+
+ //send some messages to queues and do redeploy, then receiving them.
+ public void testRedeployQueueNoMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and do redeploy, dropping all messages
+ public void testRedeployQueueWithMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployQueueWithMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ redeployDestinations(false);
+
+ checkEmpty(cQueue);
+ checkEmpty(nQueue);
+
+ sendMessages(0, cQueue, msgBase, numMsg);
+ sendMessages(1, nQueue, msgBase, numMsg);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
+ public void testRedeployQueueNoMessageLoss2() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLoss2";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessages(0, cQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+ receiveMessages(0, nQueue, msgBase, 0, 10, Session.AUTO_ACKNOWLEDGE, false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 10, numMsg - 10, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and receive a few within a tx, then redeply and receive them all.
+ public void testRedeployQueueNoMessageLossTX() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossTX";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, false, "commit", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, false, "rollback", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+ public void testRedeployQueueNoMessageLossXA() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "commit", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "rollback", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+ public void testRedeployQueueNoMessageLossXA2() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA2";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(0, cQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "prepared", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 10, numMsg - 10, Session.AUTO_ACKNOWLEDGE, false);
+
+ recoverMessages(0, cQueue, msgBase, 0, 10);
+ }
+
+ //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+ public void testRedeployQueueNoMessageLossXA3() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA3";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, nQueue, msgBase, 0, 15, true, "prepared", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(2, nQueue, msgBase, 15, numMsg - 15, Session.CLIENT_ACKNOWLEDGE, true);
+
+ //have to recover from node 0.
+ recoverMessages(0, nQueue, msgBase, 0, 15);
+ }
+
+ //send some messages to queues and receive a few within a XA, then redeply and receive them all.
+ public void testRedeployQueueNoMessageLossXA4() throws Exception
+ {
+ String msgBase = "testRedeployQueueNoMessageLossXA4";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ receiveMessagesTX(0, cQueue, msgBase, 0, 10, true, "noaction", false);
+ receiveMessagesTX(0, nQueue, msgBase, 0, 10, true, "noaction", false);
+
+ redeployDestinations(true);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
+ /*
+ * Deploy the following destinations:
+ *
+ * 1. clustered2NonclusteredQueue : a clustered queue used to be re-deployed as non-clustered.
+ * 2. nonclustered2ClusteredQueue : a non-clustered queue (at node0) to be re-deployed as clustered.
+ * 3. clustered2NonclusteredTopic : a clustered topic used to be re-deployed as non-clustered.
+ * 4. nonclustered2ClusteredTopic : a non-clustered topic (at node0) to be re-deployed as clustered.
+ *
+ */
+ private void deployDestinations() throws Exception
+ {
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.deployQueue("clustered2NonclusteredQueue", i);
+ ServerManagement.deployTopic("clustered2NonclusteredTopic", i);
+ }
+ ServerManagement.deployQueue("nonclustered2ClusteredQueue");
+ ServerManagement.deployTopic("nonclustered2ClusteredTopic");
+
+ cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+ nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+ cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+ nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+ Queue anotherQueue = null;
+ try
+ {
+ anotherQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+ assertNotNull(anotherQueue);
+ }
+ catch (NamingException e)
+ {
+ fail("The queue " + anotherQueue + " should not exist after redeploy");
+ }
+
+ Topic anotherTopic = null;
+ try
+ {
+ anotherTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+ assertNotNull(anotherTopic);
+ }
+ catch (NamingException e)
+ {
+ fail("The topic " + anotherTopic + " should not exist after redeploy");
+ }
+ }
+
+ private void redeployDestinations(boolean keepMessage) throws Exception
+ {
+ if (keepMessage)
+ {
+ redeployDestinationsWithMessage();
+ }
+ else
+ {
+ redeployDestinationsNoMessage();
+ }
+ }
+
+ private void redeployDestinationsNoMessage() throws Exception
+ {
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.stop(i);
+ }
+
+ //Restart nodes
+ for (int i = 0; i < nodeCount; i++)
+ {
+ startDefaultServer(i, overrides, false);
+ ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+ }
+
+ //redeploy
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.deployQueue("nonclustered2ClusteredQueue", i, false);
+ ServerManagement.deployTopic("nonclustered2ClusteredTopic", i, false);
+ }
+ ServerManagement.deployQueue("clustered2NonclusteredQueue", false);
+ ServerManagement.deployTopic("clustered2NonclusteredTopic", false);
+
+
+ cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+ nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+ cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+ nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+ cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+ try
+ {
+ Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+ fail("The queue " + nonExistQueue + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+ try
+ {
+ Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+ fail("The topic " + nonExistTopic + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+
+ }
+
+ private void redeployDestinationsWithMessage() throws Exception
+ {
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.stop(i);
+ }
+
+ //Restart nodes
+ for (int i = 0; i < nodeCount; i++)
+ {
+ startDefaultServer(i, overrides, false);
+ ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+ }
+
+ //redeploy
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.deployQueue("nonclustered2ClusteredQueue", i);
+ ServerManagement.deployTopic("nonclustered2ClusteredTopic", i);
+ }
+ ServerManagement.deployQueue("clustered2NonclusteredQueue");
+ ServerManagement.deployTopic("clustered2NonclusteredTopic");
+
+
+ cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+ nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+ cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+ nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+ cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+ try
+ {
+ Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+ fail("The queue " + nonExistQueue + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+ try
+ {
+ Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+ fail("The topic " + nonExistTopic + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+
+ }
+
+ private void sendMessages(int serverIndex, Destination dest, String msgBase, int numMsg) throws Exception
+ {
+ Connection conn = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, serverIndex);
+ Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = sess.createProducer(dest);
+ log.info("-----Sending messages to: " + dest);
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage msg = sess.createTextMessage(msgBase + i);
+ producer.send(msg);
+ }
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+
+ private void receiveMessages(int serverIndex, Destination dest, String msgBase, int startIndex,
+ int numMsg, int ack, boolean checkEmpty) throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ Session sess = null;
+
+ conn = createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(false, ack);
+ conn.start();
+
+ MessageConsumer receiver = sess.createConsumer(dest);
+ TextMessage msg = null;
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ msg = (TextMessage)receiver.receive(5000);
+ assertEquals(msgBase + (startIndex + i), msg.getText());
+ }
+
+ if (ack == Session.CLIENT_ACKNOWLEDGE)
+ {
+ msg.acknowledge();
+ }
+
+ if (checkEmpty)
+ {
+ if (dest instanceof Queue)
+ {
+ checkEmpty((Queue)dest);
+ }
+ else
+ {
+ checkEmpty((Topic)dest);
+ }
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ /*
+ * receive messages transactionally.
+ *
+ * outcome values:
+ *
+ * commit -- commit the transaction
+ * rollback -- rollback the transaction
+ * prepared -- parepared the transaction but not commit.
+ *
+ */
+ private void receiveMessagesTX(int serverIndex, Destination dest, String msgBase, int startIndex,
+ int numMsg, boolean isXA, String outcome, boolean checkEmpty) throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+ Session sess = null;
+ XASession xasess = null;
+ XAResource xres = null;
+ Xid xid = null;
+ try
+ {
+ if (isXA)
+ {
+ xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+ xasess = xaconn.createXASession();
+ xres = xasess.getXAResource();
+ xaconn.start();
+ sess = xasess.getSession();
+
+ xid = new MessagingXid(("bq1" + dest).getBytes(), 42, dest.toString().getBytes());
+
+ xres.start(xid, XAResource.TMNOFLAGS);
+ }
+ else
+ {
+ //local tx
+ conn = createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ conn.start();
+ }
+
+ //starting receiving
+ MessageConsumer cons = sess.createConsumer(dest);
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)cons.receive(5000);
+ assertEquals(msgBase + (i + startIndex), rm.getText());
+ }
+
+ //ending
+ if (isXA)
+ {
+ xres.end(xid, XAResource.TMSUCCESS);
+
+ if ("commit".equals(outcome))
+ {
+ //just one-phase is enough for the test
+ xres.commit(xid, true);
+ }
+ else if ("rollback".equals(outcome))
+ {
+ xres.rollback(xid);
+ }
+ else if ("prepared".equals(outcome))
+ {
+ xres.prepare(xid);
+ }
+ }
+ else
+ {
+ //local
+ if ("commit".equals(outcome))
+ {
+ sess.commit();
+ }
+ else if ("rollback".equals(outcome))
+ {
+ sess.rollback();
+ }
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ }
+ }
+
+ //recover the messages in transactions by rollback.
+ private void recoverMessages(int serverIndex, Destination dest,
+ String msgBase, int startIndex, int numMsg) throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+ Session sess = null;
+ XASession xasess = null;
+ XAResource xres = null;
+
+ try
+ {
+ xaconn = (XAConnection)this.createXAConnectionOnServer((XAConnectionFactory)cf, serverIndex);
+ xasess = xaconn.createXASession();
+ xres = xasess.getXAResource();
+ xaconn.start();
+
+ Xid[] xids = xres.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+
+ Xid[] xids2 = xres.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, xids2.length);
+
+ conn = this.createConnectionOnServer(cf, serverIndex);
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ MessageConsumer cons = sess.createConsumer(dest);
+ TextMessage rm = (TextMessage)cons.receive(5000);
+ assertNull(rm);
+
+ xres.rollback(xids[0]);
+
+ conn.stop();
+ conn.start();
+ for (int i = 0; i < numMsg; i++)
+ {
+ rm = (TextMessage)cons.receive(5000);
+ assertEquals(msgBase + (startIndex + i), rm.getText());
+ }
+
+ if (dest instanceof Queue)
+ {
+ checkEmpty((Queue)dest);
+ }
+ else
+ {
+ checkEmpty((Topic)dest);
+ }
+ }
+ finally
+ {
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/FakeClusterConnectionManager.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -392,6 +392,14 @@
{
return null;
}
+
+ public void setClustered(boolean isClustered)
+ {
+ }
+
+ public void staticMerge(org.jboss.messaging.core.contract.Queue queue) throws Exception
+ {
+ }
}
// Inner classes -------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -1357,4 +1357,28 @@
}
}
+ public static void deployQueue(String name, int serverIndex, boolean keepMessage) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployQueue(name, null, true, keepMessage);
+ }
+
+ public static void deployTopic(String name, int serverIndex, boolean keepMessage) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployTopic(name, null, true, keepMessage);
+ }
+
+ public static void deployQueue(String name, boolean keepMessage) throws Exception
+ {
+ insureStarted();
+ servers[0].getServer().deployQueue(name, null, false, keepMessage);
+ }
+
+ public static void deployTopic(String name, boolean keepMessage) throws Exception
+ {
+ insureStarted();
+ servers[0].getServer().deployTopic(name, null, false, keepMessage);
+ }
+
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -561,5 +561,21 @@
return namingDelegate;
}
+ /* (non-Javadoc)
+ * @see org.jboss.test.messaging.tools.container.Server#deployQueue(java.lang.String, java.lang.String, boolean, boolean)
+ */
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ server.deployQueue(name, jndiName, clustered, keepMessage);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.test.messaging.tools.container.Server#deployTopic(java.lang.String, java.lang.String, boolean, boolean)
+ */
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ server.deployTopic(name, jndiName, clustered, keepMessage);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java 2009-10-29 08:40:02 UTC (rev 7882)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/tools/container/Server.java 2009-10-29 11:15:21 UTC (rev 7883)
@@ -298,4 +298,8 @@
ObjectName getPostOfficeObjectName() throws Exception;
+ void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
+ void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
}
More information about the jboss-cvs-commits
mailing list