Author: clebert.suconic(a)jboss.com
Date: 2011-06-07 15:36:02 -0400 (Tue, 07 Jun 2011)
New Revision: 10784
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding non-persistent page transactions
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -53,9 +53,9 @@
// To be used after the update was stored or reload
void onUpdate(int update, StorageManager storageManager, PagingManager
pagingManager);
- void increment();
+ void increment(boolean persistent);
- void increment(int size);
+ void increment(int durableSize, int nonDurableSize);
int getNumberOfMessages();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -59,6 +59,8 @@
private AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfPersistentMessages = new AtomicInteger(0);
+
private List<Pair<PageSubscription, PagePosition>> lateDeliveries;
// Static --------------------------------------------------------
@@ -110,14 +112,19 @@
}
}
- public void increment()
+ public void increment(final boolean persistent)
{
+ if (persistent)
+ {
+ numberOfPersistentMessages.incrementAndGet();
+ }
numberOfMessages.incrementAndGet();
}
- public void increment(final int size)
+ public void increment(final int durableSize, final int nonDurableSize)
{
- numberOfMessages.addAndGet(size);
+ numberOfPersistentMessages.addAndGet(durableSize);
+ numberOfMessages.addAndGet(durableSize + nonDurableSize);
}
public int getNumberOfMessages()
@@ -131,13 +138,14 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
+ numberOfPersistentMessages.set(numberOfMessages.get());
committed = true;
}
public synchronized void encode(final HornetQBuffer buffer)
{
buffer.writeLong(transactionID);
- buffer.writeInt(numberOfMessages.get());
+ buffer.writeInt(numberOfPersistentMessages.get());
}
public synchronized int getEncodeSize()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -938,7 +938,7 @@
}
pgOper.addStore(this);
- pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
+ pgOper.pageTransaction.increment(listCtx.getNumberOfDurableQueues(),
listCtx.getNumberOfNonDurableQueues());
return;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/RouteContextList.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -25,8 +25,10 @@
public interface RouteContextList
{
- int getNumberOfQueues();
+ int getNumberOfNonDurableQueues();
+ int getNumberOfDurableQueues();
+
List<Queue> getDurableQueues();
List<Queue> getNonDurableQueues();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -125,10 +125,15 @@
private List<Queue> nonDurableQueue = new ArrayList<Queue>(1);
- public int getNumberOfQueues()
+ public int getNumberOfDurableQueues()
{
- return durableQueue.size() + nonDurableQueue.size();
+ return durableQueue.size();
}
+
+ public int getNumberOfNonDurableQueues()
+ {
+ return nonDurableQueue.size();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.RouteContextList#getDurableQueues()
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -419,7 +419,7 @@
}
session.commit();
session.close();
-
+
session = null;
sf.close();
@@ -459,16 +459,16 @@
fail("Didn't receive a message");
}
msg.acknowledge();
-
+
if (msgCount % 5 == 0)
{
log.info("commit");
sessionConsumer.commit();
}
}
-
+
sessionConsumer.commit();
-
+
sessionConsumer.close();
sf.close();
@@ -1205,6 +1205,189 @@
}
+ public void testMultiQueuesNonPersistentAndPersistent() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 3000;
+
+ final byte[] body = new byte[messageSize];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= messageSize; j++)
+ {
+ bb.put(getSamplebyte(j));
+ }
+
+ try
+ {
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS +
"-1", null, true);
+
+ session.createQueue(PagingTest.ADDRESS.toString(), PagingTest.ADDRESS +
"-2", null, false);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ if (i % 500 == 0)
+ {
+ session.commit();
+ }
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+
+ sf.close();
+ locator.close();
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ ServerLocator locator = createInVMNonHALocator();
+ final ClientSessionFactory sf2 = locator.createSessionFactory();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSession session = sf2.createSession(null, null, false, true,
true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS +
"-1");
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 =
consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ Assert.assertEquals(i,
message2.getIntProperty("id").intValue());
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+
+ if (i % 1000 == 0)
+ session.commit();
+
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" +
UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" +
UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+
.toByteBuffer()
+
.array(), 40));
+ throw e;
+ }
+ }
+
+ session.commit();
+
+ consumer.close();
+
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+
+ t.start();
+ t.join();
+
+
+ assertEquals(0, errors.get());
+
+ for (int i = 0; i < 20 &&
server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging(); i++)
+ {
+ // The delete may be asynchronous, giving some time case it eventually happen
asynchronously
+ Thread.sleep(500);
+ }
+
+ assertFalse
(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
+
+
+ for (int i = 0; i < 20 &&
server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+ {
+ // The delete may be asynchronous, giving some time case it eventually happen
asynchronously
+ Thread.sleep(500);
+ }
+
+ assertEquals(0,
server.getPostOffice().getPagingManager().getTransactions().size());
+
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internaltestSendReceivePaging(final boolean persistentMessages) throws
Exception
{
@@ -1915,7 +2098,7 @@
}
}
}
-
+
public void testOrderingNonTX() throws Exception
{
clearData();
@@ -2024,7 +2207,7 @@
{
log.info("###### different");
}
- //assertEquals(i, msg.getIntProperty("count").intValue());
+ // assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
}
@@ -2953,7 +3136,7 @@
catch (Throwable ignored)
{
}
-
+
OperationContextImpl.clearContext();
}
@@ -3676,7 +3859,7 @@
}
}
}
-
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -3723,18 +3906,17 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
for (int i = 0; i < 100; i++)
{
log.debug("send message #" + i);
ClientMessage message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setBodyInputStream(createFakeLargeStream(messageSize));
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -3746,27 +3928,27 @@
session.start();
ClientConsumer cons = session.createConsumer(ADDRESS);
-
- for (int msgNr = 0 ; msgNr < 2; msgNr++)
+
+ for (int msgNr = 0; msgNr < 2; msgNr++)
{
- for (int i = 0 ; i < 5; i++)
+ for (int i = 0; i < 5; i++)
{
ClientMessage msg = cons.receive(5000);
-
+
assertNotNull(msg);
-
+
msg.acknowledge();
-
+
assertEquals("str" + msgNr,
msg.getStringProperty("id"));
-
+
for (int j = 0; j < messageSize; j++)
{
assertEquals(getSamplebyte(j), msg.getBodyBuffer().readByte());
}
-
+
session.rollback();
}
-
+
pgStoreDLA.startPaging();
}
@@ -3776,9 +3958,9 @@
ClientMessage message = cons.receive(5000);
assertNotNull("Message " + i + " wasn't received",
message);
message.acknowledge();
-
+
final AtomicInteger bytesOutput = new AtomicInteger(0);
-
+
message.setOutputStream(new OutputStream()
{
@Override
@@ -3800,41 +3982,42 @@
{
log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
- fail("Couldn't finish large message receiving for id=" +
- message.getStringProperty("id") + " with
messageID=" + message.getMessageID());
+ fail("Couldn't finish large message receiving for id=" +
message.getStringProperty("id") +
+ " with messageID=" +
+ message.getMessageID());
}
}
-
+
assertNull(cons.receiveImmediate());
cons.close();
-
+
cons = session.createConsumer("DLA");
-
- for (int i = 0 ; i < 2; i++)
+
+ for (int i = 0; i < 2; i++)
{
assertNotNull(cons.receive(5000));
}
-
+
sf.close();
-
+
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
cons = session.createConsumer(ADDRESS);
for (int i = 2; i < 100; i++)
@@ -3842,7 +4025,7 @@
log.debug("Received message " + i);
ClientMessage message = cons.receive(5000);
assertNotNull(message);
-
+
assertEquals("str" + i,
message.getStringProperty("id"));
message.acknowledge();
@@ -3855,53 +4038,53 @@
}
});
-
+
assertTrue(message.waitOutputStreamCompletion(5000));
}
-
+
assertNull(cons.receiveImmediate());
-
+
cons.close();
-
+
cons = session.createConsumer("DLA");
- for (int msgNr = 0 ; msgNr < 2; msgNr++)
+ for (int msgNr = 0; msgNr < 2; msgNr++)
{
ClientMessage msg = cons.receive(10000);
assertNotNull(msg);
-
+
assertEquals("str" + msgNr,
msg.getStringProperty("id"));
for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg.getBodyBuffer().readByte());
}
-
+
msg.acknowledge();
}
-
+
cons.close();
-
+
cons = session.createConsumer(ADDRESS);
-
+
session.commit();
-
+
assertNull(cons.receiveImmediate());
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
-
+
pgStoreAddress.getCursorProvier().cleanup();
-
+
while (timeout > System.currentTimeMillis() &&
pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.commit();
@@ -3969,11 +4152,12 @@
for (int i = 0; i < 500; i++)
{
- if (i % 100 == 0) log.info("send message #" + i);
+ if (i % 100 == 0)
+ log.info("send message #" + i);
message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
-
+
message.setExpiration(System.currentTimeMillis() + 2000);
if (i % 2 == 0)
@@ -3983,7 +4167,7 @@
else
{
byte bytes[] = new byte[messageSize];
- for (int s = 0 ; s < bytes.length; s++)
+ for (int s = 0; s < bytes.length; s++)
{
bytes[s] = getSamplebyte(s);
}
@@ -3991,7 +4175,7 @@
}
producer.send(message);
-
+
if ((i + 1) % 2 == 0)
{
session.commit();
@@ -4003,30 +4187,29 @@
}
session.commit();
-
+
sf.close();
-
+
locator.close();
-
+
server.stop();
-
+
Thread.sleep(3000);
-
+
server.start();
-
+
locator = createInVMNonHALocator();
-
+
sf = locator.createSessionFactory();
-
+
session = sf.createSession(false, false);
-
+
session.start();
-
+
ClientConsumer consAddr = session.createConsumer(ADDRESS);
-
+
assertNull(consAddr.receive(1000));
-
-
+
ClientConsumer cons = session.createConsumer("DLA");
for (int i = 0; i < 500; i++)
@@ -4045,22 +4228,22 @@
}
});
}
-
+
assertNull(cons.receiveImmediate());
-
+
session.commit();
-
+
cons.close();
-
+
long timeout = System.currentTimeMillis() + 5000;
-
+
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
+
while (timeout > System.currentTimeMillis() &&
pgStoreAddress.isPaging())
{
Thread.sleep(50);
}
-
+
assertFalse(pgStoreAddress.isPaging());
session.close();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-06-07
18:16:11 UTC (rev 10783)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-06-07
19:36:02 UTC (rev 10784)
@@ -113,7 +113,7 @@
for (int i = 0; i < nr1; i++)
{
- trans.increment();
+ trans.increment(true);
}
Assert.assertEquals(nr1, trans.getNumberOfMessages());