Author: clebert.suconic(a)jboss.com
Date: 2009-11-19 23:00:56 -0500 (Thu, 19 Nov 2009)
New Revision: 8332
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing ordering on paging
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-20 03:01:51 UTC (rev 8331)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-20 04:00:56 UTC (rev 8332)
@@ -919,6 +919,8 @@
for (PagedMessage pagedMessage : pagedMessages)
{
ServerMessage message = pagedMessage.getMessage(storageManager);
+
+ System.out.println("Depaged id = " + message.getIntProperty("id"));
if (message.isLargeMessage())
{
@@ -1008,9 +1010,10 @@
}
depageTransaction.commit();
+
+ // TODO: If we implement ordering on AIO, we won't need to block here
+ storageManager.waitOnOperations();
- storageManager.completeOperations();
-
if (isTrace)
{
trace("Depage committed, running = " + running);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20 03:01:51 UTC (rev 8331)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-20 04:00:56 UTC (rev 8332)
@@ -44,9 +44,9 @@
private List<IOAsyncTask> tasks;
- private int linedup = 0;
+ private int storeLinedUp = 0;
- private int replicated = 0;
+ private int stored = 0;
private boolean empty = false;
@@ -63,12 +63,12 @@
/** To be called by the replication manager, when new replication is added to the queue */
public void linedUp()
{
- linedup++;
+ storeLinedUp++;
}
public boolean hasData()
{
- return linedup > 0;
+ return storeLinedUp > 0;
}
/** You may have several actions to be done after a replication operation is completed. */
@@ -90,10 +90,10 @@
tasks.add(completion);
}
- /** To be called by the replication manager, when data is confirmed on the channel */
+ /** To be called by the storage manager, when data is confirmed on the channel */
public synchronized void done()
{
- if (++replicated == linedup && complete)
+ if (++stored == storeLinedUp && complete)
{
flush();
}
@@ -106,7 +106,7 @@
{
tlContext.set(null);
complete = true;
- if (replicated == linedup && complete)
+ if (stored == storeLinedUp && complete)
{
flush();
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-20 03:01:51 UTC (rev 8331)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-20 04:00:56 UTC (rev 8332)
@@ -163,8 +163,7 @@
assertNotNull(message2);
- // TODO: AIO doesn't support ordering ATM
-// assertEquals(i, ((Integer)message2.getObjectProperty(new SimpleString("id"))).intValue());
+ assertEquals(i, message2.getIntProperty("id").intValue());
message2.acknowledge();