[jboss-cvs] JBoss Messaging SVN: r3296 - in branches/Branch_Stable: src/main/org/jboss/jms/server/destination and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 7 11:34:07 EST 2007
Author: timfox
Date: 2007-11-07 11:34:07 -0500 (Wed, 07 Nov 2007)
New Revision: 3296
Modified:
branches/Branch_Stable/.classpath
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1136
Modified: branches/Branch_Stable/.classpath
===================================================================
--- branches/Branch_Stable/.classpath 2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/.classpath 2007-11-07 16:34:07 UTC (rev 3296)
@@ -45,7 +45,6 @@
<classpathentry kind="var" path="ANT_HOME/lib/ant.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/common/lib/jboss-common.jar"/>
- <classpathentry kind="src" path=".apt_generated"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-j2ee.jar"/>
<classpathentry kind="lib" path="thirdparty/jboss/aop/lib/jboss-aop.jar"/>
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-system.jar"/>
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java 2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/QueueService.java 2007-11-07 16:34:07 UTC (rev 3296)
@@ -138,6 +138,22 @@
started = true;
+ //Now we need to trigger a delivery - this is because message suckers might have
+ //been create *before* the queue was deployed - this is because message suckers can be
+ //created when the clusterpullconnectionfactory deploy is detected which then causes
+ //the clusterconnectionmanager to inspect the bindings for queues to create suckers
+ //to - but these bindings will exist before the queue or topic is deployed and before
+ //it has had its messages loaded
+ //Therefore we need to trigger a delivery now so remote suckers get messages
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+ //For JBM we should remove the distinction between activation and deployment to
+ //remove these annoyances and edge cases.
+ //The post office should load(=deploy) all bindings on startup including loading their
+ //state before adding the binding - there should be no separate deployment stage
+ //If the queue can be undeployed there should be a separate flag for this on the
+ //binding
+ queue.deliver();
+
log.info(this + " started, fullSize=" + destination.getFullSize() +
", pageSize=" + destination.getPageSize() + ", downCacheSize=" + destination.getDownCacheSize());
}
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java 2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/destination/TopicService.java 2007-11-07 16:34:07 UTC (rev 3296)
@@ -102,7 +102,23 @@
new MessageCounter(counterName, subName, queue, true, true,
dayLimitToUse);
- serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
+ serverPeer.getMessageCounterManager().registerMessageCounter(counterName, counter);
+
+ //Now we need to trigger a delivery - this is because message suckers might have
+ //been create *before* the queue was deployed - this is because message suckers can be
+ //created when the clusterpullconnectionfactory deploy is detected which then causes
+ //the clusterconnectionmanager to inspect the bindings for queues to create suckers
+ //to - but these bindings will exist before the queue or topic is deployed and before
+ //it has had its messages loaded
+ //Therefore we need to trigger a delivery now so remote suckers get messages
+ //See http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+ //For JBM we should remove the distinction between activation and deployment to
+ //remove these annoyances and edge cases.
+ //The post office should load(=deploy) all bindings on startup including loading their
+ //state before adding the binding - there should be no separate deployment stage
+ //If the queue can be undeployed there should be a separate flag for this on the
+ //binding
+ queue.deliver();
}
serverPeer.getDestinationManager().registerDestination(destination);
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-11-07 14:03:57 UTC (rev 3295)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/clustering/ClusterConnectionManagerTest.java 2007-11-07 16:34:07 UTC (rev 3296)
@@ -278,6 +278,119 @@
deployCFLocal();
suck();
}
+
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-1136
+ public void testCreateConsumerBeforeRemoteDeployment() throws Exception
+ {
+ final int NUM_MESSAGES = 20;
+
+ deployCFLocal();
+ deployLocal();
+
+ //Send some messages
+
+ Queue queue0 = (Queue)ic[0].lookup("/queue/suckQueue");
+
+ Connection conn0 = null;
+
+ try
+ {
+ conn0 = this.createConnectionOnServer(cf, 0);
+
+ assertEquals(0, getServerId(conn0));
+
+ //Send some messages on node 0
+
+ Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess0.createProducer(queue0);
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = sess0.createTextMessage("message" + i);
+
+ prod.send(tm);
+ }
+ }
+ finally
+ {
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ }
+
+ log.info("Sent messages");
+
+ //Undeploy
+ this.undeployAll();
+
+ log.info("Undeployed");
+
+ deployCFRemote();
+ deployRemote();
+
+ Queue queue1 = (Queue)ic[1].lookup("/queue/suckQueue");
+
+ //Create the consumer - but the messages will be stranded on other node
+ //Until we deploy - we do this on another thread
+
+ Thread t = new Thread(new Runnable() {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(5000);
+ deployCFLocal();
+ deployLocal();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to deploy", e);
+ }
+ }
+ });
+
+ t.start();
+
+ Connection conn1 = null;
+
+ try
+ {
+ //Consume them on node 1
+
+ conn1 = this.createConnectionOnServer(cf, 1);
+
+ assertEquals(1, getServerId(conn1));
+
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue1);
+
+ conn1.start();
+
+ for (int i = 0; i < NUM_MESSAGES; i++)
+ {
+ TextMessage tm = (TextMessage)cons1.receive(30000);
+
+ assertNotNull(tm);
+
+ log.info("Got message " + tm.getText());
+
+ assertEquals("message" + i, tm.getText());
+ }
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+
+ t.join();
+
+ }
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
@@ -298,9 +411,21 @@
private void undeployAll() throws Exception
{
- ServerManagement.undeployQueue("suckQueue", 0);
+ try
+ {
+ ServerManagement.undeployQueue("suckQueue", 0);
+ }
+ catch (Exception ignore)
+ {
+ }
- ServerManagement.undeployQueue("suckQueue", 1);
+ try
+ {
+ ServerManagement.undeployQueue("suckQueue", 1);
+ }
+ catch (Exception ignore)
+ {
+ }
String cfName =
(String)ServerManagement.getServer(1).getAttribute(ServerManagement.getServerPeerObjectName(), "ClusterPullConnectionFactoryName");
@@ -379,13 +504,6 @@
MessageProducer prod = sess0.createProducer(queue0);
- //Note! The message must be sent as non persistent for this test
- //Since we have not deployed suckQueue on all nodes of the cluster
- //this would cause persistent messages to not be delivered since they would
- //fail to replicate to their backup (since suckQueue is not deployed on it)
-
- prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-
for (int i = 0; i < NUM_MESSAGES; i++)
{
TextMessage tm = sess0.createTextMessage("message" + i);
More information about the jboss-cvs-commits
mailing list