[hornetq-commits] JBoss hornetq SVN: r10674 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon May 16 23:39:56 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-16 23:39:56 -0400 (Mon, 16 May 2011)
New Revision: 10674

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
optimizations on paging sync

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java	2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java	2011-05-17 03:39:56 UTC (rev 10674)
@@ -19,6 +19,7 @@
 import java.util.concurrent.TimeUnit;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.OperationContext;
 
@@ -34,6 +35,9 @@
 
    // Constants -----------------------------------------------------
 
+   private static final Logger log = Logger.getLogger(PageSyncTimer.class);
+
+
    // Attributes ----------------------------------------------------
    
    private final PagingStore store;
@@ -83,6 +87,7 @@
       OperationContext [] pendingSyncsArray;
       synchronized (this)
       {
+         
          pendingSync = false;
          pendingSyncsArray = new OperationContext[syncOperations.size()];
          pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
@@ -91,7 +96,10 @@
       
       try
       {
-         store.ioSync();
+         if (pendingSyncsArray.length != 0)
+         {
+            store.ioSync();
+         }
       }
       catch (Exception e)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-05-17 03:39:56 UTC (rev 10674)
@@ -880,20 +880,19 @@
             openNewPage();
             currentPageSize.addAndGet(bytesToWrite);
          }
-         
-         installPageTransaction(tx, listCtx, currentPage.getPageId());
  
          currentPage.write(pagedMessage);
-
-         if (sync || tx != null)
-         {
-            sync();
-         }
          
          if (tx != null)
          {
+            installPageTransaction(tx, listCtx);
             tx.setWaitBeforeCommit(true);
          }
+         else
+         if (sync && tx == null)
+         {
+            sync();
+         }
 
          return true;
       }
@@ -925,38 +924,46 @@
       return ids;
    }
 
-   private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
+   private void installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
    {
-      if (tx == null)
+      FinishPageMessageOperation pgOper = (FinishPageMessageOperation)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+      if (pgOper == null)
       {
-         return null;
+         PageTransactionInfo pgTX = new PageTransactionInfoImpl(tx.getID());
+         pagingManager.addTransaction(pgTX);
+         pgOper = new FinishPageMessageOperation(pgTX, storageManager, pagingManager);
+         tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgOper);
+         tx.addOperation(pgOper);
       }
-      else
-      {
-         PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
-         if (pgTX == null)
-         {
-            pgTX = new PageTransactionInfoImpl(tx.getID());
-            pagingManager.addTransaction(pgTX);
-            tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
-            tx.addOperation(new FinishPageMessageOperation(pgTX));
-         }
 
-         pgTX.increment(listCtx.getNumberOfQueues());
+      pgOper.addStore(this);
+      pgOper.pageTransaction.increment(listCtx.getNumberOfQueues());
 
-         return pgTX;
-      }
+      return;
    }
 
-   private class FinishPageMessageOperation implements TransactionOperation
+   private static class FinishPageMessageOperation implements TransactionOperation
    {
-      private final PageTransactionInfo pageTransaction;
+      public final PageTransactionInfo pageTransaction;
+      
+      private final StorageManager storageManager;
+      
+      private final PagingManager pagingManager;
+      
+      private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
 
       private boolean stored = false;
+      
+      public void addStore(PagingStore store)
+      {
+         this.usedStores.add(store);
+      }
 
-      public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
+      public FinishPageMessageOperation(final PageTransactionInfo pageTransaction, final StorageManager storageManager, final PagingManager pagingManager)
       {
          this.pageTransaction = pageTransaction;
+         this.storageManager = storageManager;
+         this.pagingManager = pagingManager;
       }
 
       public void afterCommit(final Transaction tx)
@@ -985,11 +992,24 @@
 
       public void beforeCommit(final Transaction tx) throws Exception
       {
+         syncStore();
          storePageTX(tx);
       }
 
+      /**
+       * @throws Exception
+       */
+      private void syncStore() throws Exception
+      {
+         for (PagingStore store : usedStores)
+         {
+            store.sync();
+         }
+      }
+
       public void beforePrepare(final Transaction tx) throws Exception
       {
+         syncStore();
          storePageTX(tx);
       }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-17 01:42:03 UTC (rev 10673)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-17 03:39:56 UTC (rev 10674)
@@ -3791,7 +3791,7 @@
 
          for (int i = 0; i < 500; i++)
          {
-            log.info("send message #" + i);
+            if (i % 100 == 0) log.info("send message #" + i);
             message = session.createMessage(true);
 
             message.putStringProperty("id", "str" + i);



More information about the hornetq-commits mailing list