[hornetq-commits] JBoss hornetq SVN: r8223 - in trunk: src/main/org/hornetq/core/paging and 10 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 5 09:23:21 EST 2009


Author: timfox
Date: 2009-11-05 09:23:20 -0500 (Thu, 05 Nov 2009)
New Revision: 8223

Modified:
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/paging/PagingManager.java
   trunk/src/main/org/hornetq/core/paging/PagingStore.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/server/ServerMessage.java
   trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java
   trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
   trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
   trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java
   trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
   trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
refactor and cleanup of paging code

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -73,7 +73,7 @@
 
    protected long messageID;
 
-   private SimpleString destination;
+   protected SimpleString destination;
 
    protected byte type;
 

Modified: trunk/src/main/org/hornetq/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingManager.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/PagingManager.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -13,12 +13,9 @@
 
 package org.hornetq.core.paging;
 
-import java.util.Collection;
-
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -57,35 +54,6 @@
    void resumeDepages() throws Exception;
 
    /**
-    * To be used by transactions only.
-    * If you're sure you will page if isPaging, just call the method page and look at its return. 
-    * @param destination
-    * @return
-    */
-   boolean isPaging(SimpleString destination) throws Exception;
-
-   /**
-    * Page, only if destination is in page mode.
-    * @param message
-    * @param sync - Sync should be called right after the write
-    * @return false if destination is not on page mode
-    */
-
-   // FIXME - why are these methods still on PagingManager???
-   // The current code is doing a lookup every time through this class just to call page store!!
-   boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
-
-   /**
-    * Page, only if destination is in page mode.
-    * @param message
-    * @return false if destination is not on page mode
-    */
-
-   // FIXME - why are these methods still on PagingManager???
-   // The current code is doing a lookup every time through this class just to call page store!!
-   boolean page(ServerMessage message, long transactionId, boolean duplicateDetection) throws Exception;
-
-   /**
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */
    void addTransaction(PageTransactionInfo pageTransaction);
@@ -95,9 +63,6 @@
     * */
    PageTransactionInfo getTransaction(long transactionID);
 
-   /** Sync current-pages on disk for these destinations */
-   void sync(Collection<SimpleString> destinationsToSync) throws Exception;
-
    /**
     * @param transactionID
     */

Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -33,32 +33,26 @@
  */
 public interface PagingStore extends HornetQComponent
 {
+   SimpleString getAddress();
+   
    int getNumberOfPages();
 
    SimpleString getStoreName();
 
-   /** Maximum number of bytes allowed in memory */
-   long getMaxSizeBytes();
-
    AddressFullMessagePolicy getAddressFullMessagePolicy();
 
    long getPageSizeBytes();
 
    long getAddressSize();
 
-   /** @return true if paging was started, or false if paging was already started before this call */
-   boolean startPaging() throws Exception;
-
    boolean isPaging();
 
    void sync() throws Exception;
 
-   boolean page(PagedMessage message, boolean sync, boolean duplicateDetection) throws Exception;
+   boolean page(ServerMessage message, long transactionId, boolean duplicateDetection) throws Exception;
    
-   public boolean readPage() throws Exception;
+   boolean page(ServerMessage message, boolean duplicateDetection) throws Exception;
    
-   Page getCurrentPage();
-   
    Page createPage(final int page) throws Exception;
 
    /**

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.paging.impl;
 
-import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,7 +26,6 @@
 import org.hornetq.core.paging.PagingStoreFactory;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.utils.SimpleString;
@@ -47,8 +45,6 @@
 
    private volatile boolean started = false;
 
-   // private volatile boolean backup;
-
    private final AtomicLong totalMemoryBytes = new AtomicLong(0);
 
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
@@ -59,8 +55,6 @@
 
    private final StorageManager storageManager;
 
-   private final boolean syncNonTransactional;
-
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
 
    // Static
@@ -73,13 +67,11 @@
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
                             final StorageManager storageManager,
-                            final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                            final boolean syncNonTransactional)
+                            final HierarchicalRepository<AddressSettings> addressSettingsRepository)
    {
       pagingStoreFactory = pagingSPI;
       this.addressSettingsRepository = addressSettingsRepository;
       this.storageManager = storageManager;
-      this.syncNonTransactional = syncNonTransactional;
    }
 
    // Public
@@ -150,28 +142,6 @@
       pagingStoreFactory.setPostOffice(postOffice);
    }
 
-   public boolean isPaging(final SimpleString destination) throws Exception
-   {
-      return getPageStore(destination).isPaging();
-   }
-
-   public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
-   {
-      // The sync on transactions is done on commit only
-      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId),
-                                                         false,
-                                                         duplicateDetection);
-   }
-
-   public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
-   {
-      // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
-      // of crash
-      return getPageStore(message.getDestination()).page(new PagedMessageImpl(message),
-                                                         syncNonTransactional && message.isDurable(),
-                                                         duplicateDetection);
-   }
-
    public void addTransaction(final PageTransactionInfo pageTransaction)
    {
       transactions.put(pageTransaction.getTransactionID(), pageTransaction);
@@ -187,14 +157,6 @@
       return transactions.get(id);
    }
 
-   public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
-   {
-      for (SimpleString destination : destinationsToSync)
-      {
-         getPageStore(destination).sync();
-      }
-   }
-
    // HornetQComponent implementation
    // ------------------------------------------------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -57,6 +57,8 @@
    private final String directory;
 
    private final ExecutorFactory executorFactory;
+   
+   protected final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
 
@@ -68,11 +70,14 @@
 
    // Constructors --------------------------------------------------
 
-   public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory)
+   public PagingStoreFactoryNIO(final String directory, final ExecutorFactory executorFactory,
+                                final boolean syncNonTransactional)
    {
       this.directory = directory;
 
       this.executorFactory = executorFactory;
+      
+      this.syncNonTransactional = syncNonTransactional;
    }
 
    // Public --------------------------------------------------------
@@ -81,17 +86,19 @@
    {
    }
 
-   public synchronized PagingStore newStore(final SimpleString destinationName, final AddressSettings settings) throws Exception
+   public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) throws Exception
    {
 
-      return new PagingStoreImpl(pagingManager,
+      return new PagingStoreImpl(address,
+                                 pagingManager,
                                  storageManager,
                                  postOffice,
                                  null,
                                  this,
-                                 destinationName,
+                                 address,
                                  settings,
-                                 executorFactory.getExecutor());
+                                 executorFactory.getExecutor(),
+                                 syncNonTransactional);
    }
 
    /**
@@ -168,31 +175,33 @@
 
             BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(addressFile)));
 
-            String destination;
+            String addressString;
             
             try
             {
-               destination = reader.readLine();
+               addressString = reader.readLine();
             }
             finally
             {
                reader.close();
             }
 
-            SimpleString destinationName = new SimpleString(destination);
+            SimpleString address = new SimpleString(addressString);
 
             SequentialFileFactory factory = newFileFactory(guid);
 
-            AddressSettings settings = addressSettingsRepository.getMatch(destinationName.toString());
+            AddressSettings settings = addressSettingsRepository.getMatch(address.toString());
 
-            PagingStore store = new PagingStoreImpl(pagingManager,
+            PagingStore store = new PagingStoreImpl(address,
+                                                    pagingManager,
                                                     storageManager,
                                                     postOffice,
                                                     factory,
                                                     this,
-                                                    destinationName,
+                                                    address,
                                                     settings,
-                                                    executorFactory.getExecutor());
+                                                    executorFactory.getExecutor(),
+                                                    syncNonTransactional);
 
             storesReturn.add(store);
          }

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -64,6 +64,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private final SimpleString address;
+
    private final StorageManager storageManager;
 
    private final PostOffice postOffice;
@@ -110,15 +112,17 @@
     * We need to perform checks on currentPage with minimal locking
     * */
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
-   
+
    private final ServerProducerCreditManager creditManager;
-   
+
    private boolean exceededAvailableCredits;
-   
+
    private volatile boolean running = false;
-   
+
    private final AtomicLong availableProducerCredits = new AtomicLong(0);
-   
+
+   protected final boolean syncNonTransactional;
+
    // Static --------------------------------------------------------
 
    private static final boolean isTrace = log.isTraceEnabled();
@@ -133,20 +137,24 @@
 
    // Constructors --------------------------------------------------
 
-   public PagingStoreImpl(final PagingManager pagingManager,
+   public PagingStoreImpl(final SimpleString address,
+                          final PagingManager pagingManager,
                           final StorageManager storageManager,
                           final PostOffice postOffice,
                           final SequentialFileFactory fileFactory,
                           final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final AddressSettings addressSettings,
-                          final Executor executor)
+                          final Executor executor,
+                          final boolean syncNonTransactional)
    {
       if (pagingManager == null)
       {
          throw new IllegalStateException("Paging Manager can't be null");
       }
 
+      this.address = address;
+
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
@@ -166,25 +174,26 @@
       this.fileFactory = fileFactory;
 
       this.storeFactory = storeFactory;
-      
+
       this.availableProducerCredits.set(maxSize);
-      
-      this.creditManager = new ServerProducerCreditManagerImpl(this);            
+
+      this.creditManager = new ServerProducerCreditManagerImpl(this);
+
+      this.syncNonTransactional = syncNonTransactional;
    }
 
    // Public --------------------------------------------------------
 
    // PagingStore implementation ------------------------------------
 
-   public long getAddressSize()
+   public SimpleString getAddress()
    {
-      return sizeInBytes.get();
+      return address;
    }
 
-   /** Maximum number of bytes allowed in memory */
-   public long getMaxSizeBytes()
+   public long getAddressSize()
    {
-      return maxSize;
+      return sizeInBytes.get();
    }
 
    public AddressFullMessagePolicy getAddressFullMessagePolicy()
@@ -200,6 +209,7 @@
    public boolean isPaging()
    {
       currentPageLock.readLock().lock();
+      
       try
       {
          if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -226,27 +236,27 @@
    {
       return storeName;
    }
-   
+
    public boolean isExceededAvailableCredits()
    {
       return exceededAvailableCredits;
    }
-   
+
    public synchronized int getAvailableProducerCredits(final int credits)
-   {           
+   {
       if (maxSize != -1 && addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
-      {        
+      {
          long avail = availableProducerCredits.get();
-                           
+
          if (avail > 0)
          {
             long take = Math.min(avail, credits);
-            
+
             availableProducerCredits.addAndGet(-take);
-                        
+
             return (int)take;
          }
-         
+
          return 0;
       }
       else
@@ -254,243 +264,62 @@
          return credits;
       }
    }
-   
+
    public void returnProducerCredits(final int credits)
    {
       checkReleaseProducerFlowControlCredits(-credits);
    }
+
    
-   private synchronized void checkReleaseProducerFlowControlCredits(final long size)
+   public void addSize(final ServerMessage message, final boolean add) throws Exception
    {
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
-      {         
-         long avail = availableProducerCredits.addAndGet(-size);
-         
-         if (avail > 0)
-         {
-            int used = creditManager.creditsReleased((int)avail);
-          
-            long num = availableProducerCredits.addAndGet(-used);
-            
-            if (num < 0)
-            {
-               log.warn("Available credits has gone negative");
-               
-               exceededAvailableCredits = true;
-            }
-         }     
-      }
-   }
-   
-   public void addSize(final ServerMessage message, final boolean add) throws Exception
-   {            
       long size = message.getMemoryEstimate();
-      
+
       if (add)
       {
          checkReleaseProducerFlowControlCredits(size);
-         
+
          addSize(size);
       }
       else
       {
          checkReleaseProducerFlowControlCredits(-size);
-         
+
          addSize(-size);
       }
    }
-         
+
    public void addSize(final MessageReference reference, final boolean add) throws Exception
    {
       long size = reference.getMemoryEstimate();
-      
+
       if (add)
       {
          checkReleaseProducerFlowControlCredits(size);
-         
+
          addSize(size);
       }
       else
       {
          checkReleaseProducerFlowControlCredits(-size);
-         
+
          addSize(-size);
       }
    }
    
-   private void addSize(final long size) throws Exception
-   {          
-      if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
-      {
-         addAddressSize(size);
-         
-         pagingManager.addSize(size);
-
-         return;
-      }
-      else
-      {
-         pagingManager.addSize(size);
-
-         final long addressSize = addAddressSize(size);
-
-         if (size > 0)
-         {
-            if (maxSize > 0 && addressSize > maxSize)
-            {
-               if (startPaging())
-               {
-                  if (isTrace)
-                  {
-                     trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
-                  }
-               }
-            }
-         }
-         else
-         {
-            // When in Global mode, we use the default page size as the mark to start depage
-            if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
-            {
-               if (startDepaging())
-               {
-                  if (isTrace)
-                  {
-                     trace("Starting depaging Thread, size = " + addressSize);
-                  }
-               }
-            }
-         }
-
-         return;
-      }
+   public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
+   {
+      // The sync on transactions is done on commit only
+      return page(message, transactionID, false, duplicateDetection);
    }
 
-   // TODO all of this can be simplified
-   public boolean page(final PagedMessage message, final boolean sync, final boolean duplicateDetection) throws Exception
+   public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
    {
-      if (!running)
-      {
-         throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
-      }
-      
-      boolean full = isFull();
-
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
-      {
-         if (full)
-         {
-            if (!printedDropMessagesWarning)
-            {
-               printedDropMessagesWarning = true;
-
-               log.warn("Messages are being dropped on address " + getStoreName());
-            }
-
-            // Address is full, we just pretend we are paging, and drop the data
-            return true;
-         }
-         else
-         {
-            return false;
-         }
-      }
-      else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
-      {
-         return false;
-      }
-
-      // We need to ensure a read lock, as depage could change the paging state
-      currentPageLock.readLock().lock();
-
-      try
-      {
-         // First check done concurrently, to avoid synchronization and increase throughput
-         if (currentPage == null)
-         {
-            return false;
-         }
-      }
-      finally
-      {
-         currentPageLock.readLock().unlock();
-      }
-
-      writeLock.lock();
-
-      try
-      {
-         if (currentPage == null)
-         {
-            return false;
-         }
-
-         if (duplicateDetection)
-         {
-            // We set the duplicate detection header to prevent the message being depaged more than once in case of
-            // failure during depage
-
-            ServerMessage msg = message.getMessage(storageManager);
-
-            byte[] bytes = new byte[8];
-
-            ByteBuffer buff = ByteBuffer.wrap(bytes);
-
-            buff.putLong(msg.getMessageID());
-
-            msg.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
-         }
-
-         int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
-
-         if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
-         {
-            // Make sure nothing is currently validating or using currentPage
-            currentPageLock.writeLock().lock();
-            try
-            {
-               openNewPage();
-
-               // openNewPage will set currentPageSize to zero, we need to set it again
-               currentPageSize.addAndGet(bytesToWrite);
-            }
-            finally
-            {
-               currentPageLock.writeLock().unlock();
-            }
-         }
-
-         currentPageLock.readLock().lock();
-
-         try
-         {
-            if (currentPage != null)
-            {
-               currentPage.write(message);
-
-               if (sync)
-               {
-                  currentPage.sync();
-               }
-               return true;
-            }
-            else
-            {
-               return false;
-            }
-         }
-         finally
-         {
-            currentPageLock.readLock().unlock();
-         }
-      }
-      finally
-      {
-         writeLock.unlock();
-      }
-
+      // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
+      // of crash
+      return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
    }
-
+   
    public void sync() throws Exception
    {
       currentPageLock.readLock().lock();
@@ -685,43 +514,12 @@
       }
    }
 
-   /**
-    * Depage one page-file, read it and send it to the pagingManager / postoffice
-    * @return
-    * @throws Exception
-    */
-   public boolean readPage() throws Exception
-   {
-      Page page = depage();
-
-      if (page == null)
-      {
-         return false;
-      }
-
-      page.open();
-
-      List<PagedMessage> messages = page.read();
-
-      if (onDepage(page.getPageId(), storeName, messages))
-      {
-         page.delete();
-         
-         return true;
-      }
-      else
-      {
-         return false;
-      }
-
-   }
-
+   
    public Page getCurrentPage()
    {
       return currentPage;
    }
 
-   
    public Page createPage(final int page) throws Exception
    {
       String fileName = createFileName(page);
@@ -748,13 +546,12 @@
 
       return new PageImpl(this.storeName, storageManager, fileFactory, file, page);
    }
-   
+
    public ServerProducerCreditManager getProducerCreditManager()
    {
       return creditManager;
    }
 
-   
    // TestSupportPageStore ------------------------------------------
 
    public void forceAnotherPage() throws Exception
@@ -851,6 +648,242 @@
    // Private -------------------------------------------------------
 
    /**
+    * Depage one page-file, read it and send it to the pagingManager / postoffice
+    * @return
+    * @throws Exception
+    */
+   private boolean readPage() throws Exception
+   {
+      Page page = depage();
+
+      if (page == null)
+      {
+         return false;
+      }
+
+      page.open();
+
+      List<PagedMessage> messages = page.read();
+
+      if (onDepage(page.getPageId(), storeName, messages))
+      {
+         page.delete();
+
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+
+   }
+
+   
+   private synchronized void checkReleaseProducerFlowControlCredits(final long size)
+   {
+      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
+      {
+         long avail = availableProducerCredits.addAndGet(-size);
+
+         if (avail > 0)
+         {
+            int used = creditManager.creditsReleased((int)avail);
+
+            long num = availableProducerCredits.addAndGet(-used);
+
+            if (num < 0)
+            {
+               log.warn("Available credits has gone negative");
+
+               exceededAvailableCredits = true;
+            }
+         }
+      }
+   }
+
+   
+   private void addSize(final long size) throws Exception
+   {
+      if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
+      {
+         addAddressSize(size);
+
+         pagingManager.addSize(size);
+
+         return;
+      }
+      else
+      {
+         pagingManager.addSize(size);
+
+         final long addressSize = addAddressSize(size);
+
+         if (size > 0)
+         {
+            if (maxSize > 0 && addressSize > maxSize)
+            {
+               if (startPaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
+                  }
+               }
+            }
+         }
+         else
+         {
+            // When in Global mode, we use the default page size as the mark to start depage
+            if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
+            {
+               if (startDepaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting depaging Thread, size = " + addressSize);
+                  }
+               }
+            }
+         }
+
+         return;
+      }
+   }
+   
+   private boolean page(final ServerMessage message, final long transactionID, final boolean sync, final boolean duplicateDetection) throws Exception
+   {
+      if (!running)
+      {
+         throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
+      }
+
+      boolean full = isFull();
+
+      if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
+      {
+         if (full)
+         {
+            if (!printedDropMessagesWarning)
+            {
+               printedDropMessagesWarning = true;
+
+               log.warn("Messages are being dropped on address " + getStoreName());
+            }
+
+            // Address is full, we just pretend we are paging, and drop the data
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+      else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
+      {
+         return false;
+      }
+
+      // We need to ensure a read lock, as depage could change the paging state
+      currentPageLock.readLock().lock();
+
+      try
+      {
+         // First check done concurrently, to avoid synchronization and increase throughput
+         if (currentPage == null)
+         {
+            return false;
+         }
+      }
+      finally
+      {
+         currentPageLock.readLock().unlock();
+      }
+
+      writeLock.lock();
+
+      try
+      {
+         if (currentPage == null)
+         {
+            return false;
+         }
+
+         if (duplicateDetection)
+         {
+            // We set the duplicate detection header to prevent the message being depaged more than once in case of
+            // failure during depage
+            
+            byte[] bytes = new byte[8];
+
+            ByteBuffer buff = ByteBuffer.wrap(bytes);
+
+            buff.putLong(message.getMessageID());
+
+            message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
+         }
+
+         int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
+
+         if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+         {
+            // Make sure nothing is currently validating or using currentPage
+            currentPageLock.writeLock().lock();
+            try
+            {
+               openNewPage();
+
+               // openNewPage will set currentPageSize to zero, we need to set it again
+               currentPageSize.addAndGet(bytesToWrite);
+            }
+            finally
+            {
+               currentPageLock.writeLock().unlock();
+            }
+         }
+
+         currentPageLock.readLock().lock();
+
+         try
+         {
+            if (currentPage != null)
+            {
+               PagedMessage pagedMessage;
+               
+               if (transactionID != -1)
+               {
+                  pagedMessage = new PagedMessageImpl(message, transactionID);
+               }
+               else
+               {
+                  pagedMessage = new PagedMessageImpl(message);
+               }
+               
+               currentPage.write(pagedMessage);
+
+               if (sync)
+               {
+                  currentPage.sync();
+               }
+               return true;
+            }
+            else
+            {
+               return false;
+            }
+         }
+         finally
+         {
+            currentPageLock.readLock().unlock();
+         }
+      }
+      finally
+      {
+         writeLock.unlock();
+      }
+
+   }
+   
+   /**
     * This method will remove files from the page system and and route them, doing it transactionally
     *     
     * If persistent messages are also used, it will update eventual PageTransactions
@@ -868,13 +901,12 @@
          // nothing to be done on this case.
          return true;
       }
-      
 
       // Depage has to be done atomically, in case of failure it should be
       // back to where it was
 
       Transaction depageTransaction = new TransactionImpl(storageManager);
-      
+
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
@@ -882,13 +914,14 @@
       for (PagedMessage pagedMessage : pagedMessages)
       {
          ServerMessage message = pagedMessage.getMessage(storageManager);
-         
+
          if (message.isLargeMessage())
          {
             LargeServerMessage largeMsg = (LargeServerMessage)message;
             if (!largeMsg.isFileExists())
             {
-               log.warn("File for large message " + largeMsg.getMessageID() + " doesn't exist, so ignoring depage for this large message");
+               log.warn("File for large message " + largeMsg.getMessageID() +
+                        " doesn't exist, so ignoring depage for this large message");
                continue;
             }
          }
@@ -931,7 +964,7 @@
                if (isTrace)
                {
                   trace("Rollback was called after prepare, ignoring message " + message);
-               }               
+               }
                continue;
             }
 
@@ -970,8 +1003,8 @@
       }
 
       depageTransaction.commit();
-      
-      // StorageManager does the check: if (replicated) -> do the proper cleanup already 
+
+      // StorageManager does the check: if (replicated) -> do the proper cleanup already
       storageManager.completeReplication();
 
       if (isTrace)
@@ -987,7 +1020,7 @@
     */
    private boolean isAddressFull(final long nextPageSize)
    {
-      return getMaxSizeBytes() > 0 && getAddressSize() + nextPageSize > getMaxSizeBytes();
+      return maxSize > 0 && getAddressSize() + nextPageSize > maxSize;
    }
 
    private long addAddressSize(final long delta)
@@ -1012,7 +1045,7 @@
                " addressSize = " +
                this.getAddressSize() +
                " addressMax " +
-               this.getMaxSizeBytes() +
+               maxSize +
                " isPaging = " +
                isPaging() +
                " addressFull = " +
@@ -1050,9 +1083,8 @@
          {
             currentPage.close();
          }
-         
+
          currentPage = createPage(currentPageId);
-         
 
          currentPageSize.set(0);
 
@@ -1084,7 +1116,7 @@
    // To be used on isDropMessagesWhenFull
    private boolean isFull()
    {
-      return getMaxSizeBytes() > 0 && getAddressSize() > getMaxSizeBytes();
+      return maxSize > 0 && getAddressSize() > maxSize;
    }
 
    // Inner classes -------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/paging/impl/TestSupportPageStore.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -37,4 +37,9 @@
    void forceAnotherPage() throws Exception;
    
    boolean isExceededAvailableCredits();
+   
+   /** @return true if paging was started, or false if paging was already started before this call */
+   boolean startPaging() throws Exception;
+   
+   Page getCurrentPage();         
 }

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -20,6 +20,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -598,18 +599,16 @@
 
       if (context.getTransaction() == null)
       {
-         if (pagingManager.page(message, true))
+         if (message.page(true))
          {
             return;
          }
       }
       else
       {
-         SimpleString destination = message.getDestination();
-
          boolean depage = context.getTransaction().getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
 
-         if (!depage && pagingManager.isPaging(destination))
+         if (!depage && message.storeIsPaging())
          {
             getPageOperation(context.getTransaction()).addMessageToPage(message);
 
@@ -1149,20 +1148,20 @@
 
             boolean pagingPersistent = false;
 
-            HashSet<SimpleString> pagedDestinationsToSync = new HashSet<SimpleString>();
+            Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
 
             // We only need to add the dupl id header once per transaction
             boolean first = true;
             for (ServerMessage message : messagesToPage)
             {
-               if (pagingManager.page(message, tx.getID(), first))
+               if (message.page(tx.getID(), first))
                {
                   if (message.isDurable())
                   {
                      // We only create pageTransactions if using persistent messages
                      pageTransaction.increment();
                      pagingPersistent = true;
-                     pagedDestinationsToSync.add(message.getDestination());
+                     pagingStoresToSync.add(message.getPagingStore());
                   }
                }
                else
@@ -1179,9 +1178,13 @@
             {
                tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
 
-               if (!pagedDestinationsToSync.isEmpty())
+               if (!pagingStoresToSync.isEmpty())
                {
-                  pagingManager.sync(pagedDestinationsToSync);
+                  for (PagingStore store: pagingStoresToSync)
+                  {
+                     store.sync();
+                  }
+
                   storageManager.storePageTransaction(tx.getID(), pageTransaction);
                }
             }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -200,10 +200,10 @@
       journalLoadInformation = storage.loadInternalOnly();
 
       pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
-                                                                    server.getExecutorFactory()),
+                                                                    server.getExecutorFactory(),
+                                                                    config.isJournalSyncNonTransactional()),
                                           storage,
-                                          server.getAddressSettingsRepository(),
-                                          false);
+                                          server.getAddressSettingsRepository());
 
       pageManager.start();
 

Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -52,4 +52,14 @@
    ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
    
    void setOriginalHeaders(ServerMessage other, boolean expiry);   
+   
+   void setPagingStore(PagingStore store);
+   
+   PagingStore getPagingStore();
+   
+   boolean page(boolean duplicateDetection) throws Exception;
+   
+   boolean page(long transactionID, boolean duplicateDetection) throws Exception;
+   
+   boolean storeIsPaging();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DeliveryImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -9,7 +9,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
- */ 
+ */
 
 package org.hornetq.core.server.impl;
 
@@ -24,25 +24,24 @@
  *
  */
 public class DeliveryImpl implements Delivery
-{ 
+{
    private final long consumerID;
-   
+
    private final MessageReference reference;
-        
-   public DeliveryImpl(final long consumerID,
-                       final MessageReference reference)
-   {      
+
+   public DeliveryImpl(final long consumerID, final MessageReference reference)
+   {
       this.consumerID = consumerID;
-      this.reference = reference;      
+      this.reference = reference;
    }
 
    public long getConsumerID()
    {
       return consumerID;
    }
-   
+
    public MessageReference getReference()
    {
       return reference;
-   }     
+   }
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -28,12 +28,12 @@
 
    protected final List<Consumer> consumers = new ArrayList<Consumer>();
 
-   public void addConsumer(Consumer consumer)
+   public void addConsumer(final Consumer consumer)
    {
       consumers.add(consumer);
    }
 
-   public boolean removeConsumer(Consumer consumer)
+   public boolean removeConsumer(final Consumer consumer)
    {
       return consumers.remove(consumer);
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -50,7 +50,7 @@
    private final Filter filter;
 
    private final Transformer transformer;
-   
+
    public DivertImpl(final SimpleString forwardAddress,
                      final SimpleString uniqueName,
                      final SimpleString routingName,
@@ -74,19 +74,19 @@
       this.postOffice = postOffice;
    }
 
-   public void route(ServerMessage message, final RoutingContext context) throws Exception
-   {      
+   public void route(final ServerMessage message, final RoutingContext context) throws Exception
+   {
       SimpleString originalDestination = message.getDestination();
-     
-      //We must make a copy of the message, otherwise things like returning credits to the page won't work
-      //properly on ack, since the original destination will be overwritten
-      
-      //TODO we can optimise this so it doesn't copy if it's not routed anywhere else
-     
+
+      // We must make a copy of the message, otherwise things like returning credits to the page won't work
+      // properly on ack, since the original destination will be overwritten
+
+      // TODO we can optimise this so it doesn't copy if it's not routed anywhere else
+
       ServerMessage copy = message.copy();
-      
+
       copy.setDestination(forwardAddress);
-      
+
       copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
 
       if (transformer != null)
@@ -94,7 +94,7 @@
          copy = transformer.transform(copy);
       }
 
-      postOffice.route(copy, context.getTransaction());          
+      postOffice.route(copy, context.getTransaction());
    }
 
    public SimpleString getRoutingName()
@@ -111,7 +111,7 @@
    {
       return exclusive;
    }
-   
+
    public Filter getFilter()
    {
       return filter;

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -60,9 +60,9 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
@@ -93,14 +93,14 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.QueueFactory;
 import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.cluster.ClusterManager;
+import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
 import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.core.server.group.impl.LocalGroupingHandler;
 import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
-import org.hornetq.core.server.group.impl.GroupBinding;
-import org.hornetq.core.server.cluster.ClusterManager;
-import org.hornetq.core.server.cluster.Transformer;
-import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
@@ -221,7 +221,7 @@
       this(configuration, null, null);
    }
 
-   public HornetQServerImpl(final Configuration configuration, MBeanServer mbeanServer)
+   public HornetQServerImpl(final Configuration configuration, final MBeanServer mbeanServer)
    {
       this(configuration, mbeanServer, null);
    }
@@ -256,7 +256,7 @@
 
       this.securityManager = securityManager;
 
-      this.addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
+      addressSettingsRepository = new HierarchicalObjectRepository<AddressSettings>();
 
       addressSettingsRepository.setDefault(new AddressSettings());
    }
@@ -289,8 +289,8 @@
       {
          if (!configuration.isSharedStore())
          {
-            this.replicationEndpoint = new ReplicationEndpointImpl(this);
-            this.replicationEndpoint.start();
+            replicationEndpoint = new ReplicationEndpointImpl(this);
+            replicationEndpoint.start();
          }
          // We defer actually initialisation until the live node has contacted the backup
          log.info("Backup server initialised");
@@ -529,8 +529,8 @@
    }
 
    public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
-                                                                      final String name,
-                                                                      final int lastReceivedCommandID) throws Exception
+                                                         final String name,
+                                                         final int lastReceivedCommandID) throws Exception
    {
       if (!started)
       {
@@ -580,17 +580,17 @@
    }
 
    public CreateSessionResponseMessage createSession(final String name,
-                                                                  final long channelID,
-                                                                  final String username,
-                                                                  final String password,
-                                                                  final int minLargeMessageSize,
-                                                                  final int incrementingVersion,
-                                                                  final RemotingConnection connection,
-                                                                  final boolean autoCommitSends,
-                                                                  final boolean autoCommitAcks,
-                                                                  final boolean preAcknowledge,
-                                                                  final boolean xa,
-                                                                  final int sendWindowSize) throws Exception
+                                                     final long channelID,
+                                                     final String username,
+                                                     final String password,
+                                                     final int minLargeMessageSize,
+                                                     final int incrementingVersion,
+                                                     final RemotingConnection connection,
+                                                     final boolean autoCommitSends,
+                                                     final boolean autoCommitAcks,
+                                                     final boolean preAcknowledge,
+                                                     final boolean xa,
+                                                     final int sendWindowSize) throws Exception
    {
       if (!started)
       {
@@ -607,14 +607,16 @@
                   ". " +
                   "Please ensure all clients and servers are upgraded to the same version for them to " +
                   "interoperate properly");
-         throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS, "Server and client versions incompatible");
+         throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
+                                    "Server and client versions incompatible");
       }
 
       if (!checkActivate())
       {
          // Backup server is not ready to accept connections
 
-         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED, "Server will not accept create session requests");
+         throw new HornetQException(HornetQException.SESSION_CREATION_REJECTED,
+                                    "Server will not accept create session requests");
       }
 
       if (securityStore != null)
@@ -674,11 +676,11 @@
 
       if (replicationEndpoint.getChannel() != null)
       {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup replication server is already connected to another server");
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "Backup replication server is already connected to another server");
       }
-      
+
       replicationEndpoint.setChannel(channel);
-      
 
       return replicationEndpoint;
    }
@@ -713,7 +715,7 @@
       return new HashSet<ServerSession>(sessions.values());
    }
 
-   //TODO - should this really be here?? It's only used in tests
+   // TODO - should this really be here?? It's only used in tests
    public boolean isInitialised()
    {
       synchronized (initialiseLock)
@@ -748,19 +750,19 @@
    }
 
    public Queue createQueue(final SimpleString address,
-                                         final SimpleString queueName,
-                                         final SimpleString filterString,
-                                         final boolean durable,
-                                         final boolean temporary) throws Exception
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final boolean durable,
+                            final boolean temporary) throws Exception
    {
       return createQueue(address, queueName, filterString, durable, temporary, false);
    }
 
    public Queue deployQueue(final SimpleString address,
-                                         final SimpleString queueName,
-                                         final SimpleString filterString,
-                                         final boolean durable,
-                                         final boolean temporary) throws Exception
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final boolean durable,
+                            final boolean temporary) throws Exception
    {
       return createQueue(address, queueName, filterString, durable, temporary, true);
    }
@@ -864,10 +866,11 @@
 
    protected PagingManager createPagingManager()
    {
-      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
+      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                             executorFactory,
+                                                             configuration.isJournalSyncNonTransactional()),
                                    storageManager,
-                                   addressSettingsRepository,
-                                   configuration.isJournalSyncNonTransactional());
+                                   addressSettingsRepository);
    }
 
    /** for use on sub-classes */
@@ -912,7 +915,8 @@
 
             replicationFailoverManager = createBackupConnection(backupConnector, threadPool, scheduledPool);
 
-            this.replicationManager = new ReplicationManagerImpl(replicationFailoverManager, configuration.getBackupWindowSize());
+            replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+                                                            configuration.getBackupWindowSize());
             replicationManager.start();
          }
       }
@@ -1008,7 +1012,7 @@
 
       startReplication();
 
-      this.storageManager = createStorageManager();
+      storageManager = createStorageManager();
 
       securityRepository = new HierarchicalObjectRepository<Set<Role>>();
       securityRepository.setDefault(new HashSet<Role>());
@@ -1093,13 +1097,12 @@
          }
       }
       deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-      
+
       // Load the journal and populate queues, transactions and caches in memory
       JournalLoadInformation[] journalInfo = loadJournals();
-      
+
       compareJournals(journalInfo);
 
-
       // Deploy any queues in the Configuration class - if there's no file deployment we still need
       // to load those
       deployQueuesFromConfiguration();
@@ -1163,7 +1166,7 @@
    /**
     * @param journalInfo
     */
-   private void compareJournals(JournalLoadInformation[] journalInfo) throws Exception
+   private void compareJournals(final JournalLoadInformation[] journalInfo) throws Exception
    {
       if (replicationManager != null)
       {
@@ -1185,7 +1188,7 @@
    private JournalLoadInformation[] loadJournals() throws Exception
    {
       JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
-      
+
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
       List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
@@ -1230,7 +1233,11 @@
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
-      journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
+      journalInfo[1] = storageManager.loadMessageJournal(postOffice,
+                                                         pagingManager,
+                                                         resourceManager,
+                                                         queues,
+                                                         duplicateIDMap);
 
       for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
       {
@@ -1243,7 +1250,7 @@
             cache.load(entry.getValue());
          }
       }
-      
+
       return journalInfo;
    }
 
@@ -1405,10 +1412,10 @@
                                                         config.getName(),
                                                         config.getAddress(),
                                                         config.getTimeout());
-         }        
-         
+         }
+
          this.groupingHandler = groupingHandler;
-         
+
          managementService.addNotificationListener(groupingHandler);
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -67,6 +67,7 @@
             addressSettingsRepository);
    }
 
+   @Override
    public synchronized void add(final MessageReference ref, final boolean first)
    {
       SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
@@ -76,7 +77,7 @@
          HolderReference hr = map.get(prop);
 
          if (!first)
-         {            
+         {
             if (hr != null)
             {
                // We need to overwrite the old ref with the new one and ack the old one
@@ -169,7 +170,7 @@
          this.ref = ref;
       }
 
-      public MessageReference copy(Queue queue)
+      public MessageReference copy(final Queue queue)
       {
          return ref.copy(queue);
       }
@@ -209,12 +210,12 @@
          ref.incrementDeliveryCount();
       }
 
-      public void setDeliveryCount(int deliveryCount)
+      public void setDeliveryCount(final int deliveryCount)
       {
          ref.setDeliveryCount(deliveryCount);
       }
 
-      public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+      public void setScheduledDeliveryTime(final long scheduledDeliveryTime)
       {
          ref.setScheduledDeliveryTime(scheduledDeliveryTime);
       }

Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -28,79 +28,80 @@
 public class MemoryManagerImpl implements MemoryManager
 {
    private static final Logger log = Logger.getLogger(MemoryManagerImpl.class);
-   
-   private Runtime runtime;
-   
-   private long measureInterval;
-   
-   private int memoryWarningThreshold;
-   
+
+   private final Runtime runtime;
+
+   private final long measureInterval;
+
+   private final int memoryWarningThreshold;
+
    private volatile boolean started;
-   
+
    private Thread thread;
-   
+
    private volatile boolean low;
-   
-   public MemoryManagerImpl(int memoryWarningThreshold, long measureInterval)
+
+   public MemoryManagerImpl(final int memoryWarningThreshold, final long measureInterval)
    {
       runtime = Runtime.getRuntime();
-      
+
       this.measureInterval = measureInterval;
-      
-      this.memoryWarningThreshold = memoryWarningThreshold;    
+
+      this.memoryWarningThreshold = memoryWarningThreshold;
    }
-   
+
    public boolean isMemoryLow()
    {
       return low;
    }
-    
+
    public synchronized boolean isStarted()
    {
       return started;
    }
-   
+
    public synchronized void start()
    {
-      log.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval
-               + " FREE_MEMORY_PERCENT: " + memoryWarningThreshold);
-      
+      log.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval +
+                " FREE_MEMORY_PERCENT: " +
+                memoryWarningThreshold);
+
       if (started)
       {
-         //Already started
+         // Already started
          return;
       }
-      
+
       started = true;
-      
+
       thread = new Thread(new MemoryRunnable());
-      
+
       thread.setDaemon(true);
-      
+
       thread.start();
    }
-   
+
    public synchronized void stop()
-   {      
+   {
       if (!started)
       {
-         //Already stopped
+         // Already stopped
          return;
       }
-      
+
       started = false;
-      
+
       thread.interrupt();
-      
+
       try
       {
          thread.join();
       }
       catch (InterruptedException ignore)
-      {         
+      {
       }
    }
-   
+
    private class MemoryRunnable implements Runnable
    {
       public void run()
@@ -113,7 +114,7 @@
                {
                   break;
                }
-               
+
                Thread.sleep(measureInterval);
             }
             catch (InterruptedException ignore)
@@ -123,17 +124,17 @@
                   break;
                }
             }
-                                  
+
             long maxMemory = runtime.maxMemory();
-            
+
             long totalMemory = runtime.totalMemory();
-            
+
             long freeMemory = runtime.freeMemory();
-            
-            long availableMemory = freeMemory + (maxMemory - totalMemory);
-                                    
-            double availableMemoryPercent = 100.0 * (double)availableMemory / maxMemory;
-            
+
+            long availableMemory = freeMemory + maxMemory - totalMemory;
+
+            double availableMemoryPercent = 100.0 * availableMemory / maxMemory;
+
             String info = "";
             info += String.format("free memory:      %s\n", SizeFormatterUtil.sizeof(freeMemory));
             info += String.format("max memory:       %s\n", SizeFormatterUtil.sizeof(maxMemory));
@@ -144,21 +145,22 @@
             {
                log.debug(info);
             }
-            
+
             if (availableMemoryPercent <= memoryWarningThreshold)
             {
-               log.warn("Less than " + memoryWarningThreshold + "%\n" 
-                        + info +
+               log.warn("Less than " + memoryWarningThreshold +
+                        "%\n" +
+                        info +
                         "\nYou are in danger of running out of RAM. Have you set paging parameters " +
                         "on your addresses? (See user manual \"Paging\" chapter)");
-               
+
                low = true;
             }
             else
             {
                low = false;
             }
-             
+
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -47,11 +47,11 @@
 
    public MessageReferenceImpl(final MessageReferenceImpl other, final Queue queue)
    {
-      this.deliveryCount = other.deliveryCount;
+      deliveryCount = other.deliveryCount;
 
-      this.scheduledDeliveryTime = other.scheduledDeliveryTime;
+      scheduledDeliveryTime = other.scheduledDeliveryTime;
 
-      this.message = other.message;
+      message = other.message;
 
       this.queue = queue;
    }
@@ -94,7 +94,7 @@
    {
       deliveryCount++;
    }
-   
+
    public void decrementDeliveryCount()
    {
       deliveryCount--;
@@ -119,7 +119,7 @@
    {
       return queue;
    }
-   
+
    public void handled()
    {
       queue.referenceHandled();
@@ -127,6 +127,7 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public String toString()
    {
       return "Reference[" + getMessage().getMessageID() +

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -72,15 +72,15 @@
       if (addressSettings.isLastValueQueue())
       {
          queue = new LastValueQueue(persistenceID,
-                                   address,
-                                   name,
-                                   filter,
-                                   durable,
-                                   temporary,
-                                   scheduledExecutor,
-                                   postOffice,
-                                   storageManager,
-                                   addressSettingsRepository);
+                                    address,
+                                    name,
+                                    filter,
+                                    durable,
+                                    temporary,
+                                    scheduledExecutor,
+                                    postOffice,
+                                    storageManager,
+                                    addressSettingsRepository);
       }
       else
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -123,7 +123,7 @@
 
    private final ScheduledExecutorService scheduledExecutor;
 
-   private SimpleString address;
+   private final SimpleString address;
 
    private Redistributor redistributor;
 
@@ -137,7 +137,7 @@
 
    private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
 
-   private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
+   private final ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
 
    private volatile SimpleString expiryAddress;
 
@@ -877,7 +877,7 @@
    @Override
    public String toString()
    {
-      return "QueueImpl(name=" + this.name.toString() + ")";
+      return "QueueImpl(name=" + name.toString() + ")";
    }
 
    // Private
@@ -1402,7 +1402,8 @@
             // ack isn't committed, then the server crashes and on
             // recovery the message is deleted even though the other ack never committed
 
-            //also note then when this happens as part of a trasaction its the tx commt of the ack that is important not this
+            // also note then when this happens as part of a trasaction its the tx commt of the ack that is important
+            // not this
             try
             {
                storageManager.deleteMessage(message.getMessageID());
@@ -1434,7 +1435,7 @@
 
    }
 
-   void postRollback(LinkedList<MessageReference> refs) throws Exception
+   void postRollback(final LinkedList<MessageReference> refs) throws Exception
    {
       synchronized (this)
       {
@@ -1449,7 +1450,7 @@
       }
    }
 
-   private synchronized void initPagingStore(SimpleString destination)
+   private synchronized void initPagingStore(final SimpleString destination)
    {
       // PagingManager would be null only on testcases
       if (pagingStore == null && pagingManager != null)
@@ -1496,14 +1497,14 @@
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
-         QueueImpl.this.lockDelivery();
+         lockDelivery();
          try
          {
             deliver();
          }
          finally
          {
-            QueueImpl.this.unlockDelivery();
+            unlockDelivery();
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -43,6 +43,7 @@
       return super.removeConsumer(consumer);
    }
 
+   @Override
    public synchronized int getConsumerCount()
    {
       return super.getConsumerCount();
@@ -54,11 +55,11 @@
       incrementPosition();
       return consumer;
    }
-   
+
    private synchronized void incrementPosition()
    {
       pos++;
-      
+
       if (pos == consumers.size())
       {
          pos = 0;

Modified: trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -29,19 +29,19 @@
  */
 public class RoutingContextImpl implements RoutingContext
 {
-   private List<Queue> queues = new ArrayList<Queue>();
-   
+   private final List<Queue> queues = new ArrayList<Queue>();
+
    private Transaction transaction;
-   
+
    private int depth;
-   
+
    public RoutingContextImpl(final Transaction transaction)
    {
       this.transaction = transaction;
    }
 
    public void addQueue(final Queue queue)
-   {  
+   {
       queues.add(queue);
    }
 
@@ -49,12 +49,12 @@
    {
       return transaction;
    }
-   
+
    public void setTransaction(final Transaction tx)
    {
-      this.transaction = tx;
+      transaction = tx;
    }
-   
+
    public List<Queue> getQueues()
    {
       return queues;

Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -41,7 +41,7 @@
    private final ScheduledExecutorService scheduledExecutor;
 
    private final Map<Long, ScheduledDeliveryRunnable> scheduledRunnables = new LinkedHashMap<Long, ScheduledDeliveryRunnable>();
-   
+
    private boolean rescheduled;
 
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor)
@@ -67,13 +67,13 @@
             scheduledRunnables.put(ref.getMessage().getMessageID(), runnable);
          }
 
-         scheduleDelivery(runnable, deliveryTime);         
+         scheduleDelivery(runnable, deliveryTime);
 
          return true;
       }
       return false;
    }
-   
+
    public void reSchedule()
    {
       synchronized (scheduledRunnables)
@@ -84,7 +84,7 @@
             {
                scheduleDelivery(runnable, runnable.getReference().getScheduledDeliveryTime());
             }
-            
+
             rescheduled = true;
          }
       }
@@ -98,7 +98,7 @@
    public List<MessageReference> getScheduledReferences()
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
-      
+
       synchronized (scheduledRunnables)
       {
          for (ScheduledDeliveryRunnable scheduledRunnable : scheduledRunnables.values())
@@ -112,13 +112,13 @@
    public List<MessageReference> cancel()
    {
       List<MessageReference> refs = new ArrayList<MessageReference>();
-      
+
       synchronized (scheduledRunnables)
       {
          for (ScheduledDeliveryRunnable runnable : scheduledRunnables.values())
          {
             runnable.cancel();
-            
+
             refs.add(runnable.getReference());
          }
 
@@ -126,8 +126,8 @@
       }
       return refs;
    }
-   
-   public MessageReference removeReferenceWithID(long id)
+
+   public MessageReference removeReferenceWithID(final long id)
    {
       synchronized (scheduledRunnables)
       {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerInfo.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -34,7 +34,7 @@
 {
    private final HornetQServer server;
 
-   private PagingManager pagingManager;
+   private final PagingManager pagingManager;
 
    // Constants -----------------------------------------------------
 
@@ -57,8 +57,8 @@
       long maxMemory = Runtime.getRuntime().maxMemory();
       long totalMemory = Runtime.getRuntime().totalMemory();
       long freeMemory = Runtime.getRuntime().freeMemory();
-      long availableMemory = freeMemory + (maxMemory - totalMemory);                              
-      double availableMemoryPercent = 100.0 * (double)availableMemory / maxMemory;     
+      long availableMemory = freeMemory + maxMemory - totalMemory;
+      double availableMemoryPercent = 100.0 * availableMemory / maxMemory;
       ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
 
       String info = "\n**** Server Dump ****\n";
@@ -90,7 +90,9 @@
          try
          {
             pageStore = pagingManager.getPageStore(storeName);
-            info += String.format("\t%s: %s\n", storeName, SizeFormatterUtil.sizeof(pageStore.getPageSizeBytes() * pageStore.getNumberOfPages()));         
+            info += String.format("\t%s: %s\n",
+                                  storeName,
+                                  SizeFormatterUtil.sizeof(pageStore.getPageSizeBytes() * pageStore.getNumberOfPages()));
          }
          catch (Exception e)
          {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -45,6 +45,8 @@
    // We cache this
    private volatile int memoryEstimate = -1;
 
+   private PagingStore pagingStore;
+
    /*
     * Constructor for when reading from network
     */
@@ -88,7 +90,7 @@
       messageID = id;
    }
 
-   public void setType(byte type)
+   public void setType(final byte type)
    {
       this.type = type;
    }
@@ -103,34 +105,34 @@
    public int incrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
    {
       int count = refCount.incrementAndGet();
-      
+
       if (pagingStore != null)
       {
          if (count == 1)
          {
             pagingStore.addSize(this, true);
          }
-         
+
          pagingStore.addSize(reference, true);
       }
-      
+
       return count;
    }
-   
+
    public int decrementRefCount(final PagingStore pagingStore, final MessageReference reference) throws Exception
    {
       int count = refCount.decrementAndGet();
-      
+
       if (pagingStore != null)
       {
          if (count == 0)
          {
             pagingStore.addSize(this, false);
          }
-         
+
          pagingStore.addSize(reference, false);
       }
-      
+
       return count;
    }
 
@@ -144,8 +146,6 @@
       return durableRefCount.decrementAndGet();
    }
 
-   
-
    public int getRefCount()
    {
       return refCount.get();
@@ -158,7 +158,7 @@
 
    public long getLargeBodySize()
    {
-      return (long)getBodySize();
+      return getBodySize();
    }
 
    public int getMemoryEstimate()
@@ -200,18 +200,18 @@
       */
 
       ServerMessage copy = copy(newID);
-      
+
       copy.setOriginalHeaders(this, expiry);
 
       return copy;
    }
-   
+
    public void setOriginalHeaders(final ServerMessage other, final boolean expiry)
    {
       if (other.getProperty(HDR_ORIG_MESSAGE_ID) != null)
       {
          putStringProperty(HDR_ORIGINAL_DESTINATION, (SimpleString)other.getProperty(HDR_ORIGINAL_DESTINATION));
-         
+
          putLongProperty(HDR_ORIG_MESSAGE_ID, (Long)other.getProperty(HDR_ORIG_MESSAGE_ID));
       }
       else
@@ -222,18 +222,68 @@
 
          putLongProperty(HDR_ORIG_MESSAGE_ID, other.getMessageID());
       }
-      
+
       // reset expiry
       setExpiration(0);
-            
+
       if (expiry)
-      {         
+      {
          long actualExpiryTime = System.currentTimeMillis();
 
          putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
       }
    }
 
+   public void setPagingStore(final PagingStore pagingStore)
+   {
+      this.pagingStore = pagingStore;
+
+      // On the server side, we reset the address to point to the instance of address in the paging store
+      // Otherwise each message would have its own copy of the address String which would take up more memory
+      destination = pagingStore.getAddress();
+   }
+
+   public PagingStore getPagingStore()
+   {
+      return pagingStore;
+   }
+
+   public boolean page(final boolean duplicateDetection) throws Exception
+   {
+      if (pagingStore != null)
+      {
+         return pagingStore.page(this, duplicateDetection);
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public boolean page(final long transactionID, final boolean duplicateDetection) throws Exception
+   {
+      if (pagingStore != null)
+      {
+         return pagingStore.page(this, transactionID, duplicateDetection);
+      }
+      else
+      {
+         return false;
+      }
+   }
+
+   public boolean storeIsPaging()
+   {
+      if (pagingStore != null)
+      {
+         return pagingStore.isPaging();
+      }
+      else
+      {
+         return false;
+      }
+   }
+
    @Override
    public String toString()
    {

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -15,7 +15,6 @@
 
 import org.hornetq.core.paging.PagingStore;
 
-
 /**
  * A ServerProducerCreditManager
  *
@@ -26,10 +25,10 @@
 public interface ServerProducerCreditManager
 {
    int creditsReleased(int credits);
-   
+
    int acquireCredits(int credits, CreditsAvailableRunnable runnable);
-   
+
    int waitingEntries();
-   
+
    PagingStore getStore();
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManagerImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -91,7 +91,7 @@
                entry.credits -= credits;
 
                boolean sent = sendCredits(entry.waiting, credits);
-               
+
                if (sent)
                {
                   credits = 0;

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -168,6 +168,8 @@
 
    private boolean closed;
 
+   private final Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -190,7 +192,7 @@
                             final HornetQServer server,
                             final SimpleString managementAddress) throws Exception
    {
-      this.id = channel.getID();
+      id = channel.getID();
 
       this.username = username;
 
@@ -332,7 +334,7 @@
       }
    }
 
-   public void promptDelivery(final Queue queue, boolean async)
+   public void promptDelivery(final Queue queue, final boolean async)
    {
       if (async)
       {
@@ -368,17 +370,17 @@
          Filter filter = FilterImpl.createFilter(filterString);;
 
          ServerConsumer consumer = new ServerConsumerImpl(packet.getID(),
-                                                           this,
-                                                           (QueueBinding)binding,
-                                                           filter,
-                                                           started,
-                                                           browseOnly,
-                                                           storageManager,
-                                                           channel,
-                                                           preAcknowledge,
-                                                           updateDeliveries,
-                                                           executor,
-                                                           managementService);
+                                                          this,
+                                                          (QueueBinding)binding,
+                                                          filter,
+                                                          started,
+                                                          browseOnly,
+                                                          storageManager,
+                                                          channel,
+                                                          preAcknowledge,
+                                                          updateDeliveries,
+                                                          executor,
+                                                          managementService);
 
          consumers.put(consumer.getID(), consumer);
 
@@ -635,7 +637,7 @@
       sendResponse(packet, response, false, false);
    }
 
-   public void handleForceConsumerDelivery(SessionForceConsumerDelivery message)
+   public void handleForceConsumerDelivery(final SessionForceConsumerDelivery message)
    {
       try
       {
@@ -1401,7 +1403,7 @@
       }
       catch (Exception e)
       {
-         log.error("Failed to receive credits " + this.server.getConfiguration().isBackup(), e);
+         log.error("Failed to receive credits " + server.getConfiguration().isBackup(), e);
       }
 
       sendResponse(packet, null, false, false);
@@ -1412,13 +1414,10 @@
       // need to create the LargeMessage before continue
       long id = storageManager.generateUniqueID();
 
-      final LargeServerMessage msg = doCreateLargeMessage(id, packet);
+      LargeServerMessage msg = doCreateLargeMessage(id, packet);
 
       if (msg != null)
       {
-         // With a send we must make sure it is replicated to backup before being processed on live
-         // or can end up with delivery being processed on backup before original send
-
          if (currentLargeMessage != null)
          {
             log.warn("Replacing incomplete LargeMessage with ID=" + currentLargeMessage.getMessageID());
@@ -1438,6 +1437,8 @@
 
       try
       {
+         setPagingStore(message);
+
          long id = storageManager.generateUniqueID();
 
          message.setMessageID(id);
@@ -1499,22 +1500,22 @@
          {
             throw new HornetQException(HornetQException.ILLEGAL_STATE, "large-message not initialized on server");
          }
-         
-         //Immediately release the credits for the continuations- these don't contrinute to the in-memory size
-         //of the message
-         
+
+         // Immediately release the credits for the continuations- these don't contrinute to the in-memory size
+         // of the message
+
          releaseOutStanding(currentLargeMessage, packet.getRequiredBufferSize());
-         
+
          currentLargeMessage.addBytes(packet.getBody());
 
          if (!packet.isContinues())
-         {                        
+         {
             currentLargeMessage.releaseResources();
 
             send(currentLargeMessage);
 
             releaseOutStanding(currentLargeMessage, currentLargeMessage.getEncodeSize());
-            
+
             currentLargeMessage = null;
          }
 
@@ -1543,40 +1544,6 @@
       sendResponse(packet, response, false, false);
    }
 
-   private static final class CreditManagerHolder
-   {
-      CreditManagerHolder(final PagingStore store)
-      {
-         this.store = store;
-
-         this.manager = store.getProducerCreditManager();
-      }
-
-      final PagingStore store;
-
-      final ServerProducerCreditManager manager;
-
-      volatile int outstandingCredits;
-   }
-
-   private Map<SimpleString, CreditManagerHolder> creditManagerHolders = new HashMap<SimpleString, CreditManagerHolder>();
-
-   private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws Exception
-   {
-      CreditManagerHolder holder = creditManagerHolders.get(address);
-
-      if (holder == null)
-      {
-         PagingStore store = postOffice.getPagingManager().getPageStore(address);
-
-         holder = new CreditManagerHolder(store);
-
-         creditManagerHolders.put(address, holder);
-      }
-
-      return holder;
-   }
-
    public void handleRequestProducerCredits(final SessionRequestProducerCreditsMessage packet) throws Exception
    {
       final SimpleString address = packet.getAddress();
@@ -1587,7 +1554,7 @@
 
       int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
       {
-         public boolean run(int credits)
+         public boolean run(final int credits)
          {
             synchronized (ServerSessionImpl.this)
             {
@@ -1613,22 +1580,13 @@
       sendResponse(packet, null, false, false);
    }
 
-   private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
-   {
-      holder.outstandingCredits += credits;
-
-      Packet packet = new SessionProducerCreditsMessage(credits, address);
-
-      channel.send(packet);
-   }
-
    public int transferConnection(final RemotingConnection newConnection, final int lastReceivedCommandID)
    {
-      boolean wasStarted = this.started;
+      boolean wasStarted = started;
 
       if (wasStarted)
       {
-         this.setStarted(false);
+         setStarted(false);
       }
 
       remotingConnection.removeFailureListener(this);
@@ -1652,11 +1610,11 @@
 
       int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
 
-      channel.replayCommands(lastReceivedCommandID, this.id);
+      channel.replayCommands(lastReceivedCommandID, id);
 
       if (wasStarted)
       {
-         this.setStarted(true);
+         setStarted(true);
       }
 
       return serverLastReceivedCommandID;
@@ -1804,11 +1762,15 @@
     * @param packet
     * @throws Exception
     */
-   private LargeServerMessage doCreateLargeMessage(long id, final SessionSendLargeMessage packet)
+   private LargeServerMessage doCreateLargeMessage(final long id, final SessionSendLargeMessage packet)
    {
       try
       {
-         return createLargeMessageStorage(id, packet.getLargeMessageHeader());
+         LargeServerMessage msg = createLargeMessageStorage(id, packet.getLargeMessageHeader());
+
+         setPagingStore(msg);
+
+         return msg;
       }
       catch (Exception e)
       {
@@ -1888,7 +1850,7 @@
       }
    }
 
-   private void rollback(boolean lastMessageAsDelived) throws Exception
+   private void rollback(final boolean lastMessageAsDelived) throws Exception
    {
       if (tx == null)
       {
@@ -1915,15 +1877,66 @@
     */
    private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
    {
-      CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
+      CreditManagerHolder holder = getCreditManagerHolder(message);
 
       holder.outstandingCredits -= credits;
 
       holder.store.returnProducerCredits(credits);
    }
-   
+
+   // TODO can we combine these two methods....
+   private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws Exception
+   {
+      CreditManagerHolder holder = creditManagerHolders.get(address);
+
+      if (holder == null)
+      {
+         PagingStore store = postOffice.getPagingManager().getPageStore(address);
+
+         holder = new CreditManagerHolder(store);
+
+         creditManagerHolders.put(address, holder);
+      }
+
+      return holder;
+   }
+
+   private CreditManagerHolder getCreditManagerHolder(final ServerMessage message) throws Exception
+   {
+      SimpleString address = message.getDestination();
+
+      CreditManagerHolder holder = creditManagerHolders.get(address);
+
+      if (holder == null)
+      {
+         holder = new CreditManagerHolder(message.getPagingStore());
+
+         creditManagerHolders.put(address, holder);
+      }
+
+      return holder;
+   }
+
+   private void setPagingStore(final ServerMessage message) throws Exception
+   {
+      PagingStore store = postOffice.getPagingManager().getPageStore(message.getDestination());
+
+      message.setPagingStore(store);
+   }
+
+   private void sendProducerCredits(final CreditManagerHolder holder, final int credits, final SimpleString address)
+   {
+      holder.outstandingCredits += credits;
+
+      Packet packet = new SessionProducerCreditsMessage(credits, address);
+
+      channel.send(packet);
+   }
+
    private void send(final ServerMessage msg) throws Exception
    {
+      // Look up the paging store
+
       // check the user has write access to this address.
       try
       {
@@ -1947,4 +1960,20 @@
          postOffice.route(msg, tx);
       }
    }
+
+   private static final class CreditManagerHolder
+   {
+      CreditManagerHolder(final PagingStore store)
+      {
+         this.store = store;
+
+         manager = store.getProducerCreditManager();
+      }
+
+      final PagingStore store;
+
+      final ServerProducerCreditManager manager;
+
+      volatile int outstandingCredits;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -20,10 +20,10 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
@@ -53,11 +53,11 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
 import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
@@ -275,12 +275,11 @@
             {
                SessionForceConsumerDelivery message = (SessionForceConsumerDelivery)packet;
                session.handleForceConsumerDelivery(message);
-               break;               
+               break;
             }
             case PacketImpl.SESS_PRODUCER_REQUEST_CREDITS:
             {
-               SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)
-               packet;
+               SessionRequestProducerCreditsMessage message = (SessionRequestProducerCreditsMessage)packet;
                session.handleRequestProducerCredits(message);
                break;
             }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -27,6 +27,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -47,17 +48,17 @@
  */
 public class PagingTest extends ServiceTestBase
 {
-   
+
    public PagingTest(String name)
    {
       super(name);
    }
-   
+
    public PagingTest()
    {
       super();
    }
-   
+
    // Constants -----------------------------------------------------
    private static final Logger log = Logger.getLogger(PagingTest.class);
 
@@ -160,7 +161,7 @@
             ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
 
             assertNotNull(message2);
-            
+
             assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
 
             message2.acknowledge();
@@ -244,7 +245,7 @@
             message.setBody(bodyLocal);
 
             // Stop sending message as soon as we start paging
-            if (server.getPostOffice().getPagingManager().isPaging(ADDRESS))
+            if (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging())
             {
                break;
             }
@@ -253,7 +254,7 @@
             producer.send(message);
          }
 
-         assertTrue(server.getPostOffice().getPagingManager().isPaging(ADDRESS));
+         assertTrue(server.getPostOffice().getPagingManager().getPageStore(ADDRESS).isPaging());
 
          session.start();
 
@@ -389,8 +390,12 @@
             message.setBody(ChannelBuffers.wrappedBuffer(body));
             message.putIntProperty(new SimpleString("id"), i);
 
+            TestSupportPageStore store = (TestSupportPageStore)server.getPostOffice()
+                                                                     .getPagingManager()
+                                                                     .getPageStore(ADDRESS);
+
             // Worse scenario possible... only schedule what's on pages
-            if (server.getPostOffice().getPagingManager().getPageStore(ADDRESS).getCurrentPage() != null)
+            if (store.getCurrentPage() != null)
             {
                message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime);
             }
@@ -1112,7 +1117,7 @@
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
-   
+
    protected Configuration createDefaultConfig()
    {
       Configuration config = super.createDefaultConfig();

Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -242,10 +242,11 @@
       @Override
       protected PagingManager createPagingManager()
       {
-         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory()),
+         return new PagingManagerImpl(new FailurePagingStoreFactoryNIO(super.getConfiguration().getPagingDirectory(),
+                                                                       super.getConfiguration()
+                                                                            .isJournalSyncNonTransactional()),
                                       super.getStorageManager(),
-                                      super.getAddressSettingsRepository(),
-                                      super.getConfiguration().isJournalSyncNonTransactional());
+                                      super.getAddressSettingsRepository());
       }
 
       class FailurePagingStoreFactoryNIO extends PagingStoreFactoryNIO
@@ -255,9 +256,9 @@
           * @param directory
           * @param maxThreads
           */
-         public FailurePagingStoreFactoryNIO(final String directory)
+         public FailurePagingStoreFactoryNIO(final String directory, final boolean syncNonTransactional)
          {
-            super(directory, new OrderedExecutorFactory(Executors.newCachedThreadPool()));
+            super(directory, new OrderedExecutorFactory(Executors.newCachedThreadPool()), syncNonTransactional);
          }
 
          // Constants -----------------------------------------------------
@@ -277,7 +278,7 @@
             factoryField.setAccessible(true);
 
             OrderedExecutorFactory factory = (org.hornetq.utils.OrderedExecutorFactory)factoryField.get(this);
-            return new FailingPagingStore(destinationName, settings, factory.getExecutor());
+            return new FailingPagingStore(destinationName, settings, factory.getExecutor(), syncNonTransactional);
          }
 
          // Package protected ---------------------------------------------
@@ -297,16 +298,19 @@
              */
             public FailingPagingStore(final SimpleString storeName,
                                       final AddressSettings addressSettings,
-                                      final Executor executor)
+                                      final Executor executor,
+                                      final boolean syncNonTransactional)
             {
-               super(getPostOffice().getPagingManager(),
+               super(storeName,
+                     getPostOffice().getPagingManager(),
                      getStorageManager(),
                      getPostOffice(),
                      null,
                      FailurePagingStoreFactoryNIO.this,
                      storeName,
                      addressSettings,
-                     executor);
+                     executor,
+                     syncNonTransactional);
             }
 
             @Override

Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -109,7 +109,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          manager.stop();
       }
@@ -134,11 +135,13 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          try
          {
-            manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(2,2), new JournalLoadInformation(2,2)});
+            manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(2, 2),
+                                                                  new JournalLoadInformation(2, 2) });
             fail("Exception was expected");
          }
          catch (HornetQException e)
@@ -147,7 +150,8 @@
             assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
          }
 
-         manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(), new JournalLoadInformation()});
+         manager.compareJournals(new JournalLoadInformation[] { new JournalLoadInformation(),
+                                                               new JournalLoadInformation() });
 
          manager.stop();
       }
@@ -173,13 +177,15 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
          manager.start();
 
          try
          {
-            ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+            ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager,
+                                                                         ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
             manager2.start();
             fail("Exception was expected");
@@ -212,7 +218,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
          try
          {
@@ -246,7 +253,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -363,7 +371,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -425,7 +434,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          fail("Exception expected");
       }
@@ -450,7 +460,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -580,10 +591,10 @@
    {
 
       PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
-                                                                             executorFactory),
+                                                                             executorFactory,
+                                                                             false),
                                                    storageManager,
-                                                   addressSettingsRepository,
-                                                   false);
+                                                   addressSettingsRepository);
 
       paging.start();
       return paging;

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingManagerImplTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -64,10 +64,9 @@
       addressSettings.setDefault(settings);
       
       
-      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), new OrderedExecutorFactory(Executors.newCachedThreadPool())),
+      PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), new OrderedExecutorFactory(Executors.newCachedThreadPool()), true),
                                                             new NullStorageManager(),
-                                                            addressSettings,
-                                                            true);
+                                                            addressSettings);
 
       managerImpl.start();
 
@@ -75,11 +74,11 @@
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
-      assertFalse(store.page(new PagedMessageImpl(msg), true, true));
+      assertFalse(store.page(msg, true));
 
       store.startPaging();
 
-      assertTrue(store.page(new PagedMessageImpl(msg), true, true));
+      assertTrue(store.page(msg, true));
 
       Page page = store.depage();
 
@@ -97,7 +96,7 @@
 
       assertNull(store.depage());
 
-      assertFalse(store.page(new PagedMessageImpl(msg), true, true));
+      assertFalse(store.page(msg, true));
    }
 
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -18,6 +18,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
@@ -39,12 +40,11 @@
 import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.paging.PagingStoreFactory;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
-import org.hornetq.core.paging.impl.PagedMessageImpl;
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -56,8 +56,8 @@
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -142,14 +142,17 @@
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+
+      PagingStore storeImpl = new PagingStoreImpl(destinationTestName,
+                                                  createMockManager(),
                                                   createStorageManagerMock(),
                                                   createPostOfficeMock(),
                                                   factory,
                                                   null,
                                                   destinationTestName,
                                                   addressSettings,
-                                                  executor);
+                                                  executor,
+                                                  true);
 
       storeImpl.start();
 
@@ -180,14 +183,16 @@
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
-                                                  createStorageManagerMock(),
-                                                  createPostOfficeMock(),
-                                                  factory,
-                                                  storeFactory,
-                                                  destinationTestName,
-                                                  addressSettings,
-                                                  executor);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+                                                           createMockManager(),
+                                                           createStorageManagerMock(),
+                                                           createPostOfficeMock(),
+                                                           factory,
+                                                           storeFactory,
+                                                           destinationTestName,
+                                                           addressSettings,
+                                                           executor,
+                                                           true);
 
       storeImpl.start();
 
@@ -204,24 +209,26 @@
       buffers.add(buffer);
       SimpleString destination = new SimpleString("test");
 
-      PagedMessageImpl msg = createMessage(destination, buffer);
+      ServerMessage msg = createMessage(storeImpl, destination, buffer);
 
       assertTrue(storeImpl.isPaging());
 
-      assertTrue(storeImpl.page(msg, true, true));
+      assertTrue(storeImpl.page(msg, true));
 
       assertEquals(1, storeImpl.getNumberOfPages());
 
       storeImpl.sync();
 
-      storeImpl = new PagingStoreImpl(createMockManager(),
+      storeImpl = new PagingStoreImpl(destinationTestName,
+                                      createMockManager(),
                                       createStorageManagerMock(),
                                       createPostOfficeMock(),
                                       factory,
                                       null,
                                       destinationTestName,
                                       addressSettings,
-                                      executor);
+                                      executor,
+                                      true);
 
       storeImpl.start();
 
@@ -239,14 +246,16 @@
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+      TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+                                                           createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
                                                            factory,
                                                            storeFactory,
                                                            destinationTestName,
                                                            addressSettings,
-                                                           executor);
+                                                           executor,
+                                                           true);
 
       storeImpl.start();
 
@@ -263,9 +272,9 @@
 
          buffers.add(buffer);
 
-         PagedMessageImpl msg = createMessage(destination, buffer);
+         ServerMessage msg = createMessage(storeImpl, destination, buffer);
 
-         assertTrue(storeImpl.page(msg, true, true));
+         assertTrue(storeImpl.page(msg, true));
       }
 
       assertEquals(1, storeImpl.getNumberOfPages());
@@ -304,14 +313,16 @@
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
-      TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+      TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+                                                           createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
                                                            factory,
                                                            storeFactory,
                                                            destinationTestName,
                                                            addressSettings,
-                                                           executor);
+                                                           executor,
+                                                           true);
 
       storeImpl.start();
 
@@ -335,9 +346,9 @@
             storeImpl.forceAnotherPage();
          }
 
-         PagedMessageImpl msg = createMessage(destination, buffer);
+         ServerMessage msg = createMessage(storeImpl, destination, buffer);
 
-         assertTrue(storeImpl.page(msg, true, true));
+         assertTrue(storeImpl.page(msg, true));
       }
 
       assertEquals(2, storeImpl.getNumberOfPages());
@@ -367,9 +378,9 @@
 
       assertTrue(storeImpl.isPaging());
 
-      PagedMessageImpl msg = createMessage(destination, buffers.get(0));
+      ServerMessage msg = createMessage(storeImpl, destination, buffers.get(0));
 
-      assertTrue(storeImpl.page(msg, true, true));
+      assertTrue(storeImpl.page(msg, true));
 
       Page newPage = storeImpl.depage();
 
@@ -387,11 +398,11 @@
 
       assertFalse(storeImpl.isPaging());
 
-      assertFalse(storeImpl.page(msg, true, true));
+      assertFalse(storeImpl.page(msg, true));
 
       storeImpl.startPaging();
 
-      assertTrue(storeImpl.page(msg, true, true));
+      assertTrue(storeImpl.page(msg, true));
 
       Page page = storeImpl.depage();
 
@@ -427,7 +438,6 @@
    protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
                                                                                                       InterruptedException
    {
-
       PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
 
       final int MAX_SIZE = 1024 * 10;
@@ -438,7 +448,7 @@
 
       final CountDownLatch latchStart = new CountDownLatch(numberOfThreads);
 
-      final ConcurrentHashMap<Long, PagedMessageImpl> buffers = new ConcurrentHashMap<Long, PagedMessageImpl>();
+      final ConcurrentHashMap<Long, ServerMessage> buffers = new ConcurrentHashMap<Long, ServerMessage>();
 
       final ArrayList<Page> readPages = new ArrayList<Page>();
 
@@ -446,14 +456,16 @@
       settings.setPageSizeBytes(MAX_SIZE);
       settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
 
-      final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(destinationTestName,
+                                                                 createMockManager(),
                                                                  createStorageManagerMock(),
                                                                  createPostOfficeMock(),
                                                                  factory,
                                                                  storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 executor);
+                                                                 executor,
+                                                                 true);
 
       storeImpl.start();
 
@@ -480,8 +492,8 @@
                while (true)
                {
                   long id = messageIdGenerator.incrementAndGet();
-                  PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
-                  if (storeImpl.page(msg, false, true))
+                  ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));                  
+                  if (storeImpl.page(msg, true))
                   {
                      buffers.put(id, msg);
                   }
@@ -564,7 +576,7 @@
          throw consumer.e;
       }
 
-      final ConcurrentHashMap<Long, PagedMessage> buffers2 = new ConcurrentHashMap<Long, PagedMessage>();
+      final ConcurrentMap<Long, ServerMessage> buffers2 = new ConcurrentHashMap<Long, ServerMessage>();
 
       for (Page page : readPages)
       {
@@ -577,13 +589,11 @@
             long id = msg.getMessage(null).getBody().readLong();
             msg.getMessage(null).getBody().resetReaderIndex();
 
-            PagedMessageImpl msgWritten = buffers.remove(id);
-            buffers2.put(id, msg);
+            ServerMessage msgWritten = buffers.remove(id);
+            buffers2.put(id, msg.getMessage(null));
             assertNotNull(msgWritten);
-            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getMessage(null).getDestination());
-            assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(), msg.getMessage(null)
-                                                                                     .getBody()
-                                                                                     .array());
+            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getDestination());
+            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getMessage(null).getBody().array());
          }
       }
 
@@ -601,14 +611,16 @@
          fileTmp.close();
       }
 
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(),
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(destinationTestName,
+                                                            createMockManager(),
                                                             createStorageManagerMock(),
                                                             createPostOfficeMock(),
                                                             factory,
                                                             storeFactory,
                                                             new SimpleString("test"),
                                                             settings,
-                                                            executor);
+                                                            executor,
+                                                            true);
       storeImpl2.start();
 
       int numberOfPages = storeImpl2.getNumberOfPages();
@@ -621,9 +633,9 @@
       assertEquals(numberOfPages, storeImpl2.getNumberOfPages());
 
       long lastMessageId = messageIdGenerator.incrementAndGet();
-      PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
+      ServerMessage lastMsg = createMessage(storeImpl, destination, createRandomBuffer(lastMessageId, 5));
 
-      storeImpl2.page(lastMsg, false, true);
+      storeImpl2.page(lastMsg, true);
       buffers2.put(lastMessageId, lastMsg);
 
       Page lastPage = null;
@@ -647,12 +659,10 @@
          {
 
             long id = msg.getMessage(null).getBody().readLong();
-            PagedMessage msgWritten = buffers2.remove(id);
+            ServerMessage msgWritten = buffers2.remove(id);
             assertNotNull(msgWritten);
-            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getMessage(null).getDestination());
-            assertEqualsByteArrays(msgWritten.getMessage(null).getBody().array(), msg.getMessage(null)
-                                                                                     .getBody()
-                                                                                     .array());
+            assertEquals(msg.getMessage(null).getDestination(), msgWritten.getDestination());
+            assertEqualsByteArrays(msgWritten.getBody().array(), msg.getMessage(null).getBody().array());
          }
       }
 
@@ -663,14 +673,11 @@
 
       lastMessages.get(0).getMessage(null).getBody().resetReaderIndex();
       assertEquals(lastMessages.get(0).getMessage(null).getBody().readLong(), lastMessageId);
-      assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBody().array(), lastMsg.getMessage(null)
-                                                                                            .getBody()
-                                                                                            .array());
+      assertEqualsByteArrays(lastMessages.get(0).getMessage(null).getBody().array(), lastMsg.getBody().array());
 
       assertEquals(0, buffers2.size());
 
       assertEquals(0, storeImpl.getAddressSize());
-
    }
 
    /**
@@ -691,12 +698,15 @@
       return new FakePostOffice();
    }
 
-   private PagedMessageImpl createMessage(final SimpleString destination, final HornetQBuffer buffer)
+   private ServerMessage createMessage(final PagingStore store, final SimpleString destination, final HornetQBuffer buffer)
    {
       ServerMessage msg = new ServerMessageImpl((byte)1, true, 0, System.currentTimeMillis(), (byte)0, buffer);
 
       msg.setDestination(destination);
-      return new PagedMessageImpl(msg);
+      
+      msg.setPagingStore(store);
+
+      return msg;
    }
 
    private HornetQBuffer createRandomBuffer(final long id, final int size)
@@ -947,29 +957,30 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
        */
-      public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+      public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
+                                                       List<GroupingInfo> groupingInfos) throws Exception
       {
          return new JournalLoadInformation();
       }
 
       public void addGrouping(GroupBinding groupBinding) throws Exception
       {
-         //To change body of implemented methods use File | Settings | File Templates.
+         // To change body of implemented methods use File | Settings | File Templates.
       }
 
       public void deleteGrouping(GroupBinding groupBinding) throws Exception
       {
-         //To change body of implemented methods use File | Settings | File Templates.
+         // To change body of implemented methods use File | Settings | File Templates.
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
        */
       public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
-                                     PagingManager pagingManager,
-                                     ResourceManager resourceManager,
-                                     Map<Long, Queue> queues,
-                                     Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                                       PagingManager pagingManager,
+                                                       ResourceManager resourceManager,
+                                                       Map<Long, Queue> queues,
+                                                       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
       {
          return new JournalLoadInformation();
       }
@@ -1136,8 +1147,7 @@
        */
       public void afterReplicated(Runnable run)
       {
-         
-         
+
       }
 
       /* (non-Javadoc)
@@ -1145,8 +1155,7 @@
        */
       public void completeReplication()
       {
-         
-         
+
       }
 
       /* (non-Javadoc)
@@ -1154,7 +1163,7 @@
        */
       public LargeServerMessage createLargeMessage(long messageId, byte[] header)
       {
-         
+
          return null;
       }
 
@@ -1163,7 +1172,7 @@
        */
       public boolean isReplicated()
       {
-         
+
          return false;
       }
 
@@ -1172,7 +1181,7 @@
        */
       public JournalLoadInformation[] loadInternalOnly() throws Exception
       {
-         return null; 
+         return null;
       }
 
       /* (non-Javadoc)
@@ -1180,8 +1189,7 @@
        */
       public void pageClosed(SimpleString storeName, int pageNumber)
       {
-         
-         
+
       }
 
       /* (non-Javadoc)
@@ -1189,8 +1197,7 @@
        */
       public void pageDeleted(SimpleString storeName, int pageNumber)
       {
-         
-         
+
       }
 
       /* (non-Javadoc)
@@ -1198,8 +1205,7 @@
        */
       public void pageWrite(PagedMessage message, int pageNumber)
       {
-         
-         
+
       }
 
       /* (non-Javadoc)
@@ -1293,8 +1299,6 @@
       {
       }
 
-
-
    }
 
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-05 13:30:49 UTC (rev 8222)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2009-11-05 14:23:20 UTC (rev 8223)
@@ -895,6 +895,36 @@
          return 0;
       }
 
+      public PagingStore getPagingStore()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
+      public void setPagingStore(PagingStore store)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      public boolean page(boolean duplicateDetection) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public boolean page(long transactionID, boolean duplicateDetection) throws Exception
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
+      public boolean storeIsPaging()
+      {
+         // TODO Auto-generated method stub
+         return false;
+      }
+
    }
 
    class FakeFilter implements Filter



More information about the hornetq-commits mailing list