[jboss-cvs] JBoss Messaging SVN: r3055 - in trunk: tests/src/org/jboss/test/messaging and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun Aug 26 01:34:05 EDT 2007


Author: clebert.suconic at jboss.com
Date: 2007-08-26 01:34:05 -0400 (Sun, 26 Aug 2007)
New Revision: 3055

Modified:
   trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
   trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-1059 - The messages have to be removed after the Suckers are closed, or we would get a failure

Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2007-08-26 05:34:05 UTC (rev 3055)
@@ -1735,7 +1735,7 @@
 				{	
 					try
 					{
-						if (trace) { log.trace(this + " waiting for bind unbind lock"); }
+						if (trace) { log.trace(this + " waiting for bind unbind lock, timeout=" + groupMember.getCastTimeout()); }
 						
 						waitForBindUnbindLock.wait(groupMember.getCastTimeout());
 
@@ -1827,39 +1827,40 @@
       }
 
       Binding removed = removeBindingInMemory(thisNodeID, queueName);
-      
+
       //The queue might not be removed (it's already removed) if two unbind all requests are sent simultaneously on the cluster
       if (removed != null)
-      {	      
+      {
 	      Queue queue = removed.queue;
-	      
+
 	      Condition condition = removed.condition;
-	      	     	
+
 	      if (queue.isRecoverable())
 	      {
 	         //Need to remove from db too
-	
+
 	         deleteBindingFromStorage(queue);
 	      }
-	
-	      queue.removeAllReferences();
-	      
+
 	      if (clustered && queue.isClustered())
 	      {
-	      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();      	
-	      	
+	      	String filterString = queue.getFilter() == null ? null : queue.getFilter().getFilterString();
+
 	      	MappingInfo info = new MappingInfo(thisNodeID, queue.getName(), condition.toText(), filterString, queue.getChannelID(),
 	      			                             queue.isRecoverable(), true, allNodes);
-	      	
+
 		      UnbindRequest request = new UnbindRequest(info, allNodes);
-		
+
 		      groupMember.multicastControl(request, sync);
 	      }
+
+         queue.removeAllReferences();
+
       }
-      
+
       return removed;
    }
-   
+
    private synchronized void calculateFailoverMap()
    {
    	failoverMap.clear();
@@ -2192,7 +2193,32 @@
 
       return routed;
    }   
-         
+
+   // Returns the binding stored under queueName for the given node, or null when the node has no
+   // name map or the map holds no entry for that name. The lookup runs while holding the read lock.
+   private Binding lookupBinding(int nodeID, String queueName) throws Exception
+   {
+      lock.readLock().acquire();
+      try
+      {
+         Integer nid = new Integer(nodeID);
+
+         Map nameMap = (Map)nameMaps.get(nid);
+
+         if (nameMap == null)
+         {
+            return null;
+         }
+
+         return (Binding)nameMap.get(queueName);
+      }
+      finally
+      {
+         // Release the read lock acquired above; acquiring it again here would leave it held
+         lock.readLock().release();
+      }
+   }
+
    private Binding removeBindingInMemory(int nodeID, String queueName) throws Exception
    {
    	lock.writeLock().acquire();

Modified: trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/tests/src/org/jboss/test/messaging/MessagingTestCase.java	2007-08-26 05:34:05 UTC (rev 3055)
@@ -158,25 +158,37 @@
       
       assertEquals(0, messageCount.intValue());    
    }
-         
+
    protected void checkNoSubscriptions(Topic topic) throws Exception
    {
-   	ObjectName destObjectName =  new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
-   	
-      Integer messageCount = (Integer)ServerManagement.getAttribute(destObjectName, "AllSubscriptionsCount"); 
-      
-      assertEquals(0, messageCount.intValue());      
+      Integer messageCount = getNoSubscriptions(topic);
+
+      assertEquals(0, messageCount.intValue());
    }
-   
+
+
    protected void checkNoSubscriptions(Topic topic, int server) throws Exception
    {
-   	ObjectName destObjectName =  new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
-   	
-      Integer messageCount = (Integer)ServerManagement.getServer(server).getAttribute(destObjectName, "AllSubscriptionsCount"); 
+      Integer messageCount = getNoSubscriptions(topic, server);
       
       assertEquals(0, messageCount.intValue());      
    }
-      
+   // Returns the subscription count of the given topic as reported by server 0.
+   protected int getNoSubscriptions(Topic topic)
+      throws Exception
+   {
+      return getNoSubscriptions(topic,0);
+   }
+
+   protected int getNoSubscriptions(Topic topic, int server)
+      throws Exception
+   {
+      ObjectName destObjectName =  new ObjectName("jboss.messaging.destination:service=Topic,name=" + topic.getTopicName());
+      // Despite the local's name, the value read is the topic's "AllSubscriptionsCount" attribute on the given server
+      Integer messageCount = (Integer) ServerManagement.getAttribute(server, destObjectName, "AllSubscriptionsCount");
+      return messageCount.intValue();
+   }
+
    protected boolean assertRemainingMessages(int expected) throws Exception
    {
       ObjectName destObjectName = 

Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-08-26 05:31:52 UTC (rev 3054)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/DistributedTopicTest.java	2007-08-26 05:34:05 UTC (rev 3055)
@@ -32,6 +32,7 @@
 import javax.jms.Topic;
 
 import org.jboss.test.messaging.tools.ServerManagement;
+import java.util.ArrayList;
 
 
 /**
@@ -131,6 +132,110 @@
    {
    	nonClusteredTopicDurable(true);
    }
+   // Creates 50 durable subscriptions on each of three servers, sends 10 messages, checks that every subscription receives them in order, then unsubscribes all.
+   public void testFloodSubscriptions() throws Exception
+   {
+      Connection conn0 = this.createConnectionOnServer(cf, 0);
+      Connection conn1 = this.createConnectionOnServer(cf, 1);
+      Connection conn2 = this.createConnectionOnServer(cf, 2);
+
+      try
+      {
+         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});
+
+         conn0.setClientID("c1");
+         conn1.setClientID("c1");
+         conn2.setClientID("c1");
+
+         Session session [] = new Session[3];
+         session[0] = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session[1] = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         session[2] = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         conn0.start();
+         conn1.start();
+         conn2.start();
+
+         ArrayList<MessageConsumer> consumersArray[] = new ArrayList[3];
+
+         // The producer is created on the third session and publishes to topic[0]
+         MessageProducer prod = session[2].createProducer(topic[0]);
+         prod.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         int NUMBER_OF_MESSAGES = 10;
+         int NUMBER_OF_SUBSCRIPTIONS = 50;
+         // Create NUMBER_OF_SUBSCRIPTIONS durable subscribers on each session, named "sess_<session index>_<i>"
+         int sessId=0;
+         for (Session sess : session)
+         {
+            consumersArray[sessId] = new ArrayList<MessageConsumer>();
+            for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
+            {
+               MessageConsumer consumer = sess.createDurableSubscriber(topic[sessId], "sess_" + sessId + "_" + i);
+               consumersArray[sessId].add(consumer);
+            }
+
+            sessId++;
+         }
+
+         // Messages are created on session[0] but sent through the producer created on session[2]
+         for (int i=0;i<NUMBER_OF_MESSAGES;i++)
+         {
+            log.info("Sending message " + i);
+            prod.send(session[0].createTextMessage("test" + i));
+         }
+
+         // Expect the subscriptions created through all three sessions to be counted
+         assertEquals(NUMBER_OF_SUBSCRIPTIONS * 3, getNoSubscriptions(topic[0]));
+
+         int messageRead = 0;
+         // Every consumer must receive all messages, in send order, waiting at most 5000 ms for each
+         for (ArrayList<MessageConsumer> consumers: consumersArray)
+         {
+            for (MessageConsumer consumer: consumers)
+            {
+               TextMessage msg = null;
+               for (int i=0;i<NUMBER_OF_MESSAGES;i++)
+               {
+                  msg = (TextMessage)consumer.receive(5000);
+                  assertNotNull(msg);
+                  log.info("Msg:" + msg + " text - " + msg.getText());
+                  assertEquals("test" + i, msg.getText());
+                  messageRead ++;
+               }
+            }
+         }
+
+         assertEquals(NUMBER_OF_SUBSCRIPTIONS * NUMBER_OF_MESSAGES * 3, messageRead);
+         // NOTE(review): prod1 is not used anywhere else in this method as shown
+         MessageProducer prod1 = session[1].createProducer(topic[0]);
+
+         for (ArrayList<MessageConsumer> consumers: consumersArray)
+         {
+            for (MessageConsumer consumer: consumers)
+            {
+               consumer.close();
+            }
+         }
+         // Remove the durable subscriptions using the same names they were created with
+         for (sessId = 0; sessId < 3; sessId++)
+         {
+            for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
+            {
+               session[sessId].unsubscribe("sess_" + sessId + "_" + i);
+            }
+         }
+
+         checkNoSubscriptions(topic[0]);
+         
+      }
+      finally
+      {
+         try { if (conn0 != null) conn0.close(); } catch (Exception ignored){}
+         try { if (conn1 != null) conn1.close(); } catch (Exception ignored){}
+         try { if (conn2 != null) conn2.close(); } catch (Exception ignored){}
+      }
+   }
    
    // Package protected ---------------------------------------------
 
@@ -597,7 +702,7 @@
                  
          //close beta
          beta.close();
-  
+
          // Create another beta - this one node 0
          MessageConsumer beta0 = sess0.createDurableSubscriber(topic[0], "beta");
          
@@ -677,19 +782,19 @@
          
          beta0.close();
          beta1.close();	
-         
+
          alpha.close();
          beta.close();
          gamma.close();
          delta.close();
          epsilon.close();
-         
+
          sess0.unsubscribe("alpha");
          sess1.unsubscribe("beta");
          sess2.unsubscribe("gamma");
          sess0.unsubscribe("delta");
          sess1.unsubscribe("epsilon");
-         
+
       }
       finally
       {




More information about the jboss-cvs-commits mailing list