[hornetq-commits] JBoss hornetq SVN: r10693 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 17 18:03:37 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-17 18:03:37 -0400 (Tue, 17 May 2011)
New Revision: 10693

Modified:
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
Adding a test to investigate an issue

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-17 16:55:41 UTC (rev 10692)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-05-17 22:03:37 UTC (rev 10693)
@@ -16,6 +16,7 @@
 import java.util.Arrays;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -29,6 +30,7 @@
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.MessageHandler;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
@@ -363,7 +365,126 @@
       
       
    }
+   
+   public void testTemoraryQueuesWithFilter() throws Exception
+   {
+      // Per iteration: creates two filtered temporary queues (red/blue) on one address, sends both colors, and checks each handler only received its own color.
+      int countTmpQueue=0;
+      // Shared failure counter: incremented by the handlers below, asserted to be 0 at the end of every iteration.
+      final AtomicInteger errors = new AtomicInteger(0);
+      // Handler that acknowledges and commits each message, counts it on a latch and records a color mismatch (or any exception) in 'errors'.
+      class MyHandler implements MessageHandler
+      {
+         final String color;
+         // Starts at expectedMessages; counted down once per message whose acknowledge and commit did not throw.
+         final CountDownLatch latch;
+         // Session committed after each acknowledged message.
+         final ClientSession sess;
+         
+         public MyHandler(ClientSession sess, String color, int expectedMessages)
+         {
+            this.sess = sess;
+            latch = new CountDownLatch(expectedMessages);
+            this.color = color;
+         }
+         // Waits up to 10 seconds for the latch to reach zero; returns false if the wait timed out.
+         public boolean waitCompletion() throws Exception
+         {
+            return latch.await(10, TimeUnit.SECONDS);
+         }
+         // Acknowledges and commits the message, counts it, then compares its "color" property with the expected color.
+         public void onMessage(ClientMessage message)
+         {
+            try
+            {
+               message.acknowledge();
+               sess.commit();
+               latch.countDown();
+               // NOTE(review): the latch is counted down before the color check, so the waiting thread may read 'errors' before a mismatch on the last message is recorded.
+               if (!message.getStringProperty("color").equals(color))
+               {
+                  log.warn("Unexpected color " + message.getStringProperty("color") + " when we were expecting " + color);
+                  errors.incrementAndGet();
+               }
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            }
+         }
+         
+      }
+      
+      String address = "AD_test";
+      int iterations = 100;
+      int msgs = 100;
+      
+      // Each iteration uses one session (localSession) for the producer and both consumers, as this is how the issue was raised
+      for (int i = 0 ; i < iterations; i++)
+      {
+         ClientSessionFactory clientsConnecton = locator.createSessionFactory();
+         ClientSession localSession = clientsConnecton.createSession();
+         
+         ClientProducer prod = localSession.createProducer(address);
+         
+         localSession.start();
+         
 
+         log.info("Iteration " + i);
+         String queueRed = address + "_red_" + (countTmpQueue++);
+         String queueBlue = address + "_blue_" + (countTmpQueue++);
+         // Red side: temporary queue on the shared address with filter color='red', consumed through MyHandler.
+         //ClientSession sessConsumerRed = clientsConnecton.createSession();
+         ClientSession sessConsumerRed = localSession;
+         sessConsumerRed.createTemporaryQueue(address, queueRed, "color='red'");
+         MyHandler redHandler = new MyHandler(sessConsumerRed, "red", msgs);
+         ClientConsumer redClientConsumer = sessConsumerRed.createConsumer(queueRed);
+         redClientConsumer.setMessageHandler(redHandler);
+         //sessConsumerRed.start();
+         // Blue side: temporary queue on the shared address with filter color='blue', consumed through MyHandler.
+         //ClientSession sessConsumerBlue = clientsConnecton.createSession();
+         ClientSession sessConsumerBlue = localSession;
+         sessConsumerBlue.createTemporaryQueue(address, queueBlue, "color='blue'");
+         MyHandler blueHandler = new MyHandler(sessConsumerBlue, "blue", msgs);
+         ClientConsumer blueClientConsumer = sessConsumerBlue.createConsumer(queueBlue);
+         blueClientConsumer.setMessageHandler(blueHandler);
+         //sessConsumerBlue.start();
+         
+         try
+         {
+            ClientMessage msgBlue = session.createMessage(false);
+            msgBlue.putStringProperty("color", "blue");
+
+            ClientMessage msgRed = session.createMessage(false);
+            msgRed.putStringProperty("color", "red");
+
+            for (int nmsg = 0; nmsg < msgs; nmsg++)
+            {
+               prod.send(msgBlue);
+               
+               prod.send(msgRed);
+               // NOTE(review): 'session' (also used for createMessage above) is not declared in this method, while the producer belongs to localSession - confirm this is the intended session.
+               session.commit();
+            }
+            // NOTE(review): the boolean results of waitCompletion() are discarded, so a timeout does not by itself fail the test.
+            blueHandler.waitCompletion();
+            redHandler.waitCompletion();
+            // Fails if any handler logged a color mismatch or caught an exception so far.
+            assertEquals(0, errors.get());
+            
+         }
+         finally
+         {
+//            sessConsumerRed.close();
+//            sessConsumerBlue.close();
+            localSession.close();
+            clientsConnecton.close();
+         }
+      }
+       
+   }
+
    public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
    {
       session.close();



More information about the hornetq-commits mailing list