[hornetq-commits] JBoss hornetq SVN: r11072 - in branches/Branch_2_2_EAP_cluster_clean2: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 29 18:39:12 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-07-29 18:39:12 -0400 (Fri, 29 Jul 2011)
New Revision: 11072

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on getQueueCount

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-07-29 22:39:12 UTC (rev 11072)
@@ -402,7 +402,7 @@
 
       directDeliver = false;
 
-      executor.execute(concurrentPoller);
+      getExecutor().execute(concurrentPoller);
    }
 
    public void forceDelivery()
@@ -921,9 +921,31 @@
 
    public long getMessagesAdded()
    {
-      blockOnExecutorFuture();
+      final CountDownLatch latch = new CountDownLatch(1);
+      final AtomicLong count = new AtomicLong(0);
       
-      return getInstantMessagesAdded();
+      getExecutor().execute(new Runnable()
+      {
+         public void run()
+         {
+            count.set(getInstantMessagesAdded());
+            latch.countDown();
+         }
+      });
+      
+      try
+      {
+         if (!latch.await(10, TimeUnit.SECONDS))
+         {
+            throw new IllegalStateException("Timed out on waiting for MessagesAdded");
+         }
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
+      
+      return count.get();
   }
    
    public long getInstantMessagesAdded()

Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-07-29 22:39:12 UTC (rev 11072)
@@ -24,8 +24,10 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.ejb.FinderException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
@@ -1274,6 +1276,40 @@
       {
          bb.put(getSamplebyte(j));
       }
+      
+      final AtomicBoolean running = new AtomicBoolean(true);
+      
+      class TCount extends Thread
+      {
+         Queue queue;
+         
+         TCount(Queue queue)
+         {
+            this.queue = queue;
+         }
+         public void run()
+         {
+            try
+            {
+               while (running.get())
+               {
+                 // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+                  queue.getMessagesAdded();
+                  queue.getMessageCount();
+                  //log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+                  Thread.sleep(10);
+               }
+            }
+            catch (InterruptedException e)
+            {
+               log.info("Thread interrupted");
+            }
+         }
+      };
+      
+      TCount tcount1 = null;
+      TCount tcount2 = null;
+      
 
       try
       {
@@ -1300,7 +1336,8 @@
 
                session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true);
             }
-
+            
+            
             ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
 
             ClientMessage message = null;
@@ -1309,6 +1346,7 @@
             {
                if (i % 500 == 0)
                {
+                  log.info("Sent " + i + " messages");
                   session.commit();
                }
                message = session.createMessage(true);
@@ -1338,7 +1376,24 @@
                                PagingTest.PAGE_MAX,
                                new HashMap<String, AddressSettings>());
          server.start();
+         
+         Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+         
+         Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+         
+         assertNotNull(queue1);
+         
+         assertNotNull(queue2);
+         
+         assertNotSame(queue1, queue2);
 
+         tcount1 = new TCount(queue1);
+         
+         tcount2 = new TCount(queue2);
+         
+         tcount1.start();
+         tcount2.start();
+
          ServerLocator locator = createInVMNonHALocator();
          final ClientSessionFactory sf2 = locator.createSessionFactory();
 
@@ -1375,8 +1430,14 @@
 
                         Assert.assertNotNull(message2);
 
-                        if (i % 1000 == 0)
+                        if (i % 100 == 0)
+                        {
+                           if (i % 5000 == 0)
+                           {
+                              log.info(addressToSubscribe + " consumed " + i + " messages");
+                           }
                            session.commit();
+                        }
 
                         try
                         {
@@ -1437,6 +1498,20 @@
       }
       finally
       {
+         running.set(false);
+         
+         if (tcount1 != null)
+         {
+            tcount1.interrupt();
+            tcount1.join();
+         }
+         
+         if (tcount2 != null)
+         {
+            tcount2.interrupt();
+            tcount2.join();
+         }
+         
          try
          {
             server.stop();



More information about the hornetq-commits mailing list