[hornetq-commits] JBoss hornetq SVN: r10067 - in trunk: src/main/org/hornetq/core/paging/cursor and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Dec 22 14:31:00 EST 2010


Author: clebert.suconic at jboss.com
Date: 2010-12-22 14:30:59 -0500 (Wed, 22 Dec 2010)
New Revision: 10067

Added:
   trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
Modified:
   trunk/src/main/org/hornetq/core/paging/PagingManager.java
   trunk/src/main/org/hornetq/core/paging/PagingStore.java
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
   trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
   trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/OperationContext.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Log:
Changes on paging

Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -52,9 +52,6 @@
    /** An injection point for the PostOffice to inject itself */
    void setPostOffice(PostOffice postOffice);
 
-   /** Used to start depaging every paged address, after a reload/restart */
-   void resumeDepages() throws Exception;
-
    /**
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */

Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -57,7 +57,11 @@
 
    boolean isPaging();
 
+   // It will schedule sync to the file storage
    void sync() throws Exception;
+   
+   // It will perform a real sync on the current IO file
+   void ioSync() throws Exception;
 
    boolean page(ServerMessage message, RoutingContext ctx) throws Exception;
 
@@ -88,12 +92,6 @@
    Page getCurrentPage();
 
 
-   /**
-    * @return false if a thread was already started, or if not in page mode
-    * @throws Exception 
-    */
-   boolean startDepaging();
-   
    /** @return true if paging was started, or false if paging was already started before this call */
    boolean startPaging() throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscription.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,6 @@
    // To be called before the server is down
    void stop();
 
-   // TODO: this method is only used on testcases and can go away
    void bookmark(PagePosition position) throws Exception;
    
    PageSubscriptionCounter getCounter();

Modified: trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -33,7 +33,7 @@
    
    void loadInc(final long recordInd, final int add);
    
-   void replayIncrement(Transaction tx, long recordID, int add);
+   void applyIncrement(Transaction tx, long recordID, int add);
    
    /** This will process the reload */
    void processReload();

Modified: trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -100,20 +100,31 @@
     */
    public void increment(Transaction tx, int add) throws Exception
    {
-
-      if (persistent)
+      if (tx == null)
       {
-         tx.setContainsPersistent();
-         long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
-         replayIncrement(tx, id, add);
+         if (persistent)
+         {
+            long id = storage.storePageCounterInc(this.subscriptionID, add);
+            incrementProcessed(id, add);
+         }
+         else
+         {
+            incrementProcessed(-1, add);
+         }
       }
       else
       {
-         replayIncrement(tx, -1, add);
+         if (persistent)
+         {
+            tx.setContainsPersistent();
+            long id = storage.storePageCounterInc(tx.getID(), this.subscriptionID, add);
+            applyIncrement(tx, id, add);
+         }
+         else
+         {
+            applyIncrement(tx, -1, add);
+         }
       }
-
-
-      
    }
 
    /**
@@ -122,7 +133,7 @@
     * @param recordID
     * @param add
     */
-   public void replayIncrement(Transaction tx, long recordID, int add)
+   public void applyIncrement(Transaction tx, long recordID, int add)
    {
       CounterOperations oper = (CounterOperations)tx.getProperty(TransactionPropertyIndexes.PAGE_COUNT_INC);
 
@@ -194,13 +205,13 @@
    public void addInc(long id, int variance)
    {
       value.addAndGet(variance);
-      
+
       if (id >= 0)
       {
          incrementRecords.add(id);
       }
    }
-   
+
    /** used on testing only */
    public void setPersistent(final boolean persistent)
    {
@@ -215,6 +226,10 @@
       long valueReplace;
       synchronized (this)
       {
+         if (incrementRecords.size() <= FLUSH_COUNTER)
+         {
+            return;
+         }
          valueReplace = value.get();
          deleteList = new ArrayList<Long>(incrementRecords.size());
          deleteList.addAll(incrementRecords);
@@ -242,7 +257,7 @@
          storage.commit(txCleanup);
 
          storage.waitOnOperations();
-      }
+     }
       catch (Exception e)
       {
          newRecordID = recordID;

Added: trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageSyncTimer.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.paging.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * This will batch multiple calls waiting to perform a sync in a single call
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class PageSyncTimer
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   
+   private final PagingStore store;
+   
+   private final ScheduledExecutorService scheduledExecutor;
+   
+   private boolean pendingSync;
+   
+   private final long timeSync;
+   
+   private final Runnable runnable = new Runnable() 
+   {
+      public void run()
+      {
+         tick();
+      }
+   };
+   
+   private List<OperationContext> syncOperations = new LinkedList<OperationContext>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+   
+   public PageSyncTimer(PagingStore store, ScheduledExecutorService scheduledExecutor, long timeSync)
+   {
+      this.store = store;
+      this.scheduledExecutor = scheduledExecutor;
+      this.timeSync = timeSync;
+   }
+
+   // Public --------------------------------------------------------
+
+   public synchronized void addSync(OperationContext ctx)
+   {
+      ctx.pageSyncLineUp();
+      if (!pendingSync)
+      {
+         pendingSync = true;
+         scheduledExecutor.schedule(runnable, timeSync, TimeUnit.NANOSECONDS);
+      }
+      syncOperations.add(ctx);
+   }
+   
+   private void tick()
+   {
+      System.out.println("Tick on PageSynctimer");
+      OperationContext [] pendingSyncsArray;
+      synchronized (this)
+      {
+         pendingSync = false;
+         pendingSyncsArray = new OperationContext[syncOperations.size()];
+         pendingSyncsArray = syncOperations.toArray(pendingSyncsArray);
+         syncOperations.clear();
+      }
+      
+      try
+      {
+         System.out.println("will perform a sync");
+         store.ioSync();
+         System.out.println("done with the sync");
+      }
+      catch (Exception e)
+      {
+         for (OperationContext ctx : pendingSyncsArray)
+         {
+            ctx.onError(HornetQException.IO_ERROR, e.getMessage());
+         }
+      }
+      finally
+      {
+         // In case of failure, The context should propage an exception to the client
+         // We send an exception to the client even on the case of a failure
+         // to avoid possible locks and the client not getting the exception back
+         for (OperationContext ctx : pendingSyncsArray)
+         {
+            ctx.pageSyncDone();
+         }
+      }
+   }
+   
+   
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -149,6 +149,7 @@
    {
       if (lateDeliveries != null)
       {
+         // This is to make sure deliveries that were touched before the commit arrived will be delivered
          for (Pair<PageSubscription, PagePosition> pos : lateDeliveries)
          {
             pos.a.redeliver(pos.b);
@@ -164,7 +165,9 @@
       storageManager.storePageTransaction(tx.getID(), this);
    }
 
-   /* (non-Javadoc)
+   /* 
+    * This is to be used after paging. We will update the PageTransactions until they get all the messages delivered. On that case we will delete the page TX
+    * (non-Javadoc)
     * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
     */
    public void storeUpdate(final StorageManager storageManager, final PagingManager pagingManager, final Transaction tx) throws Exception

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -207,28 +207,6 @@
       pagingStoreFactory.stop();
    }
 
-   public void resumeDepages()
-   {
-      if (!started)
-      {
-         // If stop the server while depaging, the server may call a rollback,
-         // the rollback may addSizes back and that would fire a globalDepage.
-         // Because of that we must ignore any startGlobalDepage calls,
-         // and this check needs to be done outside of the lock
-         return;
-      }
-      synchronized (this)
-      {
-         for (PagingStore store : stores.values())
-         {
-            if (store.isPaging())
-            {
-               store.startDepaging();
-            }
-         }
-      }
-   }
-   
    public void processReload() throws Exception
    {
       for (PagingStore store: stores.values())

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -23,6 +23,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.SequentialFileFactory;
@@ -61,6 +62,10 @@
    protected final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
+   
+   private final ScheduledExecutorService scheduledExecutor;
+   
+   private final long syncTimeout;
 
    private StorageManager storageManager;
 
@@ -71,6 +76,8 @@
    // Constructors --------------------------------------------------
 
    public PagingStoreFactoryNIO(final String directory,
+                                final long syncTimeout,
+                                final ScheduledExecutorService scheduledExecutor,
                                 final ExecutorFactory executorFactory,
                                 final boolean syncNonTransactional)
    {
@@ -79,6 +86,10 @@
       this.executorFactory = executorFactory;
 
       this.syncNonTransactional = syncNonTransactional;
+      
+      this.scheduledExecutor = scheduledExecutor;
+      
+      this.syncTimeout = syncTimeout;
    }
 
    // Public --------------------------------------------------------
@@ -91,6 +102,8 @@
    {
 
       return new PagingStoreImpl(address,
+                                 scheduledExecutor,
+                                 syncTimeout,
                                  pagingManager,
                                  storageManager,
                                  postOffice,
@@ -195,6 +208,8 @@
             AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
 
             PagingStore store = new PagingStoreImpl(address,
+                                                    scheduledExecutor,
+                                                    syncTimeout,
                                                     pagingManager,
                                                     storageManager,
                                                     postOffice,

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,10 +15,14 @@
 
 import java.text.DecimalFormat;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -39,7 +43,9 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.DuplicateIDCache;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.MessageReference;
@@ -50,6 +56,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.Transaction.State;
+import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.core.transaction.TransactionOperation;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -89,6 +96,9 @@
 
    private final PagingStoreFactory storeFactory;
 
+   // Used to schedule sync threads
+   private final PageSyncTimer syncTimer;
+
    private final long maxSize;
 
    private final long pageSize;
@@ -113,7 +123,7 @@
    private volatile int currentPageId;
 
    private volatile Page currentPage;
-   
+
    private volatile boolean paging = false;
 
    /** duplicate cache used at this address */
@@ -142,6 +152,8 @@
    // Constructors --------------------------------------------------
 
    public PagingStoreImpl(final SimpleString address,
+                          final ScheduledExecutorService scheduledExecutor,
+                          final long syncTimeout,
                           final PagingManager pagingManager,
                           final StorageManager storageManager,
                           final PostOffice postOffice,
@@ -191,6 +203,15 @@
 
       this.syncNonTransactional = syncNonTransactional;
 
+      if (scheduledExecutor != null)
+      {
+         this.syncTimer = new PageSyncTimer(this, scheduledExecutor, syncTimeout);
+      }
+      else
+      {
+         this.syncTimer = null;
+      }
+
       this.cursorProvider = new PageCursorProviderImpl(this, this.storageManager, executorFactory);
 
       // Post office could be null on the backup node
@@ -303,7 +324,7 @@
    {
       return storeName;
    }
-   
+
    public boolean page(final ServerMessage message, final RoutingContext ctx) throws Exception
    {
       return page(message, ctx, ctx.getContextListing(storeName));
@@ -311,73 +332,37 @@
 
    public boolean page(final ServerMessage message, final RoutingContext ctx, RouteContextList listCtx) throws Exception
    {
-      // The sync on transactions is done on commit only
-      // TODO: sync on paging
-      return page(message, ctx, listCtx, false);
+      return page(message, ctx, listCtx, syncNonTransactional && ctx.getTransaction() == null);
    }
 
    public void sync() throws Exception
    {
-      lock.readLock().lock();
-
-      try
+      if (syncTimer != null)
       {
-         if (currentPage != null)
-         {
-            currentPage.sync();
-         }
+         syncTimer.addSync(storageManager.getContext());
       }
-      finally
+      else
       {
-         lock.readLock().unlock();
+         ioSync();
       }
+
    }
 
-   public boolean startDepaging()
+   public void ioSync() throws Exception
    {
+      lock.readLock().lock();
 
-      // Disabled for now
-
-      return false;
-
-      /*
-      if (!running)
-      {
-         return false;
-      }
-
-      currentPageLock.readLock().lock();
       try
       {
-         if (currentPage == null)
+         if (currentPage != null)
          {
-            return false;
+            currentPage.sync();
          }
-         else
-         {
-            // startDepaging and clearDepage needs to be atomic.
-            // We can't use writeLock to this operation as writeLock would still be used by another thread, and still
-            // being a valid usage
-            synchronized (this)
-            {
-               if (!depaging.get())
-               {
-                  depaging.set(true);
-                  Runnable depageAction = new DepageRunnable(executor);
-                  executor.execute(depageAction);
-                  return true;
-               }
-               else
-               {
-                  return false;
-               }
-            }
-         }
       }
       finally
       {
-         currentPageLock.readLock().unlock();
-      } */
+         lock.readLock().unlock();
+      }
    }
 
    public void processReload() throws Exception
@@ -415,11 +400,11 @@
          }
       }
    }
-   
+
    public void flushExecutors()
    {
       cursorProvider.flushExecutors();
-      
+
       Future future = new Future();
 
       executor.execute(future);
@@ -498,7 +483,7 @@
 
                   cursorProvider.addPageCache(pageCache);
                }
-               
+
                // We will not mark it for paging if there's only a single empty file
                if (currentPage != null && !(numberOfPages == 1 && currentPage.getSize() == 0))
                {
@@ -513,7 +498,7 @@
          lock.writeLock().unlock();
       }
    }
-   
+
    public void stopPaging()
    {
       lock.writeLock().lock();
@@ -551,7 +536,7 @@
          {
             return false;
          }
-         
+
          if (currentPage == null)
          {
             try
@@ -568,7 +553,7 @@
          }
 
          paging = true;
-         
+
          return true;
       }
       finally
@@ -805,19 +790,6 @@
                }
             }
          }
-         else
-         {
-            if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
-            {
-               if (startDepaging())
-               {
-                  if (PagingStoreImpl.isTrace)
-                  {
-                     PagingStoreImpl.trace("Starting depaging Thread, size = " + addressSize);
-                  }
-               }
-            }
-         }
 
          return;
       }
@@ -878,8 +850,6 @@
       }
 
       Transaction tx = ctx.getTransaction();
-      
-      boolean startedTx = false;
 
       lock.writeLock().lock();
 
@@ -889,12 +859,6 @@
          {
             return false;
          }
-         
-         if (tx == null)
-         {
-            tx = new TransactionImpl(storageManager);
-            startedTx = true;
-         }
 
          PagedMessage pagedMessage;
 
@@ -905,7 +869,7 @@
             message.bodyChanged();
          }
 
-         pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), getTransactionID(tx, listCtx));
+         pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), installPageTransaction(tx, listCtx));
 
          int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
 
@@ -917,15 +881,31 @@
 
          currentPage.write(pagedMessage);
 
+         if (tx != null)
+         {
+            SyncPageStoreTX syncPage = (SyncPageStoreTX)tx.getProperty(TransactionPropertyIndexes.PAGE_SYNC);
+            if (syncPage == null)
+            {
+               syncPage = new SyncPageStoreTX();
+               tx.putProperty(TransactionPropertyIndexes.PAGE_SYNC, syncPage);
+               tx.addOperation(syncPage);
+            }
+            syncPage.addStore(this);
+         }
+         else
+         {
+            if (sync)
+            {
+               System.out.println("Doing a sync on page");
+               sync();
+            }
+         }
+
          return true;
       }
       finally
       {
          lock.writeLock().unlock();
-         if (startedTx)
-         {
-            tx.commit();
-         }
       }
 
    }
@@ -934,7 +914,7 @@
    {
       List<org.hornetq.core.server.Queue> durableQueues = ctx.getDurableQueues();
       List<org.hornetq.core.server.Queue> nonDurableQueues = ctx.getNonDurableQueues();
-      long ids[] = new long [durableQueues.size() + nonDurableQueues.size()];
+      long ids[] = new long[durableQueues.size() + nonDurableQueues.size()];
       int i = 0;
 
       for (org.hornetq.core.server.Queue q : durableQueues)
@@ -942,7 +922,7 @@
          q.getPageSubscription().getCounter().increment(tx, 1);
          ids[i++] = q.getID();
       }
-      
+
       for (org.hornetq.core.server.Queue q : nonDurableQueues)
       {
          q.getPageSubscription().getCounter().increment(tx, 1);
@@ -950,8 +930,8 @@
       }
       return ids;
    }
-   
-   private long getTransactionID(final Transaction tx, final RouteContextList listCtx) throws Exception
+
+   private long installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
    {
       if (tx == null)
       {
@@ -959,7 +939,7 @@
       }
       else
       {
-         PageTransactionInfo pgTX = (PageTransactionInfo) tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
+         PageTransactionInfo pgTX = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
          if (pgTX == null)
          {
             pgTX = new PageTransactionInfoImpl(tx.getID());
@@ -967,25 +947,81 @@
             tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pgTX);
             tx.addOperation(new FinishPageMessageOperation(pgTX));
          }
-         
+
          pgTX.increment(listCtx.getNumberOfQueues());
-         
+
          return tx.getID();
       }
    }
 
-   
+   private static class SyncPageStoreTX extends TransactionOperationAbstract
+   {
+      Set<PagingStore> storesToSync = new HashSet<PagingStore>();
+
+      public void addStore(PagingStore store)
+      {
+         storesToSync.add(store);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforePrepare(Transaction tx) throws Exception
+      {
+         sync();
+      }
+
+      void sync() throws Exception
+      {
+         OperationContext originalTX = OperationContextImpl.getContext();
+
+         try
+         {
+            // We only want to sync paging here, no need to wait for any other events
+            OperationContextImpl.clearContext();
+
+            for (PagingStore store : storesToSync)
+            {
+               store.sync();
+            }
+
+            // We can't perform a commit/sync on the journal before we can assure page files are synced or we may get
+            // out of sync
+            OperationContext ctx = OperationContextImpl.getContext();
+
+            if (ctx != null)
+            {
+               // if null it means there were no operations done before, hence no need to wait any completions
+               ctx.waitCompletion();
+            }
+         }
+         finally
+         {
+            OperationContextImpl.setContext(originalTX);
+         }
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+       */
+      public void beforeCommit(Transaction tx) throws Exception
+      {
+         sync();
+      }
+   }
+
    private class FinishPageMessageOperation implements TransactionOperation
    {
       private final PageTransactionInfo pageTransaction;
-      
+
       private boolean stored = false;
 
       public FinishPageMessageOperation(final PageTransactionInfo pageTransaction)
       {
          this.pageTransaction = pageTransaction;
       }
-      
+
       public void afterCommit(final Transaction tx)
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1019,7 +1055,7 @@
       {
          storePageTX(tx);
       }
-      
+
       private void storePageTX(final Transaction tx) throws Exception
       {
          if (!stored)
@@ -1044,24 +1080,6 @@
 
    }
 
-   /**
-    * This method will remove files from the page system and and route them, doing it transactionally
-    *     
-    * If persistent messages are also used, it will update eventual PageTransactions
-    */
-
-    /**
-    * @param pageId
-    * @return
-    */
-   private byte[] generateDuplicateID(final int pageId)
-   {
-      byte duplicateIdForPage[] = new SimpleString("page-" + pageId).getData();
-      return duplicateIdForPage;
-   }
-
-   
-
    private void openNewPage() throws Exception
    {
       lock.writeLock().lock();
@@ -1126,40 +1144,4 @@
    }
 
    // Inner classes -------------------------------------------------
-
-   /*   private class DepageRunnable implements Runnable
-      {
-         private final Executor followingExecutor;
-
-         public DepageRunnable(final Executor followingExecutor)
-         {
-            this.followingExecutor = followingExecutor;
-         }
-
-         public void run()
-         {
-            try
-            {
-               if (running)
-               {
-                  if (!isAddressFull(getPageSizeBytes()))
-                  {
-                     readPage();
-                  }
-
-                  // Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
-                  // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
-                  // the lock and this would dead lock
-                  if (running && !clearDepage())
-                  {
-                     followingExecutor.execute(this);
-                  }
-               }
-            }
-            catch (Throwable e)
-            {
-               PagingStoreImpl.log.error(e, e);
-            }
-         }
-      } */
 }

Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -34,6 +34,10 @@
    void replicationLineUp();
 
    void replicationDone();
+   
+   void pageSyncLineUp();
+   
+   void pageSyncDone();
 
    void waitCompletion() throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -205,5 +205,11 @@
     */
    long storePageCounterInc(long txID, long queueID, int add) throws Exception;
    
+   /**
+    * @return the ID with the increment record
+    * @throws Exception 
+    */
+   long storePageCounterInc(long queueID, int add) throws Exception;
    
+   
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -1059,7 +1059,6 @@
 
                encoding.decode(buff);
 
-
                PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
 
                if (sub != null)
@@ -1126,9 +1125,16 @@
          }
       }
 
-      loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, queueInfos, preparedTransactions, duplicateIDMap, pageSubscriptions);
-      
-      for (PageSubscription sub: pageSubscriptions.values())
+      loadPreparedTransactions(postOffice,
+                               pagingManager,
+                               resourceManager,
+                               queues,
+                               queueInfos,
+                               preparedTransactions,
+                               duplicateIDMap,
+                               pageSubscriptions);
+
+      for (PageSubscription sub : pageSubscriptions.values())
       {
          sub.getCounter().processReload();
       }
@@ -1205,7 +1211,7 @@
             pageSubscriptions.put(queueID, subs);
          }
       }
-      
+
       return subs;
    }
 
@@ -1262,6 +1268,20 @@
    }
 
    /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounterAdd(long, long, int)
+    */
+   public long storePageCounterInc(long queueID, int value) throws Exception
+   {
+      long recordID = idGenerator.generateID();
+      messageJournal.appendAddRecord(recordID,
+                                     JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+                                     new PageCountRecordInc(queueID, value),
+                                     true,
+                                     getContext());
+      return recordID;
+   }
+
+   /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
     */
    public long storePageCounter(long txID, long queueID, long value) throws Exception
@@ -1685,8 +1705,21 @@
                }
                case ACKNOWLEDGE_CURSOR:
                {
-                  // TODO: implement and test this case
-                  // and make sure the rollback will work well also
+                  CursorAckRecordEncoding encoding = new CursorAckRecordEncoding();
+                  encoding.decode(buff);
+
+                  encoding.position.setRecordID(record.id);
+
+                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
+
+                  if (sub != null)
+                  {
+                     sub.reloadPreparedACK(tx, encoding.position);
+                  }
+                  else
+                  {
+                     log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+                  }
                   break;
                }
                case PAGE_CURSOR_COUNTER_VALUE:
@@ -1702,12 +1735,14 @@
 
                   encoding.decode(buff);
 
+                  PageSubscription sub = locateSubscription(encoding.queueID,
+                                                            pageSubscriptions,
+                                                            queueInfos,
+                                                            pagingManager);
 
-                  PageSubscription sub = locateSubscription(encoding.queueID, pageSubscriptions, queueInfos, pagingManager);
-
                   if (sub != null)
                   {
-                     sub.getCounter().replayIncrement(tx, record.id, encoding.value);
+                     sub.getCounter().applyIncrement(tx, record.id, encoding.value);
                   }
                   else
                   {
@@ -1872,6 +1907,20 @@
          return true;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.OperationContext#pageLineUp()
+       */
+      public void pageSyncLineUp()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.OperationContext#pageDone()
+       */
+      public void pageSyncDone()
+      {
+      }
+
    }
 
    private static class XidEncoding implements EncodingSupport

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -49,14 +49,26 @@
    {
       OperationContextImpl.threadLocalContext.set(null);
    }
-
+   
+   public static OperationContext getContext()
+   {
+      return getContext(null);
+   }
+   
    public static OperationContext getContext(final ExecutorFactory executorFactory)
    {
       OperationContext token = OperationContextImpl.threadLocalContext.get();
       if (token == null)
       {
-         token = new OperationContextImpl(executorFactory.getExecutor());
-         OperationContextImpl.threadLocalContext.set(token);
+         if (executorFactory == null)
+         {
+            return null;
+         }
+         else
+         {
+            token = new OperationContextImpl(executorFactory.getExecutor());
+            OperationContextImpl.threadLocalContext.set(token);
+         }
       }
       return token;
    }
@@ -68,17 +80,23 @@
 
    private List<TaskHolder> tasks;
 
+   private int minimalStore = Integer.MAX_VALUE;
+
+   private int minimalReplicated = Integer.MAX_VALUE;
+   
+   private int minimalPage = Integer.MAX_VALUE;
+
    private volatile int storeLineUp = 0;
 
    private volatile int replicationLineUp = 0;
+   
+   private volatile int pageLineUp = 0;
 
-   private int minimalStore = Integer.MAX_VALUE;
-
-   private int minimalReplicated = Integer.MAX_VALUE;
-
    private int stored = 0;
 
    private int replicated = 0;
+   
+   private int paged = 0;
 
    private int errorCode = -1;
 
@@ -93,6 +111,17 @@
       super();
       this.executor = executor;
    }
+   
+   public void pageSyncLineUp()
+   {
+      pageLineUp++;
+   }
+   
+   public synchronized void pageSyncDone()
+   {
+      paged++;
+      checkTasks();
+   }
 
    public void storeLineUp()
    {
@@ -127,10 +156,11 @@
             tasks = new LinkedList<TaskHolder>();
             minimalReplicated = replicationLineUp;
             minimalStore = storeLineUp;
+            minimalPage = pageLineUp;
          }
 
          // On this case, we can just execute the context directly
-         if (replicationLineUp == replicated && storeLineUp == stored)
+         if (replicationLineUp == replicated && storeLineUp == stored && pageLineUp == paged)
          {
             // We want to avoid the executor if everything is complete...
             // However, we can't execute the context if there are executions pending
@@ -168,13 +198,13 @@
 
    private void checkTasks()
    {
-      if (stored >= minimalStore && replicated >= minimalReplicated)
+      if (stored >= minimalStore && replicated >= minimalReplicated && paged >= minimalPage)
       {
          Iterator<TaskHolder> iter = tasks.iterator();
          while (iter.hasNext())
          {
             TaskHolder holder = iter.next();
-            if (stored >= holder.storeLined && replicated >= holder.replicationLined)
+            if (stored >= holder.storeLined && replicated >= holder.replicationLined && paged >= holder.pageLined)
             {
                // If set, we use an executor to avoid the server being single threaded
                execute(holder.task);
@@ -250,6 +280,8 @@
       int storeLined;
 
       int replicationLined;
+      
+      int pageLined;
 
       IOAsyncTask task;
 
@@ -257,6 +289,7 @@
       {
          storeLined = storeLineUp;
          replicationLined = replicationLineUp;
+         pageLined = pageLineUp;
          this.task = task;
       }
    }
@@ -288,4 +321,25 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "OperationContextImpl [storeLineUp=" + storeLineUp +
+             ", stored=" +
+             stored +
+             ", replicationLineUp=" +
+             replicationLineUp +
+             ", replicated=" +
+             replicated +
+             ", pageLineUp=" +
+             pageLineUp +
+             ", paged=" +
+             paged +
+             "]";
+   }
+   
+
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -507,4 +507,13 @@
       return 0;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+    */
+   public long storePageCounterInc(long queueID, int add) throws Exception
+   {
+      // TODO Auto-generated method stub
+      return 0;
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -219,6 +219,8 @@
       journalLoadInformation = storage.loadInternalOnly();
 
       pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
+                                                                    config.getJournalBufferSize_NIO(),
+                                                                    server.getScheduledPool(),
                                                                     server.getExecutorFactory(),
                                                                     config.isJournalSyncNonTransactional()),
                                           storage,

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.management.MBeanServer;
 
@@ -140,6 +141,8 @@
 
    void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
 
+   ScheduledExecutorService getScheduledPool();
+   
    ExecutorFactory getExecutorFactory();
 
    void setGroupingHandler(GroupingHandler groupingHandler);

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -764,6 +764,12 @@
    // HornetQServer implementation
    // -----------------------------------------------------------
 
+   
+   public ScheduledExecutorService getScheduledPool()
+   {
+      return scheduledPool;
+   }
+   
    public Configuration getConfiguration()
    {
       return configuration;
@@ -1110,7 +1116,10 @@
 
    protected PagingManager createPagingManager()
    {
+      
       return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                             (long)configuration.getJournalBufferSize_NIO(),
+                                                             scheduledPool,
                                                              executorFactory,
                                                              configuration.isJournalSyncNonTransactional()),
                                    storageManager,

Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -25,6 +25,8 @@
 public class TransactionPropertyIndexes
 {
 
+   public static final int PAGE_SYNC = 2;
+   
    public static final int PAGE_COUNT_INC = 3;
    
    public static final int PAGE_TRANSACTION_UPDATE = 4;

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -13,47 +13,46 @@
 
 package org.hornetq.tests.integration.client;
 
-import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
 import junit.framework.Assert;
 import junit.framework.AssertionFailedError;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.MessageHandler;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.DivertConfiguration;
-import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.impl.PagingManagerImpl;
-import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.HornetQServerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
-import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.ExecutorFactory;
 
 /**
  * A PagingTest
@@ -111,7 +110,189 @@
 
       super.tearDown();
    }
+   
+   public void testPreparePersistent() throws Exception
+   {
+      boolean persistentMessages = true;
+      
+      System.out.println("PageDir:" + getPageDir());
+      clearData();
 
+      Configuration config = createDefaultConfig();
+      
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+      
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 10000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(persistentMessages);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+         session = null;
+         
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+         
+         Queue queue = server.locateQueue(ADDRESS);
+         
+         assertEquals(numberOfMessages, queue.getMessageCount());
+
+         
+         LinkedList<Xid> xids = new LinkedList<Xid>();
+         
+         int msgReceived = 0;
+         for (int i = 0 ; i < numberOfMessages / 999; i++)
+         {
+            ClientSession sessionConsumer = sf.createSession(true, false, false);
+            Xid xid = newXID();
+            xids.add(xid);
+            sessionConsumer.start(xid, XAResource.TMNOFLAGS);
+            sessionConsumer.start();
+            ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+            for (int msgCount = 0 ; msgCount < 1000; i++)
+            {
+               if (msgReceived == numberOfMessages)
+               {
+                  break;
+               }
+               System.out.println("MsgReceived = " + (msgReceived++));
+               ClientMessage msg = consumer.receive(5000);
+               assertNotNull(msg);
+               msg.acknowledge();
+            }
+            sessionConsumer.end(xid, XAResource.TMSUCCESS);
+            sessionConsumer.prepare(xid);
+            sessionConsumer.close();
+         }
+         
+         
+         ClientSession sessionCheck = sf.createSession(true, true);
+         
+         ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
+         
+         assertNull(consumer.receiveImmediate());
+
+         sessionCheck.close();
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         queue = server.locateQueue(ADDRESS);
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, false, false);
+
+         consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         session.start();
+         
+         assertNull(consumer.receiveImmediate());
+         
+         for (Xid xid : xids)
+         {
+            session.rollback(xid);
+         }
+         
+         xids.clear();
+         
+         assertNotNull(consumer.receiveImmediate());
+
+         session.close();
+         
+         sf.close();
+         
+         locator.close();
+         
+         //assertEquals(numberOfMessages, queue.getMessageCount());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
    public void testSendReceivePagingPersistent() throws Exception
    {
       internaltestSendReceivePaging(true);
@@ -137,6 +318,8 @@
       clearData();
 
       Configuration config = createDefaultConfig();
+      
+      config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
                                           config,
@@ -284,7 +467,7 @@
 
                         Assert.assertNotNull(message2);
 
-                        session.commit();
+                        if (i % 1000 == 0) session.commit();
 
                         try
                         {
@@ -299,6 +482,8 @@
                            throw e;
                         }
                      }
+                     
+                     session.commit();
 
                      consumer.close();
 
@@ -362,6 +547,8 @@
       clearData();
 
       Configuration config = createDefaultConfig();
+      
+      config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
                                           config,
@@ -452,7 +639,7 @@
 
             Assert.assertNotNull(message2);
 
-            session.commit();
+            if (i % 1000 == 0) session.commit();
 
             try
             {
@@ -1060,203 +1247,6 @@
       }
    }
 
-   // This test will force a depage thread as soon as the first message hits the page
-   public void testDepageDuringTransaction5() throws Exception
-   {
-      clearData();
-
-      final Configuration config = createDefaultConfig();
-      HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
-
-      final Executor executor = Executors.newSingleThreadExecutor();
-
-      final AtomicInteger countDepage = new AtomicInteger(0);
-      class HackPagingStore extends PagingStoreImpl
-      {
-         HackPagingStore(final SimpleString address,
-                         final PagingManager pagingManager,
-                         final StorageManager storageManager,
-                         final PostOffice postOffice,
-                         final SequentialFileFactory fileFactory,
-                         final PagingStoreFactory storeFactory,
-                         final SimpleString storeName,
-                         final AddressSettings addressSettings,
-                         final ExecutorFactory executorFactory,
-                         final boolean syncNonTransactional)
-         {
-            super(address,
-                  pagingManager,
-                  storageManager,
-                  postOffice,
-                  fileFactory,
-                  storeFactory,
-                  storeName,
-                  addressSettings,
-                  executorFactory,
-                  syncNonTransactional);
-         }
-
-         public boolean startDepaging()
-         {
-            // do nothing, we are hacking depage right in between paging
-            return false;
-         }
-
-      };
-
-      class HackStoreFactory extends PagingStoreFactoryNIO
-      {
-         HackStoreFactory(final String directory,
-                          final ExecutorFactory executorFactory,
-                          final boolean syncNonTransactional)
-         {
-            super(directory, executorFactory, syncNonTransactional);
-         }
-
-         public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings)
-         {
-
-            return new HackPagingStore(address,
-                                       getPagingManager(),
-                                       getStorageManager(),
-                                       getPostOffice(),
-                                       null,
-                                       this,
-                                       address,
-                                       settings,
-                                       getExecutorFactory(),
-                                       syncNonTransactional);
-         }
-
-      };
-
-      HornetQServer server = new HornetQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), securityManager)
-
-      {
-         protected PagingManager createPagingManager()
-         {
-            return new PagingManagerImpl(new HackStoreFactory(config.getPagingDirectory(),
-                                                              getExecutorFactory(),
-                                                              config.isJournalSyncNonTransactional()),
-                                         getStorageManager(),
-                                         getAddressSettingsRepository());
-         }
-      };
-
-      AddressSettings defaultSetting = new AddressSettings();
-      defaultSetting.setPageSizeBytes(PAGE_SIZE);
-      defaultSetting.setMaxSizeBytes(PAGE_MAX);
-      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
-      server.start();
-
-      final AtomicInteger errors = new AtomicInteger(0);
-
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setBlockOnAcknowledge(false);
-
-      final ClientSessionFactory sf = locator.createSessionFactory();
-      final int messageSize = 1024; // 1k
-      final int numberOfMessages = 2000;
-
-      try
-      {
-
-         final byte[] body = new byte[messageSize];
-
-         Thread producerThread = new Thread()
-         {
-            public void run()
-            {
-               ClientSession sessionProducer = null;
-               try
-               {
-                  sessionProducer = sf.createSession(false, false);
-                  ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-
-                  for (int i = 0; i < numberOfMessages; i++)
-                  {
-                     ClientMessage msg = sessionProducer.createMessage(true);
-                     msg.getBodyBuffer().writeBytes(body);
-                     msg.putIntProperty("count", i);
-                     producer.send(msg);
-
-                     if (i % 500 == 0 && i != 0)
-                     {
-                        sessionProducer.commit();
-                        // Thread.sleep(500);
-                     }
-                  }
-
-                  sessionProducer.commit();
-
-                  System.out.println("Producer gone");
-
-               }
-               catch (Throwable e)
-               {
-                  e.printStackTrace(); // >> junit report
-                  errors.incrementAndGet();
-               }
-               finally
-               {
-                  try
-                  {
-                     if (sessionProducer != null)
-                     {
-                        sessionProducer.close();
-                     }
-                  }
-                  catch (Throwable e)
-                  {
-                     e.printStackTrace();
-                     errors.incrementAndGet();
-                  }
-               }
-            }
-         };
-
-         ClientSession session = sf.createSession(true, true, 0);
-         session.start();
-         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
-         producerThread.start();
-
-         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
-         for (int i = 0; i < numberOfMessages; i++)
-         {
-            ClientMessage msg = consumer.receive(500000);
-            assertNotNull(msg);
-            assertEquals(i, msg.getIntProperty("count").intValue());
-            msg.acknowledge();
-            if (i > 0 && i % 10 == 0)
-            {
-               // session.commit();
-            }
-         }
-         // session.commit();
-
-         session.close();
-
-         producerThread.join();
-
-         assertEquals(0, errors.get());
-      }
-      finally
-      {
-         try
-         {
-            server.stop();
-         }
-         catch (Throwable ignored)
-         {
-         }
-      }
-
-   }
-
    public void testOrderingNonTX() throws Exception
    {
       clearData();
@@ -1399,6 +1389,8 @@
       clearData();
 
       Configuration config = createDefaultConfig();
+      
+      config.setJournalSyncNonTransactional(false);
 
       HornetQServer server = createServer(true,
                                           config,
@@ -1980,7 +1972,7 @@
       }
 
    }
-
+   
    public void testDropMessagesExpiring() throws Exception
    {
       clearData();
@@ -2190,7 +2182,207 @@
       }
 
    }
+   
+   public void testSyncPage() throws Exception
+   {
+      Configuration config = createDefaultConfig();
 
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+         
+         final CountDownLatch pageUp = new CountDownLatch(0);
+         final CountDownLatch pageDone = new CountDownLatch(1);
+         
+         OperationContext ctx = new OperationContext()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+            }
+            
+            public void storeLineUp()
+            {
+            }
+            
+            public boolean waitCompletion(long timeout) throws Exception
+            {
+               return false;
+            }
+            
+            public void waitCompletion() throws Exception
+            {
+               
+            }
+            
+            public void replicationLineUp()
+            {
+               
+            }
+            
+            public void replicationDone()
+            {
+               
+            }
+            
+            public void pageSyncLineUp()
+            {
+               pageUp.countDown();
+            }
+            
+            public void pageSyncDone()
+            {
+               pageDone.countDown();
+            }
+            
+            public void executeOnCompletion(IOAsyncTask runnable)
+            {
+               
+            }
+         };
+
+         
+         OperationContextImpl.setContext(ctx);
+         
+         PagingManager paging = server.getPagingManager();
+         
+         PagingStore store = paging.getPageStore(ADDRESS);
+         
+         store.sync();
+         
+         assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+         
+         assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+         
+         server.stop();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
+   public void testSyncPageTX() throws Exception
+   {
+      Configuration config = createDefaultConfig();
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      try
+      {
+         server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
+         
+         final CountDownLatch pageUp = new CountDownLatch(0);
+         final CountDownLatch pageDone = new CountDownLatch(1);
+         
+         OperationContext ctx = new OperationContext()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+            }
+            
+            public void storeLineUp()
+            {
+            }
+            
+            public boolean waitCompletion(long timeout) throws Exception
+            {
+               return false;
+            }
+            
+            public void waitCompletion() throws Exception
+            {
+               
+            }
+            
+            public void replicationLineUp()
+            {
+               
+            }
+            
+            public void replicationDone()
+            {
+               
+            }
+            
+            public void pageSyncLineUp()
+            {
+               pageUp.countDown();
+            }
+            
+            public void pageSyncDone()
+            {
+               pageDone.countDown();
+            }
+            
+            public void executeOnCompletion(IOAsyncTask runnable)
+            {
+               
+            }
+         };
+
+         
+         OperationContextImpl.setContext(ctx);
+         
+         PagingManager paging = server.getPagingManager();
+         
+         PagingStore store = paging.getPageStore(ADDRESS);
+         
+         store.sync();
+         
+         assertTrue(pageUp.await(10, TimeUnit.SECONDS));
+         
+         assertTrue(pageDone.await(10, TimeUnit.SECONDS));
+         
+         server.stop();
+
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+
    public void testPagingOneDestinationOnly() throws Exception
    {
       SimpleString PAGED_ADDRESS = new SimpleString("paged");

Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -14,7 +14,6 @@
 package org.hornetq.tests.integration.replication;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -34,7 +33,6 @@
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -59,7 +57,6 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.protocol.core.Packet;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.replication.impl.ReplicatedJournal;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
 import org.hornetq.core.server.HornetQServer;
@@ -69,7 +66,6 @@
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.spi.core.protocol.RemotingConnection;
-import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -710,6 +706,8 @@
    {
 
       PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                                             1000,
+                                                                             null,
                                                                              executorFactory,
                                                                              false),
                                                    storageManager,

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -67,6 +67,7 @@
       
       
       PagingStoreFactoryNIO storeFactory = new PagingStoreFactoryNIO(getPageDir(),
+                                                                     100, null,
                                 new OrderedExecutorFactory(Executors.newCachedThreadPool()),
                                 true);
       

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -138,6 +138,8 @@
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
       PagingStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                  null,
+                                                  100,
                                                   createMockManager(),
                                                   createStorageManagerMock(),
                                                   createPostOfficeMock(),
@@ -174,6 +176,8 @@
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                           null,
+                                                           100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
@@ -210,6 +214,8 @@
       storeImpl.sync();
 
       storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                      null,
+                                      100,
                                       createMockManager(),
                                       createStorageManagerMock(),
                                       createPostOfficeMock(),
@@ -237,6 +243,8 @@
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                           null,
+                                                           100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
@@ -312,6 +320,8 @@
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                           null,
+                                                           100,
                                                            createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
@@ -358,7 +368,7 @@
       for (int pageNr = 0; pageNr < 2; pageNr++)
       {
          Page page = storeImpl.depage();
-         
+
          System.out.println("numberOfPages = " + storeImpl.getNumberOfPages());
 
          page.open();
@@ -459,6 +469,8 @@
       settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
       final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                                 null,
+                                                                 100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
                                                                  createPostOfficeMock(),
@@ -622,6 +634,8 @@
       }
 
       TestSupportPageStore storeImpl2 = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                            null,
+                                                            100,
                                                             createMockManager(),
                                                             createStorageManagerMock(),
                                                             createPostOfficeMock(),
@@ -644,7 +658,7 @@
 
       long lastMessageId = messageIdGenerator.incrementAndGet();
       ServerMessage lastMsg = createMessage(lastMessageId, storeImpl, destination, createRandomBuffer(lastMessageId, 5));
-      
+
       storeImpl2.forceAnotherPage();
 
       storeImpl2.page(lastMsg, new RoutingContextImpl(null));
@@ -707,6 +721,8 @@
       settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
       final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                                 null,
+                                                                 100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
                                                                  createPostOfficeMock(),
@@ -747,6 +763,8 @@
       settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
       final TestSupportPageStore storeImpl = new PagingStoreImpl(PagingStoreImplTest.destinationTestName,
+                                                                 null,
+                                                                 100,
                                                                  createMockManager(),
                                                                  createStorageManagerMock(),
                                                                  createPostOfficeMock(),
@@ -755,7 +773,7 @@
                                                                  new SimpleString("test"),
                                                                  settings,
                                                                  getExecutorFactory(),
-                                                                 true);
+                                                                 false);
 
       storeImpl.start();
 
@@ -1181,7 +1199,6 @@
          return 0;
       }
 
-
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
        */
@@ -1605,7 +1622,7 @@
       public void deleteIncrementRecord(long txID, long recordID) throws Exception
       {
          // TODO Auto-generated method stub
-         
+
       }
 
       /* (non-Javadoc)
@@ -1614,7 +1631,7 @@
       public void deletePageCounter(long txID, long recordID) throws Exception
       {
          // TODO Auto-generated method stub
-         
+
       }
 
       /* (non-Javadoc)
@@ -1626,6 +1643,15 @@
          return 0;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
+       */
+      public long storePageCounterInc(long queueID, int add) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return 0;
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory

Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	2010-12-22 08:37:28 UTC (rev 10066)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	2010-12-22 19:30:59 UTC (rev 10067)
@@ -45,6 +45,67 @@
 
    // Public --------------------------------------------------------
 
+   public void testCompleteTaskAfterPaging() throws Exception
+   {
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      try
+      {
+         OperationContextImpl impl = new OperationContextImpl(executor);
+         final CountDownLatch latch1 = new CountDownLatch(1);
+         final CountDownLatch latch2 = new CountDownLatch(1);
+         
+         impl.executeOnCompletion(new IOAsyncTask()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+               latch1.countDown();
+            }
+         });
+         
+         assertTrue(latch1.await(10, TimeUnit.SECONDS));
+         
+         for (int i = 0 ; i < 10; i++) impl.storeLineUp();
+         for (int i = 0 ; i < 3; i++) impl.pageSyncLineUp();
+
+         impl.executeOnCompletion(new IOAsyncTask()
+         {
+            
+            public void onError(int errorCode, String errorMessage)
+            {
+            }
+            
+            public void done()
+            {
+               latch2.countDown();
+            }
+         });
+         
+         
+         assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+         
+         for (int i = 0 ; i < 9; i++) impl.done();
+         for (int i = 0 ; i < 2; i++) impl.pageSyncDone();
+         
+         
+         assertFalse(latch2.await(1, TimeUnit.MILLISECONDS));
+         
+         impl.done();
+         impl.pageSyncDone();
+         
+         assertTrue(latch2.await(10, TimeUnit.SECONDS));
+         
+      }
+      finally
+      {
+         executor.shutdown();
+      }
+   }
+
    public void testCaptureExceptionOnExecutor() throws Exception
    {
       ExecutorService executor = Executors.newSingleThreadExecutor();



More information about the hornetq-commits mailing list