[jboss-cvs] JBoss Messaging SVN: r6116 - in trunk/tests/src/org/jboss/messaging/tests: integration/cluster/distribution and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Mar 19 15:00:26 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-03-19 15:00:26 -0400 (Thu, 19 Mar 2009)
New Revision: 6116

Modified:
   trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
   trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
FixFixing MessageRedistributionTest (making it wait messages before removing the binding)

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java	2009-03-19 19:00:26 UTC (rev 6116)
@@ -30,22 +30,18 @@
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.MessageHandler;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
-import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.server.Messaging;
 import org.jboss.messaging.core.server.MessagingService;
 import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.jms.client.JBossTextMessage;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
 import org.jboss.messaging.utils.SimpleString;
 
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
-public class ClientConsumerTest extends UnitTestCase
+public class ClientConsumerTest extends ServiceTestBase
 {
    private static final Logger log = Logger.getLogger(ClientConsumerTest.class);
 
@@ -57,16 +53,9 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      
-      Configuration conf = new ConfigurationImpl();
 
-      conf.setSecurityEnabled(false);
+      messagingService = createService(false);
 
-      conf.getAcceptorConfigurations()
-          .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
-
-      messagingService = Messaging.newNullStorageMessagingService(conf);
-
       messagingService.start();
    }
 
@@ -96,7 +85,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -126,10 +115,56 @@
 
    }
 
+   public void testMessageCounter() throws Exception
+   {
+      ClientSessionFactory sf = createInVMFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+      
+      ClientSession session = sf.createSession(null, null, false, false, false, false, 0);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false);
+
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = createTextMessage("m" + i, session);
+         producer.send(message);
+      }
+      
+      session.commit();
+      session.start();
+      
+      assertEquals(100, getMessageCounter(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+
+      ClientConsumer consumer = session.createConsumer(QUEUE, null, false);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         assertNotNull(message);
+         message.acknowledge();
+
+         session.commit();
+
+         assertEquals("m" + i, message.getBody().readString());
+      }
+
+      session.close();
+
+      assertEquals(0, getMessageCounter(messagingService.getServer().getPostOffice(), QUEUE.toString()));
+
+   }
+
    public void testConsumerBrowserWithSelector() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -141,7 +176,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          message.putIntProperty(new SimpleString("x"), i);
          producer.send(message);
       }
@@ -175,7 +210,7 @@
    public void testConsumerBrowserWithStringSelector() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -187,7 +222,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          if (i % 2 == 0)
          {
             message.putStringProperty(new SimpleString("color"), new SimpleString("RED"));
@@ -211,7 +246,7 @@
    public void testConsumerMultipleBrowser() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -223,7 +258,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -248,7 +283,7 @@
    public void testConsumerMultipleBrowserWithSelector() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -260,7 +295,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          message.putIntProperty(new SimpleString("x"), i);
          producer.send(message);
       }
@@ -293,16 +328,15 @@
    {
       testConsumerBrowserMessagesArentAcked(false);
    }
-   
+
    public void testConsumerBrowserMessagesPreACK() throws Exception
    {
       testConsumerBrowserMessagesArentAcked(false);
    }
-   
 
    private void testConsumerBrowserMessagesArentAcked(boolean preACK) throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(null, null, false, true, true, preACK, 0);
 
@@ -314,7 +348,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -337,7 +371,7 @@
 
    public void testConsumerBrowserMessageAckDoesNothing() throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -349,7 +383,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -374,7 +408,7 @@
 
    public void testSetMessageHandlerWithMessagesPending() throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -386,7 +420,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -435,7 +469,7 @@
 
    public void testStopConsumer() throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       final ClientSession session = sf.createSession(false, true, true);
 
@@ -447,14 +481,14 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
       final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
 
       session.start();
-      
+
       final CountDownLatch latch = new CountDownLatch(10);
 
       // Message should be in consumer
@@ -462,20 +496,21 @@
       class MyHandler implements MessageHandler
       {
          boolean failed;
+
          boolean started = true;
-         
+
          public void onMessage(final ClientMessage message)
          {
-            
+
             try
             {
                if (!started)
                {
                   failed = true;
                }
-               
+
                latch.countDown();
-               
+
                if (latch.getCount() == 0)
                {
                   session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
@@ -490,36 +525,35 @@
             }
          }
       }
-      
+
       MyHandler handler = new MyHandler();
 
       consumer.setMessageHandler(handler);
 
       latch.await();
-      
+
       Thread.sleep(100);
 
       assertFalse(handler.failed);
 
       // Make sure no exceptions were thrown from onMessage
       assertNull(consumer.getLastException());
-      
+
       for (int i = 0; i < 90; i++)
       {
          ClientMessage msg = consumer.receive(1000);
          assertNotNull(msg);
          msg.acknowledge();
       }
-      
+
       assertNull(consumer.receiveImmediate());
 
       session.close();
    }
 
-
    public void testConsumerAckImmediateAutoCommitTrue() throws Exception
    {
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true, true);
 
@@ -531,7 +565,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -555,7 +589,7 @@
    public void testConsumerAckImmediateAutoCommitFalse() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, false, true);
 
@@ -567,7 +601,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -591,7 +625,7 @@
    public void testConsumerAckImmediateAckIgnored() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true, true);
 
@@ -603,7 +637,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -631,7 +665,7 @@
    public void testConsumerAckImmediateCloseSession() throws Exception
    {
 
-      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+      ClientSessionFactory sf = createInVMFactory();
 
       ClientSession session = sf.createSession(false, true, true, true);
 
@@ -643,7 +677,7 @@
 
       for (int i = 0; i < numMessages; i++)
       {
-         ClientMessage message = createMessage(session, "m" + i);
+         ClientMessage message = createTextMessage("m" + i, session);
          producer.send(message);
       }
 
@@ -673,14 +707,4 @@
                    ((Queue)messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable()).getMessageCount());
    }
 
-   private ClientMessage createMessage(final ClientSession session, final String msg)
-   {
-      ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
-                                                          false,
-                                                          0,
-                                                          System.currentTimeMillis(),
-                                                          (byte)1);
-      message.getBody().writeString(msg);
-      return message;
-   }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java	2009-03-19 19:00:26 UTC (rev 6116)
@@ -110,7 +110,48 @@
    private MessagingService[] services = new MessagingService[MAX_SERVERS];
 
    private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+   
+   protected void waitForMessages(int node,
+                                  final String address,
+                                  final int count) throws Exception
+   {
+      MessagingService service = this.services[node];
 
+      if (service == null)
+      {
+         throw new IllegalArgumentException("No service at " + node);
+      }
+
+      PostOffice po = service.getServer().getPostOffice();
+
+      long start = System.currentTimeMillis();
+
+      int messageCount = 0;
+
+
+      do
+      {
+         messageCount = getMessageCounter(po, address);
+
+         log.info(node + " messageCount " + messageCount);
+
+         if (messageCount == count)
+         {
+            log.info("Waited " + (System.currentTimeMillis() - start));
+            return;
+         }
+
+         Thread.sleep(100);
+      }
+      while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
+      
+      System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+
+      throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount + ", expecting = " + count);
+   }
+
+
+
    protected void waitForBindings(int node,
                                   final String address,
                                   final int count,
@@ -137,13 +178,18 @@
 
       long start = System.currentTimeMillis();
 
+      int bindingCount = 0;
+
+      int totConsumers = 0;
+
+
       do
       {
-         Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
+         bindingCount = 0;
 
-         int bindingCount = 0;
+         totConsumers = 0;
 
-         int totConsumers = 0;
+         Bindings bindings = po.getBindingsForAddress(new SimpleString(address));
 
          for (Binding binding : bindings.getBindings())
          {
@@ -171,7 +217,7 @@
       
       System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
-      throw new IllegalStateException("Timed out waiting for bindings");
+      throw new IllegalStateException("Timed out waiting for bindings (bindingCount = " + bindingCount + ", totConsumers = " + totConsumers);
    }
 
    protected void createQueue(int node, String address, String queueName, String filterVal, boolean durable) throws Exception
@@ -356,6 +402,7 @@
    
    protected void verifyReceiveAllInRangeNotBefore(long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
+      boolean outOfOrder = false;
       for (int i = 0; i < consumerIDs.length; i++)
       {
          ConsumerHolder holder = consumers[consumerIDs[i]];
@@ -376,9 +423,15 @@
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
             }
 
-            assertEquals(j, message.getProperty(COUNT_PROP));
+            if (j != (Integer)(message.getProperty(COUNT_PROP)))
+            {
+               outOfOrder = true;
+               System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
+            }
          }
       }
+      
+      assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
    }
 
    protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/MessageRedistributionTest.java	2009-03-19 19:00:26 UTC (rev 6116)
@@ -363,65 +363,82 @@
       setupSessionFactory(0, isNetty());
       setupSessionFactory(1, isNetty());
       setupSessionFactory(2, isNetty());
+      
+      final String ADDRESS = "queues.testaddress";
+      final String QUEUE = "queue0";
 
-      createQueue(0, "queues.testaddress", "queue0", null, false);
-      createQueue(1, "queues.testaddress", "queue0", null, false);
-      createQueue(2, "queues.testaddress", "queue0", null, false);
 
-      addConsumer(0, 0, "queue0", null);
+      createQueue(0, ADDRESS, QUEUE, null, false);
+      createQueue(1, ADDRESS, QUEUE, null, false);
+      createQueue(2, ADDRESS, QUEUE, null, false);
 
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 0, true);
-      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      addConsumer(0, 0, QUEUE, null);
 
-      waitForBindings(0, "queues.testaddress", 2, 0, false);
-      waitForBindings(1, "queues.testaddress", 2, 1, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
+      waitForBindings(0, ADDRESS, 1, 1, true);
+      waitForBindings(1, ADDRESS, 1, 0, true);
+      waitForBindings(2, ADDRESS, 1, 0, true);
 
-      send(0, "queues.testaddress", 20, false, null);
+      waitForBindings(0, ADDRESS, 2, 0, false);
+      waitForBindings(1, ADDRESS, 2, 1, false);
+      waitForBindings(2, ADDRESS, 2, 1, false);
 
+      send(0, ADDRESS, 20, false, null);
+      
+      waitForMessages(0, ADDRESS, 20);
+
       removeConsumer(0);
       
-      waitForBindings(0, "queues.testaddress", 1, 0, true);
-      waitForBindings(1, "queues.testaddress", 1, 0, true);
-      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, ADDRESS, 1, 0, true);
+      waitForBindings(1, ADDRESS, 1, 0, true);
+      waitForBindings(2, ADDRESS, 1, 0, true);
 
-      waitForBindings(0, "queues.testaddress", 2, 0, false);
-      waitForBindings(1, "queues.testaddress", 2, 0, false);
-      waitForBindings(2, "queues.testaddress", 2, 0, false);
+      waitForBindings(0, ADDRESS, 2, 0, false);
+      waitForBindings(1, ADDRESS, 2, 0, false);
+      waitForBindings(2, ADDRESS, 2, 0, false);
 
-      addConsumer(1, 1, "queue0", null);
+      addConsumer(1, 1, QUEUE, null);
       
-      waitForBindings(0, "queues.testaddress", 1, 0, true);
-      waitForBindings(1, "queues.testaddress", 1, 1, true);
-      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, ADDRESS, 1, 0, true);
+      waitForBindings(1, ADDRESS, 1, 1, true);
+      waitForBindings(2, ADDRESS, 1, 0, true);
+      
+      waitForMessages(1, ADDRESS, 20);
+      waitForMessages(0, ADDRESS, 0);
+      
 
-      waitForBindings(0, "queues.testaddress", 2, 1, false);
-      waitForBindings(1, "queues.testaddress", 2, 0, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
+      waitForBindings(0, ADDRESS, 2, 1, false);
+      waitForBindings(1, ADDRESS, 2, 0, false);
+      waitForBindings(2, ADDRESS, 2, 1, false);
       
       removeConsumer(1);
       
-      waitForBindings(0, "queues.testaddress", 1, 0, true);
-      waitForBindings(1, "queues.testaddress", 1, 0, true);
-      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, ADDRESS, 1, 0, true);
+      waitForBindings(1, ADDRESS, 1, 0, true);
+      waitForBindings(2, ADDRESS, 1, 0, true);
 
-      waitForBindings(0, "queues.testaddress", 2, 0, false);
-      waitForBindings(1, "queues.testaddress", 2, 0, false);
-      waitForBindings(2, "queues.testaddress", 2, 0, false);
+      waitForBindings(0, ADDRESS, 2, 0, false);
+      waitForBindings(1, ADDRESS, 2, 0, false);
+      waitForBindings(2, ADDRESS, 2, 0, false);
 
-      addConsumer(0, 0, "queue0", null);
+      addConsumer(0, 0, QUEUE, null);
       
-      waitForBindings(0, "queues.testaddress", 1, 1, true);
-      waitForBindings(1, "queues.testaddress", 1, 0, true);
-      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      waitForBindings(0, ADDRESS, 1, 1, true);
+      waitForBindings(1, ADDRESS, 1, 0, true);
+      waitForBindings(2, ADDRESS, 1, 0, true);
+      
+      waitForBindings(0, ADDRESS, 2, 0, false);
+      waitForBindings(1, ADDRESS, 2, 1, false);
+      waitForBindings(2, ADDRESS, 2, 1, false);
 
-      waitForBindings(0, "queues.testaddress", 2, 0, false);
-      waitForBindings(1, "queues.testaddress", 2, 1, false);
-      waitForBindings(2, "queues.testaddress", 2, 1, false);
-
+      waitForMessages(0, ADDRESS, 20);
+      
       verifyReceiveAll(20, 0);
       verifyNotReceive(0);
+      
+      addConsumer(1, 1, QUEUE, null);
+      verifyNotReceive(1);
+      removeConsumer(1);
+      
    }
    
    public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-03-19 17:10:25 UTC (rev 6115)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java	2009-03-19 19:00:26 UTC (rev 6116)
@@ -36,6 +36,11 @@
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.config.impl.FileConfiguration;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.QueueBinding;
+import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
 import org.jboss.messaging.core.server.JournalType;
@@ -46,6 +51,7 @@
 import org.jboss.messaging.integration.transports.netty.NettyConnectorFactory;
 import org.jboss.messaging.jms.client.JBossBytesMessage;
 import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.utils.SimpleString;
 
 /**
  * 
@@ -299,7 +305,34 @@
       message.getBody().writeBytes(b);
       return message;
    }
+   
+   /**
+    * @param address
+    * @param postOffice
+    * @return
+    * @throws Exception
+    */
+   protected int getMessageCounter(final PostOffice postOffice, final String address) throws Exception
+   {
+      int messageCount;
+      messageCount = 0;
 
+      Bindings bindings = postOffice.getBindingsForAddress(new SimpleString(address));
+
+      for (Binding binding : bindings.getBindings())
+      {
+         if ((binding instanceof LocalQueueBinding))
+         {
+            QueueBinding qBinding = (QueueBinding)binding;
+            
+            messageCount += qBinding.getQueue().getMessageCount();
+
+         }
+      }
+      return messageCount;
+   }
+
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------




More information about the jboss-cvs-commits mailing list