[hornetq-commits] JBoss hornetq SVN: r10674 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon May 16 23:39:56 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-05-16 23:39:56 -0400 (Mon, 16 May 2011)
New Revision: 10674
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
optimizations on paging sync
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java 2011-05-17 03:39:56 UTC (rev 10674)
@@ -19,6 +19,7 @@
import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.OperationContext;
@@ -34,6 +35,9 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageSyncTimer.class);
+
+
// Attributes ----------------------------------------------------
private final PagingStore store;
@@ -83,6 +87,7 @@
OperationContext [] pendingSyncsArray;
synchronized (this)
{
+
pendingSync = false;
pendingSyncsArray = new OperationContext[syncOperations.size()];
pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
@@ -91,7 +96,10 @@
try
{
- store.ioSync();
+ if (pendingSyncsArray.length != 0)
+ {
+ store.ioSync();
+ }
}
catch (Exception e)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-05-17 03:39:56 UTC (rev 10674)
@@ -880,20 +880,19 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
- installPageTransaction(tx, listCtx, currentPage.getPageId());
currentPage.write(pagedMessage);
-
- if (sync || tx != null)
- {
- sync();
- }
if (tx != null)
{
+ installPageTransaction(tx, listCtx);
tx.setWaitBeforeCommit(true);
}
+ else
+ if (sync && tx == null)
+ {
+ sync();
+ }
return true;
}
@@ -925,38 +924,46 @@
return ids;
}
- private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
+ private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
{
- if (tx == null)
+ FinishPageMessageOperation pgOper = (FinishPageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+ if (pgOper == null)
{
- return null;
+ PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+ pagingManager.addTransaction(pgTX);
+ pgOper = new FinishPageMessageOperation(pgTX, storageManager, pagingManager);
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgOper);
+ tx.addOperation(pgOper);
}
- else
- {
- PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
- if (pgTX == null)
- {
- pgTX = new PageTransactionInfoImpl(tx.getID());
- pagingManager.addTransaction(pgTX);
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
- tx.addOperation(new FinishPageMessageOperation(pgTX));
- }
- pgTX.increment(listCtx.getNumberOfQueues());
+ pgOper.addStore(this);
+ pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
- return pgTX;
- }
+ return;
}
- private class FinishPageMessageOperation implements TransactionOperation
+ private static class FinishPageMessageOperation implements TransactionOperation
{
- private final PageTransactionInfo pageTransaction;
+ public final PageTransactionInfo pageTransaction;
+
+ private final StorageManager storageManager;
+
+ private final PagingManager pagingManager;
+
+ private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
private boolean stored = false;
+
+ public void addStore(PagingStore store)
+ {
+ this.usedStores.add(store);
+ }
- public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+ public FinishPageMessageOperation(final PageTransactionInfo pageTransaction, final StorageManager storageManager, final PagingManager pagingManager)
{
this.pageTransaction = pageTransaction;
+ this.storageManager = storageManager;
+ this.pagingManager = pagingManager;
}
public void afterCommit(final Transaction tx)
@@ -985,11 +992,24 @@
public void beforeCommit(final Transaction tx) throws Exception
{
+ syncStore();
storePageTX(tx);
}
+ /**
+ * @throws Exception
+ */
+ private void syncStore() throws Exception
+ {
+ for (PagingStore store : usedStores)
+ {
+ store.sync();
+ }
+ }
+
public void beforePrepare(final Transaction tx) throws Exception
{
+ syncStore();
storePageTX(tx);
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-05-17 03:39:56 UTC (rev 10674)
@@ -3791,7 +3791,7 @@
for (int i = 0; i < 500; i++)
{
- log.info("send message #" + i);
+ if (i % 100 == 0) log.info("send message #" + i);
message = session.createMessage(true);
message.putStringProperty("id", "str" + i);
More information about the hornetq-commits
mailing list