Author: clebert.suconic(a)jboss.com
Date: 2011-07-29 18:39:12 -0400 (Fri, 29 Jul 2011)
New Revision: 11072
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Improvements on getQueueCount
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-07-29 22:39:12 UTC (rev 11072)
@@ -402,7 +402,7 @@
directDeliver = false;
- executor.execute(concurrentPoller);
+ getExecutor().execute(concurrentPoller);
}
public void forceDelivery()
@@ -921,9 +921,31 @@
public long getMessagesAdded()
{
- blockOnExecutorFuture();
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicLong count = new AtomicLong(0);
- return getInstantMessagesAdded();
+ getExecutor().execute(new Runnable()
+ {
+ public void run()
+ {
+ count.set(getInstantMessagesAdded());
+ latch.countDown();
+ }
+ });
+
+ try
+ {
+ if (!latch.await(10, TimeUnit.SECONDS))
+ {
+ throw new IllegalStateException("Timed out on waiting for MessagesAdded");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+
+ return count.get();
}
public long getInstantMessagesAdded()
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-29 18:35:41 UTC (rev 11071)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-07-29 22:39:12 UTC (rev 11072)
@@ -24,8 +24,10 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import javax.ejb.FinderException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -1274,6 +1276,40 @@
{
bb.put(getSamplebyte(j));
}
+
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ class TCount extends Thread
+ {
+ Queue queue;
+
+ TCount(Queue queue)
+ {
+ this.queue = queue;
+ }
+ public void run()
+ {
+ try
+ {
+ while (running.get())
+ {
+ // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+ queue.getMessagesAdded();
+ queue.getMessageCount();
+ //log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.info("Thread interrupted");
+ }
+ }
+ };
+
+ TCount tcount1 = null;
+ TCount tcount2 = null;
+
try
{
@@ -1300,7 +1336,8 @@
session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS + "-2", null, true);
}
-
+
+
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
@@ -1309,6 +1346,7 @@
{
if (i % 500 == 0)
{
+ log.info("Sent " + i + " messages");
session.commit();
}
message = session.createMessage(true);
@@ -1338,7 +1376,24 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
+
+ Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+
+ Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+
+ assertNotNull(queue1);
+
+ assertNotNull(queue2);
+
+ assertNotSame(queue1, queue2);
+ tcount1 = new TCount(queue1);
+
+ tcount2 = new TCount(queue2);
+
+ tcount1.start();
+ tcount2.start();
+
ServerLocator locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = locator.createSessionFactory();
@@ -1375,8 +1430,14 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0)
+ if (i % 100 == 0)
+ {
+ if (i % 5000 == 0)
+ {
+ log.info(addressToSubscribe + " consumed " + i + " messages");
+ }
session.commit();
+ }
try
{
@@ -1437,6 +1498,20 @@
}
finally
{
+ running.set(false);
+
+ if (tcount1 != null)
+ {
+ tcount1.interrupt();
+ tcount1.join();
+ }
+
+ if (tcount2 != null)
+ {
+ tcount2.interrupt();
+ tcount2.join();
+ }
+
try
{
server.stop();