[jboss-cvs] JBoss Messaging SVN: r6116 - in trunk/tests/src/org/jboss/messaging/tests: integration/cluster/distribution and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 19 15:00:26 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-19 15:00:26 -0400 (Thu, 19 Mar 2009)
New Revision: 6116
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Fixing MessageRedistributionTest (making it wait for messages before removing the binding)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-19 19:00:26 UTC (rev 6116)
@@ -30,22 +30,18 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
*/
-public class ClientConsumerTest extends UnitTestCase
+public class ClientConsumerTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
@@ -57,16 +53,9 @@
protected void setUp() throws Exception
{
super.setUp();
-
- Configuration conf = new ConfigurationImpl();
- conf.setSecurityEnabled(false);
+ messagingService = createService(false);
- conf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-
- messagingService = Messaging.newNullStorageMessagingService(conf);
-
messagingService.start();
}
@@ -96,7 +85,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -126,10 +115,56 @@
}
+ public void testMessageCounter() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+
+ ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ producer.send(message);
+ }
+
+ session.commit();
+ session.start();
+
+ assertEquals(100, getMessageCounter(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+
+ ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+ message.acknowledge();
+
+ session.commit();
+
+ assertEquals("m" + i, message.getBody().readString());
+ }
+
+ session.close();
+
+ assertEquals(0, getMessageCounter(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+
+ }
+
public void testConsumerBrowserWithSelector() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -141,7 +176,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
message.putIntProperty(new SimpleString("x"), i);
producer.send(message);
}
@@ -175,7 +210,7 @@
public void testConsumerBrowserWithStringSelector() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -187,7 +222,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
if (i % 2 == 0)
{
message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
@@ -211,7 +246,7 @@
public void testConsumerMultipleBrowser() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -223,7 +258,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -248,7 +283,7 @@
public void testConsumerMultipleBrowserWithSelector() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -260,7 +295,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
message.putIntProperty(new SimpleString("x"), i);
producer.send(message);
}
@@ -293,16 +328,15 @@
{
testConsumerBrowserMessagesArentAcked(false);
}
-
+
public void testConsumerBrowserMessagesPreACK() throws Exception
{
testConsumerBrowserMessagesArentAcked(false);
}
-
private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
@@ -314,7 +348,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -337,7 +371,7 @@
public void testConsumerBrowserMessageAckDoesNothing() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -349,7 +383,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -374,7 +408,7 @@
public void testSetMessageHandlerWithMessagesPending() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true);
@@ -386,7 +420,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -435,7 +469,7 @@
public void testStopConsumer() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
final ClientSession session = sf.createSession(false, true, true);
@@ -447,14 +481,14 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
session.start();
-
+
final CountDownLatch latch = new CountDownLatch(10);
// Message should be in consumer
@@ -462,20 +496,21 @@
class MyHandler implements MessageHandler
{
boolean failed;
+
boolean started = true;
-
+
public void onMessage(final ClientMessage message)
{
-
+
try
{
if (!started)
{
failed = true;
}
-
+
latch.countDown();
-
+
if (latch.getCount() == 0)
{
session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
@@ -490,36 +525,35 @@
}
}
}
-
+
MyHandler handler = new MyHandler();
consumer.setMessageHandler(handler);
latch.await();
-
+
Thread.sleep(100);
assertFalse(handler.failed);
// Make sure no exceptions were thrown from onMessage
assertNull(consumer.getLastException());
-
+
for (int i = 0; i < 90; i++)
{
ClientMessage msg = consumer.receive(1000);
assertNotNull(msg);
msg.acknowledge();
}
-
+
assertNull(consumer.receiveImmediate());
session.close();
}
-
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true, true);
@@ -531,7 +565,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -555,7 +589,7 @@
public void testConsumerAckImmediateAutoCommitFalse() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, false, true);
@@ -567,7 +601,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -591,7 +625,7 @@
public void testConsumerAckImmediateAckIgnored() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true, true);
@@ -603,7 +637,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -631,7 +665,7 @@
public void testConsumerAckImmediateCloseSession() throws Exception
{
- ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactory sf = createInVMFactory();
ClientSession session = sf.createSession(false, true, true, true);
@@ -643,7 +677,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = createMessage(session, "m" + i);
+ ClientMessage message = createTextMessage("m" + i, session);
producer.send(message);
}
@@ -673,14 +707,4 @@
((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
}
- private ClientMessage createMessage(final ClientSession session, final String msg)
- {
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.getBody().writeString(msg);
- return message;
- }
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-03-19 19:00:26 UTC (rev 6116)
@@ -110,7 +110,48 @@
private MessagingService[] services = new MessagingService[MAX_SERVERS];
private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+
+ protected void waitForMessages(int node,
+ final String address,
+ final int count) throws Exception
+ {
+ MessagingService service = this.services[node];
+ if (service == null)
+ {
+ throw new IllegalArgumentException("No service at " + node);
+ }
+
+ PostOffice po = service.getServer().getPostOffice();
+
+ long start = System.currentTimeMillis();
+
+ int messageCount = 0;
+
+
+ do
+ {
+ messageCount = getMessageCounter(po, address);
+
+ log.info(node + " messageCount " + messageCount);
+
+ if (messageCount == count)
+ {
+ log.info("Waited " + (System.currentTimeMillis() - start));
+ return;
+ }
+
+ Thread.sleep(100);
+ }
+ while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+
+ System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+
+ throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount + ", expecting = " + count);
+ }
+
+
+
protected void waitForBindings(int node,
final String address,
final int count,
@@ -137,13 +178,18 @@
long start = System.currentTimeMillis();
+ int bindingCount = 0;
+
+ int totConsumers = 0;
+
+
do
{
- Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
+ bindingCount = 0;
- int bindingCount = 0;
+ totConsumers = 0;
- int totConsumers = 0;
+ Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
for (Binding binding : bindings.getBindings())
{
@@ -171,7 +217,7 @@
System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
- throw new IllegalStateException("Timed out waiting for bindings");
+ throw new IllegalStateException("Timed out waiting for bindings (bindingCount = " + bindingCount + ", totConsumers = " + totConsumers);
}
protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
@@ -356,6 +402,7 @@
protected void verifyReceiveAllInRangeNotBefore(long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
+ boolean outOfOrder = false;
for (int i = 0; i < consumerIDs.length; i++)
{
ConsumerHolder holder = consumers[consumerIDs[i]];
@@ -376,9 +423,15 @@
assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
}
- assertEquals(j, message.getProperty(COUNT_PROP));
+ if (j != (Integer)(message.getProperty(COUNT_PROP)))
+ {
+ outOfOrder = true;
+ System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
+ }
}
}
+
+ assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
}
protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java 2009-03-19 19:00:26 UTC (rev 6116)
@@ -363,65 +363,82 @@
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
+
+ final String ADDRESS = "queues.testaddress";
+ final String QUEUE = "queue0";
- createQueue(0, "queues.testaddress", "queue0", null, false);
- createQueue(1, "queues.testaddress", "queue0", null, false);
- createQueue(2, "queues.testaddress", "queue0", null, false);
- addConsumer(0, 0, "queue0", null);
+ createQueue(0, ADDRESS, QUEUE, null, false);
+ createQueue(1, ADDRESS, QUEUE, null, false);
+ createQueue(2, ADDRESS, QUEUE, null, false);
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ addConsumer(0, 0, QUEUE, null);
- waitForBindings(0, "queues.testaddress", 2, 0, false);
- waitForBindings(1, "queues.testaddress", 2, 1, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
- send(0, "queues.testaddress", 20, false, null);
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 1, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
+ send(0, ADDRESS, 20, false, null);
+
+ waitForMessages(0, ADDRESS, 20);
+
removeConsumer(0);
- waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 0, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 0, false);
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 0, false);
- addConsumer(1, 1, "queue0", null);
+ addConsumer(1, 1, QUEUE, null);
- waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 1, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForMessages(1, ADDRESS, 20);
+ waitForMessages(0, ADDRESS, 0);
+
- waitForBindings(0, "queues.testaddress", 2, 1, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
+ waitForBindings(0, ADDRESS, 2, 1, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
removeConsumer(1);
- waitForBindings(0, "queues.testaddress", 1, 0, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
- waitForBindings(0, "queues.testaddress", 2, 0, false);
- waitForBindings(1, "queues.testaddress", 2, 0, false);
- waitForBindings(2, "queues.testaddress", 2, 0, false);
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+ waitForBindings(2, ADDRESS, 2, 0, false);
- addConsumer(0, 0, "queue0", null);
+ addConsumer(0, 0, QUEUE, null);
- waitForBindings(0, "queues.testaddress", 1, 1, true);
- waitForBindings(1, "queues.testaddress", 1, 0, true);
- waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(0, ADDRESS, 1, 1, true);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(2, ADDRESS, 1, 0, true);
+
+ waitForBindings(0, ADDRESS, 2, 0, false);
+ waitForBindings(1, ADDRESS, 2, 1, false);
+ waitForBindings(2, ADDRESS, 2, 1, false);
- waitForBindings(0, "queues.testaddress", 2, 0, false);
- waitForBindings(1, "queues.testaddress", 2, 1, false);
- waitForBindings(2, "queues.testaddress", 2, 1, false);
-
+ waitForMessages(0, ADDRESS, 20);
+
verifyReceiveAll(20, 0);
verifyNotReceive(0);
+
+ addConsumer(1, 1, QUEUE, null);
+ verifyNotReceive(1);
+ removeConsumer(1);
+
}
public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-03-19 19:00:26 UTC (rev 6116)
@@ -36,6 +36,11 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.config.impl.FileConfiguration;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
import org.jboss.messaging.core.server.JournalType;
@@ -46,6 +51,7 @@
import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.utils.SimpleString;
/**
*
@@ -299,7 +305,34 @@
message.getBody().writeBytes(b);
return message;
}
+
+ /**
+ * @param address
+ * @param postOffice
+ * @return
+ * @throws Exception
+ */
+ protected int getMessageCounter(final PostOffice postOffice, final String address) throws Exception
+ {
+ int messageCount;
+ messageCount = 0;
+ Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
+
+ for (Binding binding : bindings.getBindings())
+ {
+ if ((binding instanceof LocalQueueBinding))
+ {
+ QueueBinding qBinding = (QueueBinding)binding;
+
+ messageCount += qBinding.getQueue().getMessageCount();
+
+ }
+ }
+ return messageCount;
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the jboss-cvs-commits
mailing list