[jboss-cvs] JBoss Messaging SVN: r3296 - in branches/Branch_Stable: src/main/org/jboss/jms/server/destination and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 7 11:34:07 EST 2007


Author: timfox
Date: 2007-11-07 11:34:07 -0500 (Wed, 07 Nov 2007)
New Revision: 3296

Modified:
   branches/Branch_Stable/.classpath
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
   branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
   branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1136


Modified: branches/Branch_Stable/.classpath
===================================================================
--- branches/Branch_Stable/.classpath	2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/.classpath	2007-11-07 16:34:07 UTC (rev 3296)
@@ -45,7 +45,6 @@
 	<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
 	<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
-	<classpathentry kind="src" path=".apt_generated"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
 	<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java	2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java	2007-11-07 16:34:07 UTC (rev 3296)
@@ -138,6 +138,22 @@
          
          started = true;         
          
+         //Now we need to trigger a delivery - this is because message suckers might have
+         //been created *before* the queue was deployed - this is because message suckers can be
+         //created when the clusterpullconnectionfactory deploy is detected which then causes
+         //the clusterconnectionmanager to inspect the bindings for queues to create suckers
+         //to - but these bindings will exist before the queue or topic is deployed and before
+         //it has had its messages loaded
+         //Therefore we need to trigger a delivery now so remote suckers get messages
+         //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+         //For JBM we should remove the distinction between activation and deployment to
+         //remove these annoyances and edge cases.
+         //The post office should load(=deploy) all bindings on startup including loading their
+         //state before adding the binding - there should be no separate deployment stage
+         //If the queue can be undeployed there should be a separate flag for this on the
+         //binding
+         queue.deliver();
+         
          log.info(this + " started, fullSize=" + destination.getFullSize() +
                   ", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
       }

Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java	2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java	2007-11-07 16:34:07 UTC (rev 3296)
@@ -102,7 +102,23 @@
                new MessageCounter(counterName, subName, queue, true, true,
                                   dayLimitToUse);
             
-            serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);            
+            serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);    
+            
+            //Now we need to trigger a delivery - this is because message suckers might have
+            //been created *before* the queue was deployed - this is because message suckers can be
+            //created when the clusterpullconnectionfactory deploy is detected which then causes
+            //the clusterconnectionmanager to inspect the bindings for queues to create suckers
+            //to - but these bindings will exist before the queue or topic is deployed and before
+            //it has had its messages loaded
+            //Therefore we need to trigger a delivery now so remote suckers get messages
+            //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+            //For JBM we should remove the distinction between activation and deployment to
+            //remove these annoyances and edge cases.
+            //The post office should load(=deploy) all bindings on startup including loading their
+            //state before adding the binding - there should be no separate deployment stage
+            //If the queue can be undeployed there should be a separate flag for this on the
+            //binding
+            queue.deliver();
          }
 
          serverPeer.getDestinationManager().registerDestination(destination);

Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java	2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java	2007-11-07 16:34:07 UTC (rev 3296)
@@ -278,6 +278,119 @@
    	deployCFLocal();
    	suck();
    }
+   
+   // http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+   public void testCreateConsumerBeforeRemoteDeployment() throws Exception
+   {      
+      final int NUM_MESSAGES = 20;      
+      
+      deployCFLocal();      
+      deployLocal();
+      
+      //Send some messages
+      
+      Queue queue0 = (Queue)ic[0].lookup("/queue/suckQueue");
+      
+      Connection conn0 = null;
+      
+      try
+      {
+         conn0 = this.createConnectionOnServer(cf, 0);
+         
+         assertEquals(0, getServerId(conn0));
+         
+         //Send some messages on node 0
+         
+         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageProducer prod = sess0.createProducer(queue0);
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = sess0.createTextMessage("message" + i);
+            
+            prod.send(tm);
+         }
+      }
+      finally
+      {
+         if (conn0 != null)
+         {
+            conn0.close();
+         }
+      }
+      
+      log.info("Sent messages");
+      
+      //Undeploy
+      this.undeployAll();
+      
+      log.info("Undeployed");
+      
+      deployCFRemote();
+      deployRemote();
+      
+      Queue queue1 = (Queue)ic[1].lookup("/queue/suckQueue");
+            
+      //Create the consumer - but the messages will be stranded on the other node
+      //until we deploy there - that deployment is done on another thread
+      
+      Thread t = new Thread(new Runnable() {
+         public void run()
+         {
+            try
+            {
+               Thread.sleep(5000);
+               deployCFLocal();
+               deployLocal();
+            }
+            catch (Exception e)
+            {
+               log.error("Failed to deploy", e);
+            }
+         }
+      });
+      
+      t.start();
+      
+      Connection conn1 = null;
+      
+      try
+      {         
+         //Consume them on node 1
+         
+         conn1 = this.createConnectionOnServer(cf, 1);
+         
+         assertEquals(1, getServerId(conn1));
+         
+         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         
+         MessageConsumer cons1 = sess1.createConsumer(queue1);
+         
+         conn1.start();
+         
+         for (int i = 0; i < NUM_MESSAGES; i++)
+         {
+            TextMessage tm = (TextMessage)cons1.receive(30000);
+            
+            assertNotNull(tm);
+            
+            log.info("Got message " + tm.getText());
+            
+            assertEquals("message" + i, tm.getText());
+         } 
+      }
+      finally
+      {
+         if (conn1 != null)
+         {
+            conn1.close();
+         }
+      }
+      
+      t.join();
+      
+   }
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------
@@ -298,9 +411,21 @@
 
    private void undeployAll() throws Exception
    {
-   	ServerManagement.undeployQueue("suckQueue", 0);
+      try
+      {
+         ServerManagement.undeployQueue("suckQueue", 0);
+      }
+      catch (Exception ignore)
+      {
+      }
 
-   	ServerManagement.undeployQueue("suckQueue", 1);
+      try
+      {
+         ServerManagement.undeployQueue("suckQueue", 1);
+      }
+      catch (Exception ignore)
+      {
+      }
    	
       String cfName =
    		(String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
@@ -379,13 +504,6 @@
       	
       	MessageProducer prod = sess0.createProducer(queue0);
       	
-      	//Note! The message must be sent as non persistent for this test
-      	//Since we have not deployed suckQueue on all nodes of the cluster
-      	//this would cause persistent messages to not be delivered since they would
-      	//fail to replicate to their backup (since suckQueue is not deployed on it)
-      	
-      	prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-      	
       	for (int i = 0; i < NUM_MESSAGES; i++)
       	{
       		TextMessage tm = sess0.createTextMessage("message" + i);




More information about the jboss-cvs-commits mailing list