[jboss-cvs] JBoss Messaging SVN: r2832 - in trunk: src/main/org/jboss/messaging/core/impl/postoffice and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 3 17:53:32 EDT 2007
Author: timfox
Date: 2007-07-03 17:53:31 -0400 (Tue, 03 Jul 2007)
New Revision: 2832
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Add some more local queues in cluster tests and minor fix
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -390,7 +390,7 @@
{
// Temporary queues must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(dest.getName(), true);
+ postOffice.removeBinding(dest.getName(), postOffice.isClustered());
}
else
{
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -734,7 +734,7 @@
}
}
- postOffice.removeBinding(sub.getName(), sub.isClustered());
+ postOffice.removeBinding(sub.getName(), sub.isClustered() && postOffice.isClustered());
String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + sub.getName();
@@ -1366,7 +1366,8 @@
// Durable subs must be bound on ALL nodes of the cluster (if clustered)
- postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), postOffice.isClustered());
+ postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
+ postOffice.isClustered() && mDest.isClustered());
queue.activate();
@@ -1440,7 +1441,7 @@
// Durable subs must be unbound on ALL nodes of the cluster
- postOffice.removeBinding(queue.getName(), mDest.isClustered());
+ postOffice.removeBinding(queue.getName(), postOffice.isClustered() && mDest.isClustered());
// create a fresh new subscription
@@ -1453,7 +1454,8 @@
// Durable subs must be bound on ALL nodes of the cluster
- postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), postOffice.isClustered());
+ postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
+ postOffice.isClustered() && mDest.isClustered());
queue.activate();
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -1551,7 +1551,7 @@
if (trace) { log.trace(this + " is a remote queue"); }
- if (!queue.isRecoverable())
+ if (!queue.isRecoverable() && queue.isClustered())
{
//When we send to the cluster we never send to reliable queues
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -29,10 +29,13 @@
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.jboss.test.messaging.tools.ServerManagement;
+
/**
*
* A DistributedQueueTest
@@ -70,7 +73,17 @@
{
clusteredQueue(true);
}
+
+ public void testLocalNonPersistent() throws Exception
+ {
+ localQueue(false);
+ }
+ public void testLocalPersistent() throws Exception
+ {
+ localQueue(true);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -528,7 +541,232 @@
}
}
}
+
+
+ /* Check that non clustered queues behave properly when deployed on a cluster */
+ protected void localQueue(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ //Deploy three non clustered queues with same name on different nodes
+
+ try
+ {
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
+
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
+
+ ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
+
+ Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
+ Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
+ Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
+
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = cf.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ // ==============
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(queue0);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ // Try and consume at node 1
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ Message m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ //And at node 2
+
+ MessageConsumer cons2 = sess2.createConsumer(queue2);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 0
+
+ MessageConsumer cons0 = sess0.createConsumer(queue0);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ // ==============
+ // Send at node 1
+
+ MessageProducer prod1 = sess1.createProducer(queue1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ cons0 = sess0.createConsumer(queue0);
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ //And at node 2
+
+ cons2 = sess2.createConsumer(queue2);
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 1
+
+ cons1 = sess1.createConsumer(queue1);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ // ==============
+ // Send at node 2
+
+ MessageProducer prod2 = sess2.createProducer(queue2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ cons0 = sess0.createConsumer(queue0);
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ //And at node 1
+
+ cons1 = sess1.createConsumer(queue1);
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ // Now consume at node 2
+
+ cons2 = sess2.createConsumer(queue2);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 0);
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 1);
+
+ ServerManagement.undeployQueue("nonClusteredQueue", 2);
+ }
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -29,8 +29,11 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
+import javax.jms.Topic;
+import org.jboss.test.messaging.tools.ServerManagement;
+
/**
*
* A DistributedTopicTest
@@ -108,7 +111,27 @@
{
clusteredTopicSharedDurableNoLocalSub(true);
}
+
+ public void testNonClusteredTopicNonDurableNonPersistent() throws Exception
+ {
+ nonClusteredTopicNonDurable(false);
+ }
+ public void testNonClusteredTopicNonDurablePersistent() throws Exception
+ {
+ nonClusteredTopicNonDurable(true);
+ }
+
+ public void testNonClusteredTopicDurableNonPersistent() throws Exception
+ {
+ nonClusteredTopicDurable(false);
+ }
+
+ public void testNonClusteredTopicDurablePersistent() throws Exception
+ {
+ nonClusteredTopicDurable(true);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -146,8 +169,6 @@
conn1 = cf.createConnection();
conn2 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -185,8 +206,6 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
-
assertEquals("message" + i, tm.getText());
}
@@ -200,8 +219,7 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
-
+
assertEquals("message" + i, tm.getText());
}
@@ -215,8 +233,6 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
-
assertEquals("message" + i, tm.getText());
}
@@ -230,8 +246,6 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
-
assertEquals("message" + i, tm.getText());
}
@@ -245,8 +259,6 @@
assertNotNull(tm);
- log.info("Got message " + tm.getText());
-
assertEquals("message" + i, tm.getText());
}
@@ -292,8 +304,6 @@
conn1 = cf.createConnection();
conn2 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -442,8 +452,6 @@
conn1 = cf.createConnection();
conn2 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
conn0.setClientID("wib1");
@@ -480,33 +488,20 @@
}
catch (Exception ignore) {}
- log.info("creating subs");
-
MessageConsumer alpha = sess0.createDurableSubscriber(topic[0], "alpha");
- log.info("created 0");
-
MessageConsumer beta = sess1.createDurableSubscriber(topic[1], "beta");
- log.info("created 1");
-
MessageConsumer gamma = sess2.createDurableSubscriber(topic[2], "gamma");
- log.info("created 2");
MessageConsumer delta = sess0.createDurableSubscriber(topic[0], "delta");
- log.info("created 3");
-
MessageConsumer epsilon = sess1.createDurableSubscriber(topic[1], "epsilon");
- log.info("created 4");
-
conn0.start();
conn1.start();
conn2.start();
- log.info("started");
-
// Send at node 0 - and make sure the messages are consumable from all the durable subs
MessageProducer prod = sess0.createProducer(topic[0]);
@@ -515,25 +510,18 @@
final int NUM_MESSAGES = 100;
- log.info("sending messages");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
prod.send(sess0.createTextMessage("message" + i));
}
- log.info("Sent messages");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = (TextMessage)alpha.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message" + i, tm.getText());
}
- log.info("got 1");
-
Message msg = alpha.receive(1000);
assertNull(msg);
@@ -541,12 +529,9 @@
{
TextMessage tm = (TextMessage)beta.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message" + i, tm.getText());
}
- log.info("got 2");
-
msg = beta.receive(1000);
assertNull(msg);
@@ -554,12 +539,9 @@
{
TextMessage tm = (TextMessage)gamma.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message" + i, tm.getText());
}
- log.info("got 3");
-
msg = gamma.receive(1000);
assertNull(msg);
@@ -567,12 +549,9 @@
{
TextMessage tm = (TextMessage)delta.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message" + i, tm.getText());
}
- log.info("got 4");
-
msg = delta.receive(1000);
assertNull(msg);
@@ -580,23 +559,15 @@
{
TextMessage tm = (TextMessage)epsilon.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message" + i, tm.getText());
}
-
-
- log.info("got 5");
-
+
msg = epsilon.receive(1000);
assertNull(msg);
-
-
+
//close beta
beta.close();
-
-
- log.info("Sent messages");
-
+
// Create another beta - this one node 0
MessageConsumer beta0 = sess0.createDurableSubscriber(topic[0], "beta");
@@ -609,8 +580,6 @@
prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
- log.info("sending more messages");
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
prod2.send(sess1.createTextMessage("message2-" + i));
@@ -624,8 +593,7 @@
{
TextMessage tm = (TextMessage)beta0.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
-
+
if (tm.getText().substring("message2-".length()).equals("1"))
{
offset = 1;
@@ -650,7 +618,6 @@
{
TextMessage tm = (TextMessage)beta1.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message2-" + (i * 2 + offset), tm.getText());
}
@@ -670,7 +637,6 @@
{
TextMessage tm = (TextMessage)beta0.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message3-" + i, tm.getText());
}
@@ -688,16 +654,12 @@
delta.close();
epsilon.close();
- log.info("got 6");
-
sess0.unsubscribe("alpha");
sess1.unsubscribe("beta");
sess2.unsubscribe("gamma");
sess0.unsubscribe("delta");
sess1.unsubscribe("epsilon");
- log.info("got 7");
-
}
finally
{
@@ -737,8 +699,6 @@
conn2 = cf.createConnection();
conn3 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});
conn1.setClientID("wib1");
conn2.setClientID("wib1");
@@ -764,13 +724,11 @@
}
catch (Exception ignore) {}
- log.info("** creating 1");
MessageConsumer cons1 = sess1.createDurableSubscriber(topic[0], "sub");
- log.info("** creating 2");
+
MessageConsumer cons2 = sess2.createDurableSubscriber(topic[1], "sub");
- log.info("** creating 3");
+
MessageConsumer cons3 = sess3.createDurableSubscriber(topic[2], "sub");
- log.info("** creating 4");
conn1.start();
conn2.start();
@@ -953,8 +911,6 @@
conn2 = cf.createConnection();
conn3 = cf.createConnection();
- log.info("Created connections");
-
checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});
conn2.setClientID("wib1");
@@ -1000,8 +956,7 @@
{
TextMessage tm = (TextMessage)cons1.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
-
+
if (tm.getText().substring("message2-".length()).equals("1"))
{
offset = 1;
@@ -1026,7 +981,6 @@
{
TextMessage tm = (TextMessage)cons2.receive(1000);
assertNotNull(tm);
- log.info("**** got message" + tm.getText());
assertEquals("message2-" + (i * 2 + offset), tm.getText());
}
@@ -1067,7 +1021,438 @@
}
}
}
+
+
+ private void nonClusteredTopicNonDurable(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ // Deploy three non clustered topics with same name on different nodes
+
+ try
+ {
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 0, false);
+
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 1, false);
+
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 2, false);
+
+ Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
+ Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
+ Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = cf.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ MessageConsumer cons0 = sess0.createConsumer(topic0);
+ MessageConsumer cons1 = sess1.createConsumer(topic1);
+ MessageConsumer cons2 = sess2.createConsumer(topic2);
+
+ // ==============
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(topic0);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ // Try and consume at node 1
+
+ Message m = cons1.receive(2000);
+
+ assertNull(m);
+
+ //And at node 2
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ // Now consume at node 0
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ // ==============
+ // Send at node 1
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ //And at node 2
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ // Now consume at node 1
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ // ==============
+ // Send at node 2
+
+ MessageProducer prod2 = sess2.createProducer(topic2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ // Try and consume at node 0
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ //And at node 1
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ // Now consume at node 2
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 0);
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 1);
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 2);
+ }
+ }
+
+
+ private void nonClusteredTopicDurable(boolean persistent) throws Exception
+ {
+ Connection conn0 = null;
+ Connection conn1 = null;
+ Connection conn2 = null;
+
+ // Deploy three non clustered topics with same name on different nodes
+
+ try
+ {
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 0, false);
+
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 1, false);
+
+ ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 2, false);
+
+ Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
+ Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
+ Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");
+
+ //This will create 3 different connection on 3 different nodes, since
+ //the cf is clustered
+ conn0 = cf.createConnection();
+ conn1 = cf.createConnection();
+ conn2 = cf.createConnection();
+
+ conn0.setClientID("cl123");
+ conn1.setClientID("cl123");
+ conn2.setClientID("cl123");
+
+ checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ conn0.start();
+ conn1.start();
+ conn2.start();
+
+ MessageConsumer cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+ MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+ MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+ cons0.close();
+ cons1.close();
+ cons2.close();
+
+ // ==============
+ // Send at node 0
+
+ MessageProducer prod0 = sess0.createProducer(topic0);
+
+ prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ final int NUM_MESSAGES = 100;
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod0.send(tm);
+ }
+
+ // Try and consume at node 1
+
+ cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+ Message m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ //And at node 2
+
+ cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 0
+
+ cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons0.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ // ==============
+ // Send at node 1
+
+ MessageProducer prod1 = sess1.createProducer(topic1);
+
+ prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess1.createTextMessage("message" + i);
+
+ prod1.send(tm);
+ }
+
+ cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+ // Try and consume at node 0
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ //And at node 2
+
+ cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ // Now consume at node 1
+
+ cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ // ==============
+ // Send at node 2
+
+ MessageProducer prod2 = sess2.createProducer(topic2);
+
+ prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess2.createTextMessage("message" + i);
+
+ prod2.send(tm);
+ }
+
+ cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+ // Try and consume at node 0
+
+ m = cons0.receive(2000);
+
+ assertNull(m);
+
+ cons0.close();
+
+ sess0.unsubscribe("mysub1");
+
+ //And at node 1
+
+ cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+ m = cons1.receive(2000);
+
+ assertNull(m);
+
+ cons1.close();
+
+ sess1.unsubscribe("mysub1");
+
+ // Now consume at node 2
+
+ cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons2.receive(1000);
+
+ assertNotNull(tm);
+
+ assertEquals("message" + i, tm.getText());
+ }
+
+ m = cons2.receive(2000);
+
+ assertNull(m);
+
+ cons2.close();
+
+ sess2.unsubscribe("mysub1");
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 0);
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 1);
+
+ ServerManagement.undeployQueue("nonClusteredTopic", 2);
+ }
+ }
+
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-07-03 21:53:31 UTC (rev 2832)
@@ -868,6 +868,13 @@
insureStarted();
servers[0].getServer().deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, false);
}
+
+ public static void deployTopic(String name, String jndiName, int fullSize, int pageSize,
+ int downCacheSize, int serverIndex, boolean clustered) throws Exception
+ {
+ insureStarted();
+ servers[serverIndex].getServer().deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+ }
/**
* Simulates a topic un-deployment (deleting the topic descriptor from the deploy directory).
More information about the jboss-cvs-commits
mailing list