[hornetq-commits] JBoss hornetq SVN: r10185 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 7 17:10:54 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-07 17:10:53 -0500 (Mon, 07 Feb 2011)
New Revision: 10185
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
Log:
Fixing tests on paging and transactions
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -29,6 +29,8 @@
boolean isCommit();
boolean isRollback();
+
+ void setCommitted(boolean committed);
void commit();
@@ -44,6 +46,8 @@
void storeUpdate(StorageManager storageManager, PagingManager pagingManager, Transaction tx) throws Exception;
+ void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception;
+
void storeUpdate(StorageManager storageManager, PagingManager pagingManager) throws Exception;
// To be used after the update was stored or reload
@@ -63,4 +67,5 @@
* @return true if the message will be delivered later, false if it should be delivered right away
*/
boolean deliverAfterCommit(PageSubscription cursor, PagePosition cursorPos);
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -87,9 +87,12 @@
/**
* To be called when the cursor decided to ignore a position.
+ *
* @param position
*/
void positionIgnored(PagePosition position);
+
+ void lateDeliveryRollback(PagePosition position);
/**
* To be used to avoid a redelivery of a prepared ACK after load
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -73,7 +73,7 @@
private static void trace(final String message)
{
- PageSubscriptionImpl.log.trace(message);
+ PageSubscriptionImpl.log.trace(message);
}
private volatile boolean autoCleanup = true;
@@ -99,9 +99,9 @@
private List<PagePosition> recoveredACK;
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
-
+
private final PageSubscriptionCounter counter;
-
+
private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
@@ -178,8 +178,6 @@
confirmPosition(position);
}
-
-
public long getMessageCount()
{
return counter.getValue() - deliveredCount.get();
@@ -189,7 +187,7 @@
{
return counter;
}
-
+
public void scheduleCleanupCheck()
{
if (autoCleanup)
@@ -278,7 +276,7 @@
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo completePage : completedPages)
- {
+ {
if (isTrace)
{
PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
@@ -423,7 +421,7 @@
public void ackTx(final Transaction tx, final PagedReference reference) throws Exception
{
confirmPosition(tx, reference.getPosition());
-
+
counter.increment(tx, -1);
PageTransactionInfo txInfo = getPageTransaction(reference);
@@ -480,7 +478,7 @@
return consumedPages.firstKey();
}
}
-
+
public void addPendingDelivery(final PagePosition position)
{
getPageInfo(position).incrementPendingTX();
@@ -551,6 +549,13 @@
processACK(position);
}
+
+ public void lateDeliveryRollback(PagePosition position)
+ {
+ PageCursorInfo cursorInfo = processACK(position);
+ cursorInfo.decrementPendingTX();
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#isComplete(long)
*/
@@ -753,7 +758,7 @@
// To be called only after the ACK has been processed and guaranteed to be on storae
// The only exception is on non storage events such as not matching messages
- private void processACK(final PagePosition pos)
+ private PageCursorInfo processACK(final PagePosition pos)
{
if (lastAckedPosition == null || pos.compareTo(lastAckedPosition) > 0)
{
@@ -770,6 +775,8 @@
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
+
+ return info;
}
/**
@@ -783,7 +790,7 @@
// It needs to persist, otherwise the cursor will return to the fist page position
tx.setContainsPersistent();
}
-
+
getPageInfo(position).remove(position);
PageCursorTX cursorTX = (PageCursorTX)tx.getProperty(TransactionPropertyIndexes.PAGE_CURSOR_POSITIONS);
@@ -846,7 +853,7 @@
// The page was live at the time of the creation
private final boolean wasLive;
-
+
// There's a pending TX to add elements on this page
private AtomicInteger pendingTX = new AtomicInteger(0);
@@ -902,12 +909,12 @@
{
return pageId;
}
-
+
public void incrementPendingTX()
{
pendingTX.incrementAndGet();
}
-
+
public void decrementPendingTX()
{
pendingTX.decrementAndGet();
@@ -932,12 +939,13 @@
if (isTrace)
{
PageSubscriptionImpl.trace("numberOfMessages = " + getNumberOfMessages() +
- " confirmed = " +
- (confirmed.get() + 1) +
- ", page = " +
- pageId);
+ " confirmed = " +
+ (confirmed.get() + 1) +
+ " pendingTX = " + pendingTX +
+ ", page = " +
+ pageId);
}
-
+
// Negative could mean a bookmark on the first element for the page (example -1)
if (posACK.getMessageNr() >= 0)
{
@@ -1018,7 +1026,7 @@
}
}
-
+
/* (non-Javadoc)
* @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
*/
@@ -1026,7 +1034,6 @@
{
return Collections.emptyList();
}
-
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -172,6 +172,25 @@
*/
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception
{
+ internalUpdatePageManager(storageManager, pagingManager, tx, 1);
+ }
+
+ public void reloadUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx, final int increment) throws Exception
+ {
+ UpdatePageTXOperation updt = internalUpdatePageManager(storageManager, pagingManager, tx, increment);
+ updt.setStored();
+ }
+
+ /**
+ * @param storageManager
+ * @param pagingManager
+ * @param tx
+ */
+ protected UpdatePageTXOperation internalUpdatePageManager(final StorageManager storageManager,
+ final PagingManager pagingManager,
+ final Transaction tx,
+ final int increment)
+ {
UpdatePageTXOperation pgtxUpdate = (UpdatePageTXOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION_UPDATE);
if (pgtxUpdate == null)
@@ -183,7 +202,9 @@
tx.setContainsPersistent();
- pgtxUpdate.addUpdate(this);
+ pgtxUpdate.addUpdate(this, increment);
+
+ return pgtxUpdate;
}
public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager) throws Exception
@@ -213,6 +234,11 @@
{
return committed;
}
+
+ public void setCommitted(final boolean committed)
+ {
+ this.committed = committed;
+ }
public boolean isRollback()
{
@@ -228,8 +254,9 @@
{
for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
{
- pos.a.positionIgnored(pos.b);
+ pos.a.lateDeliveryRollback(pos.b);
}
+ lateDeliveries = null;
}
}
@@ -303,8 +330,13 @@
this.pagingManager = pagingManager;
}
- public void addUpdate(PageTransactionInfo info)
+ public void setStored()
{
+ stored = true;
+ }
+
+ public void addUpdate(final PageTransactionInfo info, final int increment)
+ {
AtomicInteger counter = countsToUpdate.get(info);
if (counter == null)
@@ -313,7 +345,7 @@
countsToUpdate.put(info, counter);
}
- counter.incrementAndGet();
+ counter.addAndGet(increment);
}
public void beforePrepare(Transaction tx) throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -1664,16 +1664,27 @@
}
case PAGE_TRANSACTION:
{
+
PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
pageTransactionInfo.decode(buff);
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+ if (record.isUpdate)
+ {
+ PageTransactionInfo pgTX = pagingManager.getTransaction(pageTransactionInfo.getTransactionID());
+ pgTX.reloadUpdate(this, pagingManager, tx, pageTransactionInfo.getNumberOfMessages());
+ }
+ else
+ {
+ pageTransactionInfo.setCommitted(false);
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
+ }
- pagingManager.addTransaction(pageTransactionInfo);
-
- tx.addOperation(new FinishPageMessageOperation());
-
break;
}
case SET_SCHEDULED_DELIVERY_TIME:
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -309,8 +309,6 @@
doRollback();
- state = State.ROLLEDBACK;
-
// We use the Callback even for non persistence
// If we are using non-persistence with replication, the replication manager will have
// to execute this runnable in the correct order
@@ -327,6 +325,7 @@
public void done()
{
afterRollback();
+ state = State.ROLLEDBACK;
}
});
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/utils/LinkedListImpl.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -154,20 +154,20 @@
return (Iterator[])Array.newInstance(Iterator.class, size);
}
- private void removeAfter(Node<E> after)
+ private void removeAfter(Node<E> node)
{
- Node<E> toRemove = after.next;
+ Node<E> toRemove = node.next;
- after.next = toRemove.next;
+ node.next = toRemove.next;
if (toRemove.next != null)
{
- toRemove.next.prev = after;
+ toRemove.next.prev = node;
}
if (toRemove == tail)
{
- tail = after;
+ tail = node;
}
size--;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2011-02-05 01:25:35 UTC (rev 10184)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/xa/BasicXaRecoveryTest.java 2011-02-07 22:10:53 UTC (rev 10185)
@@ -1209,8 +1209,8 @@
clientSession.rollback(xid);
clientSession.start();
m = clientConsumer.receive(1000);
+ Assert.assertNotNull(m);
m.acknowledge();
- Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
More information about the hornetq-commits mailing list