[hornetq-commits] JBoss hornetq SVN: r10137 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jan 24 20:09:51 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-01-24 20:09:51 -0500 (Mon, 24 Jan 2011)
New Revision: 10137
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
Log:
HORNETQ-628 - fix for paging counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -102,8 +102,6 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
-
- private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -178,7 +176,7 @@
public long getMessageCount()
{
- return counter.getValue() - deliveredCount.get();
+ return counter.getValue();
}
public PageSubscriptionCounter getCounter()
@@ -969,7 +967,6 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
- cursor.deliveredCount.decrementAndGet();
}
}
@@ -1206,7 +1203,6 @@
{
if (!isredelivery)
{
- deliveredCount.incrementAndGet();
PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -662,11 +662,11 @@
{
if (pageSubscription != null)
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount() + pageSubscription.getMessageCount();
+ return messageReferences.size() + getScheduledCount() + pageSubscription.getMessageCount();
}
else
{
- return messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ return messageReferences.size() + getScheduledCount();
}
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-24 20:40:55 UTC (rev 10136)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingOrderTest.java 2011-01-25 01:09:51 UTC (rev 10137)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.SimpleString;
@@ -28,14 +29,20 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
- * A PagingOrderTest
+ * A PagingOrderTest.
+ *
+ * PagingTest has a lot of tests already. I decided to create a newer one more specialized on Ordering and counters
*
* @author clebertsuconic
*
@@ -177,12 +184,10 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
if (i < 100)
{
- System.out.println("Acking " + i);
// Do not consume the last one so we could restart
message.acknowledge();
}
@@ -211,7 +216,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
@@ -235,6 +239,305 @@
}
+ public void testPageCounter() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+
+ assertNull(cons.receiveImmediate());
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ t1.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.join();
+
+ assertEquals(0, errors.get());
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(0, q1.getMessageCount());
+
+
+ session.close();
+ sf.close();
+ locator.close();
+
+ server.stop();
+
+
+ server.start();
+
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(ADDRESS);
+
+ q1 = null;
+ q2 = null;
+
+ for (Binding bind : bindings.getBindings())
+ {
+ if (bind instanceof LocalQueueBinding)
+ {
+ LocalQueueBinding qb = (LocalQueueBinding)bind;
+ if (qb.getQueue().getName().equals(ADDRESS))
+ {
+ q1 = qb.getQueue();
+ }
+
+ if (qb.getQueue().getName().equals(new SimpleString("inactive")))
+ {
+ q2 = qb.getQueue();
+ }
+ }
+ }
+
+ assertNotNull(q1);
+
+ assertNotNull(q2);
+
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(0, q1.getMessageCount());
+
+
+
+
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testPageCounter2() throws Throwable
+ {
+ boolean persistentMessages = true;
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 500;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(1000);
+ locator.setConnectionTTL(2000);
+ locator.setReconnectAttempts(0);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+ locator.setConsumerWindowSize(1024 * 1024);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ Queue q1 = server.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ Queue q2 = server.createQueue(ADDRESS, new SimpleString("inactive"), null, true, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t1 = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ServerLocator sl = createInVMNonHALocator();
+ ClientSessionFactory sf = sl.createSessionFactory();
+ ClientSession sess = sf.createSession(true, true, 0);
+ sess.start();
+ ClientConsumer cons = sess.createConsumer(ADDRESS);
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage msg = cons.receive(5000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ msg.acknowledge();
+ }
+ sess.close();
+ sl.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 20 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ t1.start();
+ t1.join();
+
+ assertEquals(0, errors.get());
+
+ assertEquals(numberOfMessages, q2.getMessageCount());
+ assertEquals(numberOfMessages - 10, q1.getMessageCount());
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testOrderOverRollback() throws Throwable
{
boolean persistentMessages = true;
@@ -302,8 +605,6 @@
session.close();
- System.out.println("number of refs " + queue.getNumberOfReferences());
-
session = sf.createSession(false, false, 0);
session.start();
@@ -314,7 +615,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
@@ -333,7 +633,6 @@
{
ClientMessage message = consumer.receive(5000);
assertNotNull(message);
- System.out.println("msg = " + message.getIntProperty("id"));
assertEquals(i, message.getIntProperty("id").intValue());
message.acknowledge();
}
More information about the hornetq-commits
mailing list