[jboss-cvs] JBoss Messaging SVN: r7879 - in branches/JBMESSAGING-1742: src/main/org/jboss/messaging/core/impl/postoffice and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Oct 28 12:06:32 EDT 2009
Author: gaohoward
Date: 2009-10-28 12:06:30 -0400 (Wed, 28 Oct 2009)
New Revision: 7879
Modified:
branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java
Log:
test good
Modified: branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java
===================================================================
--- branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/integration/EAP4/tests-src/org/jboss/test/messaging/tools/container/LocalTestServer.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -737,6 +737,34 @@
sc.invoke(deston, "start", new Object[0], new String[0]);
}
+ public void deployDestination(boolean isQueue, String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ String config = "<mbean code=\"org.jboss.jms.server.destination." + (isQueue ? "QueueService" : "TopicService") +
+ "\"" +
+ " name=\"jboss.messaging.destination:service=" +
+ (isQueue ? "Queue" : "Topic") +
+ ",name=" +
+ name +
+ "\"" +
+ " xmbean-dd=\"xmdesc/" +
+ (isQueue ? "Queue" : "Topic") +
+ "-xmbean.xml\">" +
+ (jndiName != null ? " <attribute name=\"JNDIName\">" + jndiName + "</attribute>" : "") +
+ " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>" +
+ " <attribute name=\"Clustered\">" +
+ String.valueOf(clustered) +
+ "</attribute>" +
+ " <attribute name=\"DropOldMessageOnRedeploy\">" +
+ String.valueOf(!keepMessage) +
+ "</attribute>" +
+ "</mbean>";
+
+ MBeanConfigurationElement mbean = new MBeanConfigurationElement(XMLUtil.stringToElement(config));
+ ObjectName deston = sc.registerAndConfigureService(mbean);
+ sc.invoke(deston, "create", new Object[0], new String[0]);
+ sc.invoke(deston, "start", new Object[0], new String[0]);
+ }
+
public void deployDestination(boolean isQueue,
String name,
String jndiName,
@@ -1100,6 +1128,17 @@
}
}
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ deployDestination(true, name, jndiName, clustered, keepMessage);
+ }
+
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ deployDestination(false, name, jndiName, clustered, keepMessage);
+
+ }
+
// Inner classes --------------------------------------------------------------------------------
}
Modified: branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -775,23 +775,21 @@
{
Binding b = getBindingForQueueName(queueName);
- logger.log(thisNodeID + "> wwwwwww new Dest: " + newDest.getName() + " queue found: " + queueName);
-
if (b.queue.isActive())
{
throw new IllegalStateException(this + " cannot convert the destination " + b.queue + " because it is in active state");
}
//if a destination changes from clustered to standalone, collect the messages from all channels
- if ( !newDest.isClustered() )
+ Collection allBindings = getAllBindingsForQueueName(queueName);
+ if (newDest.isDropOldMessageOnRedeploy())
{
- Collection allBindings = getAllBindingsForQueueName(queueName);
- if (newDest.isDropOldMessageOnRedeploy())
+ removeDBChannelMessages(allBindings);
+ }
+ else
+ {
+ if ( !newDest.isClustered() )
{
- removeDBChannelMessages(allBindings);
- }
- else
- {
mergeDBChannelMessages(allBindings, b.queue);
}
}
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/jms/clustering/DestinationRedeployTest.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -36,17 +36,12 @@
import javax.jms.XASession;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.tx.MessagingXid;
-import org.jboss.test.messaging.jms.clustering.XAFailoverTest.DummyXAResource;
import org.jboss.test.messaging.tools.ServerManagement;
-import org.jboss.test.messaging.tools.container.InVMInitialContextFactory;
-import org.jboss.test.messaging.tools.container.ServiceContainer;
/**
* Test for https://jira.jboss.org/jira/browse/JBMESSAGING-1742
@@ -67,13 +62,7 @@
//nonclustered2ClusteredTopic
private Topic nTopic;
-
- private ServiceContainer sc;
- private TransactionManager tm;
-
- private Transaction suspended;
-
public DestinationRedeployTest(String name)
{
super(name);
@@ -82,28 +71,11 @@
protected void setUp() throws Exception
{
super.setUp();
- sc = new ServiceContainer("transaction");
-
- //Don't drop the tables again!
- sc.start(false);
-
- InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
-
- tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
-
- suspended = tm.suspend();
}
protected void tearDown() throws Exception
{
super.tearDown();
-
- sc.stop();
-
- if (suspended != null)
- {
- tm.resume(suspended);
- }
}
//do a redeploy and test the queues work normally by sending some messages and receiving them.
@@ -118,8 +90,8 @@
sendMessages(0, cQueue, msgBase, numMsg);
sendMessages(1, nQueue, msgBase, numMsg);
- receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, false);
- receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, false);
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
}
//do a redeploy and test the topics work normally by sending some messages and receiving them.
@@ -225,6 +197,69 @@
}
}
+ public void testRedeployTopicWithMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployTopicWithMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ Connection conn = null;
+ Session sess = null;
+ try
+ {
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ sess.createDurableSubscriber(cTopic, "sub1");
+ sess.createDurableSubscriber(nTopic, "sub2");
+
+ conn.close();
+
+ sendMessages(2, cTopic, msgBase, numMsg);
+ sendMessages(0, nTopic, msgBase, numMsg);
+
+ redeployDestinations(false);
+
+ checkEmpty(cTopic);
+ checkEmpty(nTopic);
+
+ sendMessages(0, cTopic, msgBase, numMsg);
+ sendMessages(2, nTopic, msgBase, numMsg);
+
+ conn = createConnectionOnServer(cf, 0);
+ conn.setClientID("client-id-0");
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ conn.start();
+
+ MessageConsumer sub1 = sess.createDurableSubscriber(cTopic, "sub1");
+ MessageConsumer sub2 = sess.createDurableSubscriber(nTopic, "sub2");
+
+ for (int i = 0; i < numMsg; i++)
+ {
+ TextMessage rm = (TextMessage)sub1.receive(5000);
+ log.info("--Message received: " + rm);
+ assertEquals(msgBase + i, rm.getText());
+ rm = (TextMessage)sub2.receive(5000);
+ assertEquals(msgBase + i, rm.getText());
+ }
+
+ sub1.close();
+ sub2.close();
+
+ sess.unsubscribe("sub1");
+ sess.unsubscribe("sub2");
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
//send some messages to topics and receive a few of them. Then do redeploy and try to receive the rest.
public void testRedeployTopicNoMessageLoss2() throws Exception
{
@@ -512,6 +547,29 @@
receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
}
+ //send some messages to queues and do redeploy, dropping all messages
+ public void testRedeployQueueWithMessageLoss() throws Exception
+ {
+ String msgBase = "testRedeployQueueWithMessageLoss";
+ int numMsg = 50;
+
+ deployDestinations();
+
+ sendMessages(1, cQueue, msgBase, numMsg);
+ sendMessages(0, nQueue, msgBase, numMsg);
+
+ redeployDestinations(false);
+
+ checkEmpty(cQueue);
+ checkEmpty(nQueue);
+
+ sendMessages(0, cQueue, msgBase, numMsg);
+ sendMessages(1, nQueue, msgBase, numMsg);
+
+ receiveMessages(0, cQueue, msgBase, 0, numMsg, Session.AUTO_ACKNOWLEDGE, true);
+ receiveMessages(2, nQueue, msgBase, 0, numMsg, Session.CLIENT_ACKNOWLEDGE, true);
+ }
+
//send some messages to queues and receive a few of them. Then do redeploy and try to receive the rest.
public void testRedeployQueueNoMessageLoss2() throws Exception
{
@@ -680,6 +738,18 @@
private void redeployDestinations(boolean keepMessage) throws Exception
{
+ if (keepMessage)
+ {
+ redeployDestinationsWithMessage();
+ }
+ else
+ {
+ redeployDestinationsNoMessage();
+ }
+ }
+
+ private void redeployDestinationsNoMessage() throws Exception
+ {
for (int i = 0; i < nodeCount; i++)
{
ServerManagement.stop(i);
@@ -695,6 +765,60 @@
//redeploy
for (int i = 0; i < nodeCount; i++)
{
+ ServerManagement.deployQueue("nonclustered2ClusteredQueue", i, false);
+ ServerManagement.deployTopic("nonclustered2ClusteredTopic", i, false);
+ }
+ ServerManagement.deployQueue("clustered2NonclusteredQueue", false);
+ ServerManagement.deployTopic("clustered2NonclusteredTopic", false);
+
+
+ cQueue = (Queue)ic[0].lookup("queue/clustered2NonclusteredQueue");
+ nQueue = (Queue)ic[0].lookup("queue/nonclustered2ClusteredQueue");
+ cTopic = (Topic)ic[0].lookup("topic/clustered2NonclusteredTopic");
+ nTopic = (Topic)ic[0].lookup("topic/nonclustered2ClusteredTopic");
+
+ cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
+
+ try
+ {
+ Queue nonExistQueue = (Queue)ic[1].lookup("queue/clustered2NonclusteredQueue");
+ fail("The queue " + nonExistQueue + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+ try
+ {
+ Topic nonExistTopic = (Topic)ic[1].lookup("topic/clustered2NonclusteredTopic");
+ fail("The topic " + nonExistTopic + " should not exist after redeploy");
+ }
+ catch (NamingException e)
+ {
+ //ok
+ }
+
+
+ }
+
+ private void redeployDestinationsWithMessage() throws Exception
+ {
+ for (int i = 0; i < nodeCount; i++)
+ {
+ ServerManagement.stop(i);
+ }
+
+ //Restart nodes
+ for (int i = 0; i < nodeCount; i++)
+ {
+ startDefaultServer(i, overrides, false);
+ ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
+ }
+
+ //redeploy
+ for (int i = 0; i < nodeCount; i++)
+ {
ServerManagement.deployQueue("nonclustered2ClusteredQueue", i);
ServerManagement.deployTopic("nonclustered2ClusteredTopic", i);
}
@@ -774,12 +898,9 @@
MessageConsumer receiver = sess.createConsumer(dest);
TextMessage msg = null;
- log.info("<<<<<<<<<<<< " + numMsg + " >>>>>>>>>>>>>>");
-
for (int i = 0; i < numMsg; i++)
{
msg = (TextMessage)receiver.receive(5000);
- log.info("---------------------------------------i: " + i);
assertEquals(msgBase + (startIndex + i), msg.getText());
}
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -1357,4 +1357,28 @@
}
}
+ public static void deployQueue(String name, int serverIndex, boolean keepMessage) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployQueue(name, null, true, keepMessage);
+ }
+
+ public static void deployTopic(String name, int serverIndex, boolean keepMessage) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployTopic(name, null, true, keepMessage);
+ }
+
+ public static void deployQueue(String name, boolean keepMessage) throws Exception
+ {
+ insureStarted();
+ servers[0].getServer().deployQueue(name, null, false, keepMessage);
+ }
+
+ public static void deployTopic(String name, boolean keepMessage) throws Exception
+ {
+ insureStarted();
+ servers[0].getServer().deployTopic(name, null, false, keepMessage);
+ }
+
}
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/RMITestServer.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -561,5 +561,21 @@
return namingDelegate;
}
+ /* (non-Javadoc)
+ * @see org.jboss.test.messaging.tools.container.Server#deployQueue(java.lang.String, java.lang.String, boolean, boolean)
+ */
+ public void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ server.deployQueue(name, jndiName, clustered, keepMessage);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.test.messaging.tools.container.Server#deployTopic(java.lang.String, java.lang.String, boolean, boolean)
+ */
+ public void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception
+ {
+ server.deployTopic(name, jndiName, clustered, keepMessage);
+ }
+
// Inner classes -------------------------------------------------
}
Modified: branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java
===================================================================
--- branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java 2009-10-28 14:54:35 UTC (rev 7878)
+++ branches/JBMESSAGING-1742/tests/src/org/jboss/test/messaging/tools/container/Server.java 2009-10-28 16:06:30 UTC (rev 7879)
@@ -298,4 +298,8 @@
ObjectName getPostOfficeObjectName() throws Exception;
+ void deployQueue(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
+ void deployTopic(String name, String jndiName, boolean clustered, boolean keepMessage) throws Exception;
+
}
More information about the jboss-cvs-commits
mailing list