[jboss-cvs] JBoss Messaging SVN: r2832 - in trunk: src/main/org/jboss/messaging/core/impl/postoffice and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 3 17:53:32 EDT 2007


Author: timfox
Date: 2007-07-03 17:53:31 -0400 (Tue, 03 Jul 2007)
New Revision: 2832

Modified:
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
   trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Add some more local queues in cluster tests and minor fix


Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -390,7 +390,7 @@
                {     
                	// Temporary queues must be unbound on ALL nodes of the cluster
                	
-               	postOffice.removeBinding(dest.getName(), true);               	
+               	postOffice.removeBinding(dest.getName(), postOffice.isClustered());               	
                }
                else
                {

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -734,7 +734,7 @@
          	}
          }
          
-         postOffice.removeBinding(sub.getName(), sub.isClustered());         
+         postOffice.removeBinding(sub.getName(), sub.isClustered() && postOffice.isClustered());         
          
          String counterName = TopicService.SUBSCRIPTION_MESSAGECOUNTER_PREFIX + sub.getName();
          
@@ -1366,7 +1366,8 @@
                
                // Durable subs must be bound on ALL nodes of the cluster (if clustered)
                
-               postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), postOffice.isClustered());
+               postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
+                                     postOffice.isClustered() && mDest.isClustered());
                
                queue.activate();
                   
@@ -1440,7 +1441,7 @@
                   
                   // Durable subs must be unbound on ALL nodes of the cluster
                   
-                  postOffice.removeBinding(queue.getName(), mDest.isClustered());                  
+                  postOffice.removeBinding(queue.getName(), postOffice.isClustered() && mDest.isClustered());                  
                   
                   // create a fresh new subscription
                                     
@@ -1453,7 +1454,8 @@
                   
                   // Durable subs must be bound on ALL nodes of the cluster
                   
-                  postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true), postOffice.isClustered());
+                  postOffice.addBinding(new Binding(new JMSCondition(false, jmsDestination.getName()), queue, true),
+                  		                postOffice.isClustered() && mDest.isClustered());
   
                   queue.activate();                  
                   

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -1551,7 +1551,7 @@
          			
          			if (trace) { log.trace(this + " is a remote queue"); }
          			
-         			if (!queue.isRecoverable())
+         			if (!queue.isRecoverable() && queue.isClustered())
          			{	         			
          				//When we send to the cluster we never send to reliable queues
          				

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedQueueTest.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -29,10 +29,13 @@
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import org.jboss.test.messaging.tools.ServerManagement;
 
+
 /**
  * 
  * A DistributedQueueTest
@@ -70,7 +73,17 @@
    {
       clusteredQueue(true);
    }
+   // Non clustered queue on a cluster: run localQueue() with non persistent, then persistent, messages
+   public void testLocalNonPersistent() throws Exception
+   {
+      localQueue(false);
+   }
 
+   public void testLocalPersistent() throws Exception
+   {
+      localQueue(true);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -528,7 +541,232 @@
          }
       }
    }
+   
+   /* Check that non clustered queues behave properly when deployed on a cluster: messages sent to
+    * the queue on one node must be consumable on that node only, never on the other two nodes. */
+   protected void localQueue(boolean persistent) throws Exception
+   {
+   	Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+      
+      //Deploy three non clustered queues with same name on different nodes
+          
+      try
+      {
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 0, false);
+         
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 1, false);
+         
+         ServerManagement.deployQueue("nonClusteredQueue", "nonClusteredQueue", 200000, 2000, 2000, 2, false);
+         
+         Queue queue0 = (Queue)ic[0].lookup("/nonClusteredQueue");
+         Queue queue1 = (Queue)ic[1].lookup("/nonClusteredQueue");
+         Queue queue2 = (Queue)ic[2].lookup("/nonClusteredQueue");
+      	
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn0 = cf.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+         
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
 
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         conn0.start();
+         conn1.start();
+         conn2.start();
+
+         // ==============
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(queue0);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message" + i);
+
+            prod0.send(tm);
+         }
+         
+         // Try and consume at node 1 - nothing must arrive there
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         Message m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         // And at node 2
+         
+         MessageConsumer cons2 = sess2.createConsumer(queue2);
+         
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+         
+         // Now consume at node 0 - all messages must be there, in order
+         
+         MessageConsumer cons0 = sess0.createConsumer(queue0);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         // ==============
+         // Send at node 1
+
+         MessageProducer prod1 = sess1.createProducer(queue1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+         
+         // Try and consume at node 0 - nothing must arrive there
+         
+         cons0 = sess0.createConsumer(queue0);
+         
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         // And at node 2
+         
+         cons2 = sess2.createConsumer(queue2);
+         
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+         
+         // Now consume at node 1 - all messages must be there, in order
+         
+         cons1 = sess1.createConsumer(queue1);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         // ==============
+         // Send at node 2
+
+         MessageProducer prod2 = sess2.createProducer(queue2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+         
+         // Try and consume at node 0 - nothing must arrive there
+         
+         cons0 = sess0.createConsumer(queue0);
+         
+         m = cons0.receive(2000);
+
+         assertNull(m);
+         
+         cons0.close();
+         
+         // And at node 1
+         
+         cons1 = sess1.createConsumer(queue1);
+         
+         m = cons1.receive(2000);
+
+         assertNull(m);
+         
+         cons1.close();
+         
+         // Now consume at node 2 - all messages must be there, in order
+         
+         cons2 = sess2.createConsumer(queue2);
+          
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+            
+            assertEquals("message" + i, tm.getText());
+         }                 
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+         
+         cons2.close();
+           
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 0);
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 1);
+         
+         ServerManagement.undeployQueue("nonClusteredQueue", 2);
+      }
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -29,8 +29,11 @@
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
+import org.jboss.test.messaging.tools.ServerManagement;
 
+
 /**
  * 
  * A DistributedTopicTest
@@ -108,7 +111,27 @@
    {
       clusteredTopicSharedDurableNoLocalSub(true);
    }
+   // Non clustered topic, non durable subscribers: non persistent, then persistent, messages
+   public void testNonClusteredTopicNonDurableNonPersistent() throws Exception
+   {
+   	nonClusteredTopicNonDurable(false);
+   }
 
+   public void testNonClusteredTopicNonDurablePersistent() throws Exception
+   {
+   	nonClusteredTopicNonDurable(true);
+   }
+   // Non clustered topic, durable subscribers: non persistent, then persistent, messages
+   public void testNonClusteredTopicDurableNonPersistent() throws Exception
+   {
+   	nonClusteredTopicDurable(false);
+   }
+
+   public void testNonClusteredTopicDurablePersistent() throws Exception
+   {
+   	nonClusteredTopicDurable(true);
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -146,8 +169,6 @@
          conn1 = cf.createConnection();
          conn2 = cf.createConnection();
          
-         log.info("Created connections");
-         
          checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
 
          Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -185,8 +206,6 @@
 
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
-
             assertEquals("message" + i, tm.getText());
          }
          
@@ -200,8 +219,7 @@
 
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
-
+ 
             assertEquals("message" + i, tm.getText());
          }
          
@@ -215,8 +233,6 @@
 
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
-
             assertEquals("message" + i, tm.getText());
          }
          
@@ -230,8 +246,6 @@
 
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
-
             assertEquals("message" + i, tm.getText());
          }
          
@@ -245,8 +259,6 @@
 
             assertNotNull(tm);
             
-            log.info("Got message " + tm.getText());
-
             assertEquals("message" + i, tm.getText());
          }
          
@@ -292,8 +304,6 @@
          conn1 = cf.createConnection();
          conn2 = cf.createConnection();
          
-         log.info("Created connections");
-         
          checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
 
          Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -442,8 +452,6 @@
          conn1 = cf.createConnection();
          conn2 = cf.createConnection();
          
-         log.info("Created connections");
-         
          checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
 
          conn0.setClientID("wib1");
@@ -480,33 +488,20 @@
          }
          catch (Exception ignore) {}
          
-         log.info("creating subs");
-
          MessageConsumer alpha = sess0.createDurableSubscriber(topic[0], "alpha");
          
-         log.info("created 0");
-         
          MessageConsumer beta = sess1.createDurableSubscriber(topic[1], "beta");
          
-         log.info("created 1");
-         
          MessageConsumer gamma = sess2.createDurableSubscriber(topic[2], "gamma");
          
-         log.info("created 2");
          MessageConsumer delta = sess0.createDurableSubscriber(topic[0], "delta");
          
-         log.info("created 3");
-         
          MessageConsumer epsilon = sess1.createDurableSubscriber(topic[1], "epsilon");
          
-         log.info("created 4");
-         
          conn0.start();
          conn1.start();
          conn2.start();
          
-         log.info("started");
-
          // Send at node 0 - and make sure the messages are consumable from all the durable subs
 
          MessageProducer prod = sess0.createProducer(topic[0]);
@@ -515,25 +510,18 @@
 
          final int NUM_MESSAGES = 100;
 
-         log.info("sending messages");
-
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             prod.send(sess0.createTextMessage("message" + i));
          }
          
-         log.info("Sent messages");
-
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             TextMessage tm = (TextMessage)alpha.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message" + i, tm.getText());
          }
          
-         log.info("got 1");
-         
          Message msg = alpha.receive(1000);
          assertNull(msg);         
 
@@ -541,12 +529,9 @@
          {
             TextMessage tm = (TextMessage)beta.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message" + i, tm.getText());
          }
          
-         log.info("got 2");
-         
          msg = beta.receive(1000);
          assertNull(msg);
 
@@ -554,12 +539,9 @@
          {
             TextMessage tm = (TextMessage)gamma.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message" + i, tm.getText());
          }
          
-         log.info("got 3");
-         
          msg = gamma.receive(1000);
          assertNull(msg);
 
@@ -567,12 +549,9 @@
          {
             TextMessage tm = (TextMessage)delta.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message" + i, tm.getText());
          }
 
-         log.info("got 4");
-         
          msg = delta.receive(1000);
          assertNull(msg);
 
@@ -580,23 +559,15 @@
          {
             TextMessage tm = (TextMessage)epsilon.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message" + i, tm.getText());
          }
-         
-         
-         log.info("got 5");
-         
+ 
          msg = epsilon.receive(1000);
          assertNull(msg);
-         
-         
+                 
          //close beta
          beta.close();
-         
-         
-         log.info("Sent messages");
-         
+  
          // Create another beta - this one node 0
          MessageConsumer beta0 = sess0.createDurableSubscriber(topic[0], "beta");
          
@@ -609,8 +580,6 @@
 
          prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
 
-         log.info("sending more messages");
-
          for (int i = 0; i < NUM_MESSAGES; i++)
          {
             prod2.send(sess1.createTextMessage("message2-" + i));
@@ -624,8 +593,7 @@
          {
             TextMessage tm = (TextMessage)beta0.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
-            
+
             if (tm.getText().substring("message2-".length()).equals("1"))
             {
             	offset = 1;
@@ -650,7 +618,6 @@
          {
             TextMessage tm = (TextMessage)beta1.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message2-" + (i * 2 + offset), tm.getText());
          }
          
@@ -670,7 +637,6 @@
          {
             TextMessage tm = (TextMessage)beta0.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message3-" + i, tm.getText());
          }
          
@@ -688,16 +654,12 @@
          delta.close();
          epsilon.close();
          
-         log.info("got 6");
-
          sess0.unsubscribe("alpha");
          sess1.unsubscribe("beta");
          sess2.unsubscribe("gamma");
          sess0.unsubscribe("delta");
          sess1.unsubscribe("epsilon");
          
-         log.info("got 7");
-
       }
       finally
       {
@@ -737,8 +699,6 @@
          conn2 = cf.createConnection();
          conn3 = cf.createConnection();
          
-         log.info("Created connections");
-         
          checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});
          conn1.setClientID("wib1");
          conn2.setClientID("wib1");
@@ -764,13 +724,11 @@
          }
          catch (Exception ignore) {}
 
-         log.info("** creating 1");
          MessageConsumer cons1 = sess1.createDurableSubscriber(topic[0], "sub");
-         log.info("** creating 2");
+
          MessageConsumer cons2 = sess2.createDurableSubscriber(topic[1], "sub");
-         log.info("** creating 3");
+
          MessageConsumer cons3 = sess3.createDurableSubscriber(topic[2], "sub");
-         log.info("** creating 4");
 
          conn1.start();
          conn2.start();
@@ -953,8 +911,6 @@
          conn2 = cf.createConnection();
          conn3 = cf.createConnection();
          
-         log.info("Created connections");
-         
          checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});
          
          conn2.setClientID("wib1");
@@ -1000,8 +956,7 @@
          {
             TextMessage tm = (TextMessage)cons1.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
-            
+ 
             if (tm.getText().substring("message2-".length()).equals("1"))
             {
             	offset = 1;
@@ -1026,7 +981,6 @@
          {
             TextMessage tm = (TextMessage)cons2.receive(1000);
             assertNotNull(tm);
-            log.info("**** got message" + tm.getText());
             assertEquals("message2-" + (i * 2 + offset), tm.getText());
          }
          
@@ -1067,7 +1021,438 @@
          }
       }
    }
+   /* Check that non clustered topics deployed with the same name on every node stay local: a
+    * non durable subscriber only receives the messages that were sent on its own node. */
+   private void nonClusteredTopicNonDurable(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      // Deploy three non clustered topics with same name on different nodes
+
+      try
+      {
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 0, false);
+
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 1, false);
+
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 2, false);
+
+         Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
+         Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
+         Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");
 
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn0 = cf.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn0.start();
+         conn1.start();
+         conn2.start();
+         // Subscribe on every node before anything is sent
+         MessageConsumer cons0 = sess0.createConsumer(topic0);
+         MessageConsumer cons1 = sess1.createConsumer(topic1);
+         MessageConsumer cons2 = sess2.createConsumer(topic2);
+
+         // ==============
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(topic0);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message" + i);
+
+            prod0.send(tm);
+         }
+
+         // Try and consume at node 1 - nothing must arrive there
+
+         Message m = cons1.receive(2000);
+
+         assertNull(m);
+
+         // And at node 2
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         // Now consume at node 0 - all messages must be there, in order
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         // ==============
+         // Send at node 1
+
+         MessageProducer prod1 = sess1.createProducer(topic1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+
+         // Try and consume at node 0 - nothing must arrive there
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         // And at node 2
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         // Now consume at node 1 - all messages must be there, in order
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         // ==============
+         // Send at node 2
+
+         MessageProducer prod2 = sess2.createProducer(topic2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+
+         // Try and consume at node 0 - nothing must arrive there
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         // And at node 1
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         // Now consume at node 2 - all messages must be there, in order
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         // Remove the topic deployed on each of the three nodes
+         ServerManagement.undeployTopic("nonClusteredTopic", 0);
+
+         ServerManagement.undeployTopic("nonClusteredTopic", 1);
+
+         ServerManagement.undeployTopic("nonClusteredTopic", 2);
+      }
+   }
+   /* Check that durable subscriptions on non clustered topics stay local: the same client id and
+    * subscription name are used on every node, yet each one only gets messages sent on its node. */
+   private void nonClusteredTopicDurable(boolean persistent) throws Exception
+   {
+      Connection conn0 = null;
+      Connection conn1 = null;
+      Connection conn2 = null;
+
+      // Deploy three non clustered topics with same name on different nodes
+
+      try
+      {
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 0, false);
+
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 1, false);
+
+         ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 200000, 2000, 2000, 2, false);
+
+         Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
+         Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
+         Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");
+
+         //This will create 3 different connections on 3 different nodes, since
+         //the cf is clustered
+         conn0 = cf.createConnection();
+         conn1 = cf.createConnection();
+         conn2 = cf.createConnection();
+
+         conn0.setClientID("cl123");
+         conn1.setClientID("cl123");
+         conn2.setClientID("cl123");
+
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn0.start();
+         conn1.start();
+         conn2.start();
+         // Register the durable subscription on every node; closing the consumer leaves it in place
+         MessageConsumer cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+         cons0.close();
+         cons1.close();
+         cons2.close();
+
+         // ==============
+         // Send at node 0
+
+         MessageProducer prod0 = sess0.createProducer(topic0);
+
+         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         final int NUM_MESSAGES = 100;
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message" + i);
+
+            prod0.send(tm);
+         }
+
+         // Try and consume at node 1 - nothing must arrive there
+
+         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+         Message m = cons1.receive(2000);
+
+         assertNull(m);
+
+         cons1.close();
+
+         // And at node 2
+
+         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         cons2.close();
+
+         // Now consume at node 0 - all messages must be there, in order
+
+         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons0.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         cons0.close();
+
+         // ==============
+         // Send at node 1
+
+         MessageProducer prod1 = sess1.createProducer(topic1);
+
+         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess1.createTextMessage("message" + i);
+
+            prod1.send(tm);
+         }
+
+         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+         // Try and consume at node 0 - nothing must arrive there
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         cons0.close();
+
+         // And at node 2
+
+         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         cons2.close();
+
+         // Now consume at node 1 - all messages must be there, in order
+
+         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         cons1.close();
+
+         // ==============
+         // Send at node 2
+
+         MessageProducer prod2 = sess2.createProducer(topic2);
+
+         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess2.createTextMessage("message" + i);
+
+            prod2.send(tm);
+         }
+
+         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
+
+         // Try and consume at node 0 - nothing must arrive there
+
+         m = cons0.receive(2000);
+
+         assertNull(m);
+
+         cons0.close();
+
+         sess0.unsubscribe("mysub1");
+
+         // And at node 1
+
+         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
+
+         m = cons1.receive(2000);
+
+         assertNull(m);
+
+         cons1.close();
+
+         sess1.unsubscribe("mysub1");
+
+         // Now consume at node 2 - all messages must be there, in order
+
+         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
+
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons2.receive(1000);
+
+            assertNotNull(tm);
+
+            assertEquals("message" + i, tm.getText());
+         }
+
+         m = cons2.receive(2000);
+
+         assertNull(m);
+
+         cons2.close();
+
+         sess2.unsubscribe("mysub1");
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+
+         if (conn2 != null)
+         {
+            conn2.close();
+         }
+         // Remove the topic deployed on each of the three nodes
+         ServerManagement.undeployTopic("nonClusteredTopic", 0);
+
+         ServerManagement.undeployTopic("nonClusteredTopic", 1);
+
+         ServerManagement.undeployTopic("nonClusteredTopic", 2);
+      }
+   }
+
   
    // Inner classes -------------------------------------------------
    

Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-07-03 19:11:24 UTC (rev 2831)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java	2007-07-03 21:53:31 UTC (rev 2832)
@@ -868,6 +868,13 @@
       insureStarted();
       servers[0].getServer().deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, false);
    }
+   /** Deploys a topic on the server at index serverIndex, passing the clustered flag through. */
+   public static void deployTopic(String name, String jndiName, int fullSize, int pageSize,
+   		                         int downCacheSize, int serverIndex, boolean clustered) throws Exception
+   {
+   	insureStarted();
+   	servers[serverIndex].getServer().deployTopic(name, jndiName, fullSize, pageSize, downCacheSize, clustered);
+    }
 
    /**
     * Simulates a topic un-deployment (deleting the topic descriptor from the deploy directory).




More information about the jboss-cvs-commits mailing list