[Jboss-cvs] JBoss Messaging SVN: r1361 - in trunk: src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core src/main/org/jboss/messaging/core/plugin/postoffice src/main/org/jboss/messaging/core/plugin/postoffice/cluster tests/src/org/jboss/test/messaging/core/plugin/base tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Sep 23 14:04:10 EDT 2006
Author: timfox
Date: 2006-09-23 14:04:00 -0400 (Sat, 23 Sep 2006)
New Revision: 1361
Modified:
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
Log:
Clustering
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -135,7 +135,7 @@
}
// SessionDelegate implementation --------------------------------
-
+
public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
String selectorString,
boolean noLocal,
@@ -193,17 +193,30 @@
// non-durable subscription
if (log.isTraceEnabled()) { log.trace("creating new non-durable subscription on " + jmsDestination); }
- //Create the sub
+ //Create the non durable sub
QueuedExecutor executor = (QueuedExecutor)pool.get();
- PagingFilteredQueue q =
- new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
- executor, selector, mDest.getFullSize(),
- mDest.getPageSize(),
- mDest.getDownCacheSize());
+ PagingFilteredQueue q;
- //Make a binding for this queue - non durable subscriptins are always non clustered
- binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);
+ if (topicPostOffice.isLocal())
+ {
+ q = new PagingFilteredQueue(new GUID().toString(), idm.getId(), ms, pm, true, false,
+ executor, selector,
+ mDest.getFullSize(),
+ mDest.getPageSize(),
+ mDest.getDownCacheSize());
+
+ binding = topicPostOffice.bindQueue(jmsDestination.getName(), q);
+ }
+ else
+ {
+ q = new LocalClusteredQueue(topicPostOffice, nodeId, new GUID().toString(), idm.getId(), ms, pm, true, false,
+ executor, selector, tr,
+ mDest.getFullSize(),
+ mDest.getPageSize(),
+ mDest.getDownCacheSize());
+ binding = ((ClusteredPostOffice)topicPostOffice).bindClusteredQueue(jmsDestination.getName(), (LocalClusteredQueue)q);
+ }
}
else
{
@@ -608,6 +621,8 @@
new PagingFilteredQueue(dest.getName(), idm.getId(), ms, pm, true, false,
executor, null, fullSize, pageSize, downCacheSize);
+
+
//Make a binding for this queue
queuePostOffice.bindQueue(dest.getName(), q);
}
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -1133,13 +1133,12 @@
{
receiversReady = true;
- deliverInternal(false);
-
- if (result != null)
- {
- result.setResult(null);
- }
+ deliverInternal(false);
}
+ if (result != null)
+ {
+ result.setResult(null);
+ }
}
catch (Throwable t)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -48,7 +48,6 @@
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.plugin.contract.PostOffice;
-import org.jboss.messaging.core.plugin.postoffice.cluster.LocalClusteredQueue;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionRepository;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -131,6 +131,8 @@
private StatsSender statsSender;
private long statsSendPeriod;
+
+ private boolean started;
public DefaultClusteredPostOffice()
{
@@ -241,7 +243,7 @@
// MessagingComponent overrides
// --------------------------------------------------------------
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
if (syncChannelConfigE != null)
{
@@ -286,9 +288,11 @@
syncSendRequest(new SendNodeIdRequest(currentAddress, nodeId));
statsSender.start();
+
+ started = true;
}
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
super.stop();
@@ -297,6 +301,8 @@
syncChannel.close();
asyncChannel.close();
+
+ started = false;
}
// PostOffice implementation ---------------------------------------
@@ -752,8 +758,13 @@
}
}
- public void sendQueueStats() throws Exception
+ public synchronized void sendQueueStats() throws Exception
{
+ if (!started)
+ {
+ return;
+ }
+
lock.readLock().acquire();
List statsList = null;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultRouterFactory.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,7 +21,6 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
-import org.jboss.messaging.core.Router;
/**
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -28,7 +28,6 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.Filter;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.SimpleDelivery;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -171,6 +171,8 @@
//since they would have been acked on the pulling node
LocalClusteredQueue queue = (LocalClusteredQueue)del.getObserver();
+ log.info("i am committing request");
+
queue.acknowledgeFromCluster(del);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesResponse.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -23,7 +23,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -41,7 +40,7 @@
* $Id$
*
*/
-public class PullMessagesResponse implements Streamable, Serializable
+public class PullMessagesResponse implements Streamable
{
private List messages;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/ClusteringTestBase.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -269,7 +269,7 @@
list.add(msg);
}
- Thread.sleep(1000);
+ Thread.sleep(2000);
return list;
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOfficeTest.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,7 +21,6 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -29,10 +28,8 @@
import org.jboss.messaging.core.FilterFactory;
import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
-import org.jboss.messaging.core.Queue;
import org.jboss.messaging.core.local.PagingFilteredQueue;
import org.jboss.messaging.core.plugin.contract.ClusteredPostOffice;
-import org.jboss.messaging.core.plugin.contract.PostOffice;
import org.jboss.messaging.core.plugin.postoffice.Binding;
import org.jboss.messaging.core.plugin.postoffice.cluster.ClusterRouterFactory;
import org.jboss.messaging.core.plugin.postoffice.cluster.DefaultClusteredPostOffice;
@@ -550,19 +547,11 @@
{
//ok
}
+
+ office1.unbindClusteredQueue("queue1");
- try
- {
- office1.unbindQueue("queue1");
- fail();
- }
- catch (Exception e)
- {
- //ok
- }
+ //It should be possible to bind local queues into a clustered post office
- office1.unbindClusteredQueue("queue1");
-
PagingFilteredQueue queue7 = new PagingFilteredQueue("queue1", im.getId(), ms, pm, true, false, (QueuedExecutor)pool.get(), null);
Binding binding7 = office1.bindQueue("queue1", queue7);
@@ -848,17 +837,15 @@
if (office1 != null)
{
try
- {
- office2.unbindClusteredQueue("sub5");
+ {
office1.unbindClusteredQueue("sub7");
- office1.unbindClusteredQueue("sub8");
- office2.unbindClusteredQueue("sub13");
+ office1.unbindClusteredQueue("sub8");
office1.unbindClusteredQueue("sub15");
office1.unbindClusteredQueue("sub16");
}
catch (Exception ignore)
{
-
+ ignore.printStackTrace();
}
office1.stop();
@@ -866,6 +853,15 @@
if (office2 != null)
{
+ try
+ {
+ office2.unbindClusteredQueue("sub5");
+ office2.unbindClusteredQueue("sub13");
+ }
+ catch (Exception ignore)
+ {
+ ignore.printStackTrace();
+ }
office2.stop();
}
@@ -976,8 +972,6 @@
this.checkEmpty(receiver4);
checkContainsAndAcknowledge(msg, receiver5, queue5);
- log.info("************* ROOTING");
-
msg = CoreMessageFactory.createCoreMessage(1, persistentMessage, null);
ref = ms.reference(msg);
routed = office6.route(ref, "queue1", null);
@@ -989,7 +983,6 @@
Thread.sleep(1000);
- log.info("checking");
checkContainsAndAcknowledge(msg, receiver1, queue1);
this.checkEmpty(receiver1);
this.checkEmpty(receiver2);
@@ -1174,12 +1167,10 @@
sharedDurable5.add(receiver14);
- //Send 3 messages at node1
+ //Send 1 message at node1
//========================
- log.info("******** sending");
- List msgs = sendMessages("topic", persistent, office1, 3, null);
- log.info("********** sent");
+ List msgs = sendMessages("topic", persistent, office1, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1208,10 +1199,10 @@
checkEmpty(receiver12);
- //Send 3 messages at node2
+ //Send 1 message at node2
//========================
- msgs = sendMessages("topic", persistent, office2, 3, null);
+ msgs = sendMessages("topic", persistent, office2, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1239,10 +1230,10 @@
//n7
checkEmpty(receiver12);
- //Send 3 messages at node3
+ //Send 1 message at node3
//========================
- msgs = sendMessages("topic", persistent, office3, 3, null);
+ msgs = sendMessages("topic", persistent, office3, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1270,18 +1261,10 @@
//n7
checkEmpty(receiver12);
- //Send 3 messages at node4
+ //Send 1 message at node4
//========================
-
-// * node1: no subscriptions
-// * node2: 2 non durable
-// * node3: 1 non shared durable, 1 non durable
-// * node4: 1 shared durable (shared1), 1 non shared durable, 3 non durable
-// * node5: 2 shared durable (shared1 and shared2)
-// * node6: 1 shared durable (shared2), 1 non durable
-// * node7: 1 shared durable (shared2)
-
- msgs = sendMessages("topic", persistent, office4, 3, null);
+
+ msgs = sendMessages("topic", persistent, office4, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1309,10 +1292,10 @@
//n7
checkEmpty(receiver12);
- //Send 3 messages at node5
+ //Send 1 message at node5
//========================
- msgs = sendMessages("topic", persistent, office5, 3, null);
+ msgs = sendMessages("topic", persistent, office5, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1340,10 +1323,10 @@
//n7
checkEmpty(receiver12);
- //Send 3 messages at node6
+ //Send 1 message at node6
//========================
- msgs = sendMessages("topic", persistent, office6, 3, null);
+ msgs = sendMessages("topic", persistent, office6, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1372,10 +1355,10 @@
//n7
checkEmpty(receiver12);
- //Send 3 messages at node7
+ //Send 1 message at node7
//========================
- msgs = sendMessages("topic", persistent, office7, 3, null);
+ msgs = sendMessages("topic", persistent, office7, 1, null);
//n2
checkContainsAndAcknowledge(msgs, receiver1, nonDurable1);
@@ -1423,7 +1406,8 @@
office3.unbindClusteredQueue("nonshareddurable1");
}
catch (Exception ignore)
- {
+ {
+ ignore.printStackTrace();
}
office3.stop();
}
@@ -1436,7 +1420,8 @@
office4.unbindClusteredQueue("nonshareddurable2");
}
catch (Exception ignore)
- {
+ {
+ ignore.printStackTrace();
}
office4.stop();
}
@@ -1450,6 +1435,7 @@
}
catch (Exception ignore)
{
+ ignore.printStackTrace();
}
office5.stop();
}
@@ -1462,6 +1448,7 @@
}
catch (Exception ignore)
{
+ ignore.printStackTrace();
}
office6.stop();
}
@@ -1470,10 +1457,11 @@
{
try
{
- office6.unbindClusteredQueue("shareddurable2");
+ office7.unbindClusteredQueue("shareddurable2");
}
catch (Exception ignore)
{
+ ignore.printStackTrace();
}
office7.stop();
}
@@ -2069,17 +2057,33 @@
{
if (office1 != null)
{
- office2.unbindClusteredQueue("sub5");
- office1.unbindClusteredQueue("sub7");
- office1.unbindClusteredQueue("sub8");
- office2.unbindClusteredQueue("sub13");
- office1.unbindClusteredQueue("sub15");
- office1.unbindClusteredQueue("sub16");
+ try
+ {
+ office1.unbindClusteredQueue("sub7");
+ office1.unbindClusteredQueue("sub8");
+ office1.unbindClusteredQueue("sub15");
+ office1.unbindClusteredQueue("sub16");
+ }
+ catch (Exception ignore)
+ {
+ ignore.printStackTrace();
+ }
+
office1.stop();
}
if (office2 != null)
{
+ try
+ {
+ office2.unbindClusteredQueue("sub5");
+ office2.unbindClusteredQueue("sub13");
+ }
+ catch (Exception ignore)
+ {
+ ignore.printStackTrace();
+ }
+
office2.stop();
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-23 14:21:10 UTC (rev 1360)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionTest.java 2006-09-23 18:04:00 UTC (rev 1361)
@@ -21,8 +21,6 @@
*/
package org.jboss.test.messaging.core.plugin.postoffice.cluster;
-import java.util.List;
-
import org.jboss.messaging.core.Delivery;
import org.jboss.messaging.core.DeliveryObserver;
import org.jboss.messaging.core.FilterFactory;
@@ -71,16 +69,16 @@
super.tearDown();
}
- public void testRedist() throws Throwable
+ public void testRedistNonPersistent() throws Throwable
{
+ redistTest(false);
+ }
+
+ public void testRedistPersistent() throws Throwable
+ {
redistTest(true);
}
- /*
- *
- *
- *
- */
public void redistTest(boolean persistent) throws Throwable
{
ClusteredPostOffice office1 = null;
@@ -137,20 +135,20 @@
//Check the sizes
- List msgs = queue1.browse();
- assertEquals(30, msgs.size());
+ assertEquals(30, queue1.memoryRefCount());
+ assertEquals(0, queue1.memoryDeliveryCount());
- msgs = queue2.browse();
- assertEquals(30, msgs.size());
+ assertEquals(30, queue2.memoryRefCount());
+ assertEquals(0, queue2.memoryDeliveryCount());
- msgs = queue3.browse();
- assertEquals(30, msgs.size());
+ assertEquals(30, queue3.memoryRefCount());
+ assertEquals(0, queue3.memoryDeliveryCount());
- msgs = queue4.browse();
- assertEquals(30, msgs.size());
+ assertEquals(30, queue4.memoryRefCount());
+ assertEquals(0, queue4.memoryDeliveryCount());
- msgs = queue5.browse();
- assertEquals(30, msgs.size());
+ assertEquals(30, queue5.memoryRefCount());
+ assertEquals(0, queue5.memoryDeliveryCount());
//Now we add the receivers
//Note that we did not do this before the send.
@@ -184,44 +182,53 @@
Thread.sleep(1000);
//Now we check the sizes again in case automatic balancing has erroneously
- //kicked in
+ //kicked in
- msgs = queue1.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue1.memoryRefCount());
+ assertEquals(1, queue1.memoryDeliveryCount());
- msgs = queue2.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue2.memoryRefCount());
+ assertEquals(1, queue2.memoryDeliveryCount());
- msgs = queue3.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue3.memoryRefCount());
+ assertEquals(1, queue3.memoryDeliveryCount());
- msgs = queue4.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue4.memoryRefCount());
+ assertEquals(1, queue4.memoryDeliveryCount());
- msgs = queue5.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue5.memoryRefCount());
+ assertEquals(1, queue5.memoryDeliveryCount());
Thread.sleep(5000);
//And again - should still be no redistribution
- msgs = queue1.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue1.memoryRefCount());
+ assertEquals(1, queue1.memoryDeliveryCount());
- msgs = queue2.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue2.memoryRefCount());
+ assertEquals(1, queue2.memoryDeliveryCount());
- msgs = queue3.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue3.memoryRefCount());
+ assertEquals(1, queue3.memoryDeliveryCount());
- msgs = queue4.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue4.memoryRefCount());
+ assertEquals(1, queue4.memoryDeliveryCount());
- msgs = queue5.browse();
- assertEquals(30, msgs.size());
+ assertEquals(29, queue5.memoryRefCount());
+ assertEquals(1, queue5.memoryDeliveryCount());
- //Try and consumer them all via one receiver
+ Thread.sleep(2000);
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
+
+
log.info("trying to consume");
//So we have 150 messages in total - 30 on each node.
@@ -236,87 +243,79 @@
//Consume 10 on node 4
- //Consume 5 on node 5
+ //We leave the last 5 since they will be as deliveries in the receivers probably
+
+ Delivery del;
log.info("consuming queue1");
for (int i = 0; i < 10; i++)
{
queue1.deliver(true);
- Delivery del = receiver1.getDelivery();
+ del = receiver1.getDelivery();
log.info("Got delivery: " + del.getReference().getMessageID());
del.acknowledge(null);
}
log.info("consumed queue1");
+ log.info("Here are the sizes:");
+
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
log.info("consuming queue2");
for (int i = 0; i < 50; i++)
{
queue2.deliver(true);
- Delivery del = receiver2.getDelivery();
+ del = receiver2.getDelivery();
log.info("Got delivery: " + del.getReference().getMessageID());
del.acknowledge(null);
}
+ log.info("consumed queue2");
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
log.info("consuming queue3");
for (int i = 0; i < 75; i++)
{
queue3.deliver(true);
- Delivery del = receiver3.getDelivery();
+ del = receiver3.getDelivery();
log.info("Got delivery: " + del.getReference().getMessageID());
del.acknowledge(null);
}
+ log.info("consumed queue3");
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
+
log.info("consuming queue4");
for (int i = 0; i < 10; i++)
{
queue4.deliver(true);
- Delivery del = receiver4.getDelivery();
+ del = receiver4.getDelivery();
log.info("Got delivery: " + del.getReference().getMessageID());
del.acknowledge(null);
}
+ log.info("consumed queue4");
- Thread.sleep(2000);
+ log.info("Here are the sizes:");
+ log.info("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.memoryDeliveryCount());
+ log.info("queue2, refs:" + queue2.memoryRefCount() + " dels:" + queue2.memoryDeliveryCount());
+ log.info("queue3, refs:" + queue3.memoryRefCount() + " dels:" + queue3.memoryDeliveryCount());
+ log.info("queue4, refs:" + queue4.memoryRefCount() + " dels:" + queue4.memoryDeliveryCount());
+ log.info("queue5, refs:" + queue5.memoryRefCount() + " dels:" + queue5.memoryDeliveryCount());
- log.info("Here are the sizes:");
-
- msgs = queue1.browse();
- log.info("queue1: " + msgs.size());
-
- msgs = queue2.browse();
- log.info("queue2: " + msgs.size());
-
- msgs = queue3.browse();
- log.info("queue3: " + msgs.size());
-
- msgs = queue4.browse();
- log.info("queue4: " + msgs.size());
-
- msgs = queue5.browse();
- log.info("queue5: " + msgs.size());
-
- log.info("consuming queue5");
- for (int i = 0; i < 5; i++)
- {
- queue5.deliver(true);
- Delivery del = receiver5.getDelivery();
- log.info("Got delivery: " + del.getReference().getMessageID());
- del.acknowledge(null);
- }
-
- msgs = queue1.browse();
- assertEquals(0, msgs.size());
-
- msgs = queue2.browse();
- assertEquals(0, msgs.size());
-
- msgs = queue3.browse();
- assertEquals(0, msgs.size());
-
- msgs = queue4.browse();
- assertEquals(0, msgs.size());
-
- msgs = queue5.browse();
- assertEquals(0, msgs.size());
}
finally
{
More information about the jboss-cvs-commits
mailing list