Author: clebert.suconic(a)jboss.com
Date: 2010-09-08 15:40:33 -0400 (Wed, 08 Sep 2010)
New Revision: 9657
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java
branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
HORNETQ-502 Fixing out of order during paging and transactions
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-08
19:17:54 UTC (rev 9656)
+++
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-08
19:40:33 UTC (rev 9657)
@@ -613,10 +613,18 @@
{
Transaction tx = context.getTransaction();
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
- if (!depage && message.storeIsPaging())
+
+ // if the TX paged at least one message on a given address, all the other
messages sent to that address should also go towards the paging cache now
+ boolean alreadyPaging = false;
+
+ if (tx.isPaging())
{
-
+ alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
+ }
+
+ if (!depage && message.storeIsPaging() || alreadyPaging)
+ {
+ tx.setPaging(true);
getPageOperation(tx).addMessageToPage(message);
if (startedTx)
{
@@ -1104,12 +1112,20 @@
{
private final List<ServerMessage> messagesToPage = new
ArrayList<ServerMessage>();
+ private final HashSet<SimpleString> addressesPaging = new
HashSet<SimpleString>();
+
private Transaction subTX = null;
void addMessageToPage(final ServerMessage message)
{
messagesToPage.add(message);
+ addressesPaging.add(message.getAddress());
}
+
+ boolean isPaging(final SimpleString address)
+ {
+ return addressesPaging.contains(address);
+ }
public void afterCommit(final Transaction tx)
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java 2010-09-08
19:17:54 UTC (rev 9656)
+++ branches/Branch_2_1/src/main/org/hornetq/core/transaction/Transaction.java 2010-09-08
19:40:33 UTC (rev 9657)
@@ -60,6 +60,12 @@
void removeOperation(TransactionOperation sync);
boolean hasTimedOut(long currentTime, int defaultTimeout);
+
+ /** We don't want to look through the operations on every send, so we keep the paging
attribute and will only look at
+ * the PagingOperation in case this attribute is true. */
+ boolean isPaging();
+
+ void setPaging(boolean paging);
void putProperty(int index, Object property);
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
---
branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-09-08
19:17:54 UTC (rev 9656)
+++
branches/Branch_2_1/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-09-08
19:40:33 UTC (rev 9657)
@@ -47,6 +47,8 @@
private final Xid xid;
private final long id;
+
+ private boolean paging = false;
private volatile State state = State.ACTIVE;
@@ -352,7 +354,17 @@
{
this.state = state;
}
+
+ public boolean isPaging()
+ {
+ return paging;
+ }
+ public void setPaging(boolean paging)
+ {
+ this.paging = paging;
+ }
+
public Xid getXid()
{
return xid;
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-08
19:17:54 UTC (rev 9656)
+++
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-08
19:40:33 UTC (rev 9657)
@@ -685,6 +685,133 @@
}
+ public void testDepageDuringTransaction3() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024; // 1k
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ byte[] body = new byte[messageSize];
+
+ ClientSession sessionTransacted = sf.createSession(null, null, false, false,
false, false, 0);
+ ClientProducer producerTransacted =
sessionTransacted.createProducer(PagingTest.ADDRESS);
+
+ ClientSession sessionNonTX = sf.createSession(true, true, 0);
+ sessionNonTX.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ ClientProducer producerNonTransacted =
sessionNonTX.createProducer(PagingTest.ADDRESS);
+
+ sessionNonTX.start();
+
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("Sending " + i);
+ ClientMessage message = sessionNonTX.createMessage(true);
+ message.getBodyBuffer().writeBytes(body);
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producerTransacted.send(message);
+
+ if (i % 2 == 0)
+ {
+ System.out.println("Sending 20 msgs to make it page");
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgSend = sessionNonTX.createMessage(true);
+ msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
+ producerNonTransacted.send(msgSend);
+ }
+
assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+ }
+ else
+ {
+ System.out.println("Consuming 20 msgs to make it page");
+ ClientConsumer consumer =
sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ for (int j = 0 ; j < 20; j++)
+ {
+ ClientMessage msgReceived = consumer.receive(10000);
+ assertNotNull(msgReceived);
+ msgReceived.acknowledge();
+ }
+ consumer.close();
+ }
+ }
+
+ ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+ while (true)
+ {
+ ClientMessage msgReceived = consumerNonTX.receive(1000);
+ if (msgReceived == null)
+ {
+ break;
+ }
+ msgReceived.acknowledge();
+ }
+ consumerNonTX.close();
+
+
+ ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
+
+ Assert.assertNull(consumer.receiveImmediate());
+
+ sessionTransacted.commit();
+
+ sessionTransacted.close();
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message);
+
+ Integer messageID = (Integer)message.getObjectProperty(new
SimpleString("id"));
+
+ // System.out.println(messageID);
+ Assert.assertNotNull(messageID);
+ Assert.assertEquals("message received out of order", i,
messageID.intValue());
+
+ System.out.println("MessageID = " + messageID);
+
+ message.acknowledge();
+ }
+
+ Assert.assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ sessionNonTX.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
Modified:
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-08
19:17:54 UTC (rev 9656)
+++
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-09-08
19:40:33 UTC (rev 9657)
@@ -321,6 +321,24 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#isPaging()
+ */
+ public boolean isPaging()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.transaction.Transaction#setPaging(boolean)
+ */
+ public void setPaging(boolean paging)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeMessage implements ServerMessage