[hornetq-commits] JBoss hornetq SVN: r11299 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl and 13 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 7 09:38:25 EDT 2011


Author: borges
Date: 2011-09-07 09:38:25 -0400 (Wed, 07 Sep 2011)
New Revision: 11299

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Synchronization of Pages for replication

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/Page.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -15,11 +15,12 @@
 
 import java.util.List;
 
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.paging.cursor.LivePageCache;
 import org.hornetq.core.persistence.StorageManager;
 
 /**
- * 
+ *
  * @see PagingManager
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -31,7 +32,7 @@
    void write(PagedMessage message) throws Exception;
 
    List<PagedMessage> read(StorageManager storage) throws Exception;
-   
+
    void setLiveCache(LivePageCache pageCache);
 
    int getSize();
@@ -45,4 +46,6 @@
    void close() throws Exception;
 
    boolean delete(PagedMessage[] messages) throws Exception;
+
+   SequentialFile getFile();
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingManager.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -17,37 +17,32 @@
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.postoffice.Address;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
 
 /**
- * 
- * 
-<PRE>
-
-+------------+      1  +-------------+       N +------------+       N +-------+       1 +----------------+
-| {@link PostOffice} |-------&gt; |PagingManager|-------&gt; |{@link PagingStore} | ------&gt; | {@link Page}  | ------&gt; | {@link SequentialFile} |
-+------------+         +-------------+         +------------+         +-------+         +----------------+
-                              |                       1 ^
-                              |                         |
-                              |                         |
-                              |                         | 1
-                              |        N +---------+
-                              +--------&gt; | Address |
-                                         +---------+   
-
-</PRE>
-
- * 
+ * <PRE>
+ *
+ * +------------+      1  +-------------+       N +------------+       N +-------+       1 +----------------+
+ * | {@link PostOffice} |-------&gt; |{@link PagingManager}|-------&gt; |{@link PagingStore} | ------&gt; | {@link Page}  | ------&gt; | {@link SequentialFile} |
+ * +------------+         +-------------+         +------------+         +-------+         +----------------+
+ *                               |                       1 ^
+ *                               |                         |
+ *                               |                         |
+ *                               |                         | 1
+ *                               |        N +---------+   /
+ *                               +--------&gt; | {@link Address} |
+ *                                          +---------+
+ * </PRE>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
- *
  */
 public interface PagingManager extends HornetQComponent, HierarchicalRepositoryChangeListener
 {
-   /** To return the PageStore associated with the address */
+   /** Returns the PageStore associated with the address. A new page store is created if necessary. */
    PagingStore getPageStore(SimpleString address) throws Exception;
 
    /** An injection point for the PostOffice to inject itself */
@@ -67,18 +62,30 @@
     * @param transactionID
     */
    void removeTransaction(long transactionID);
-   
+
    Map<Long, PageTransactionInfo> getTransactions();
 
    /**
     * Reload previously created PagingStores into memory
-    * @throws Exception 
+    * @throws Exception
     */
    void reloadStores() throws Exception;
 
    SimpleString[] getStoreNames();
 
    void deletePageStore(SimpleString storeName) throws Exception;
-   
+
    void processReload() throws Exception;
+
+   /**
+    * Lock the manager, and all its {@link PagingStore}s. This method should not be called during
+    * normal PagingManager usage.
+    */
+   void lockAll();
+
+   /**
+    * Unlock the manager and all its {@link PagingStore}s.
+    * @see #lockAll()
+    */
+   void unlockAll();
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,8 +13,12 @@
 
 package org.hornetq.core.paging;
 
+import java.util.Collection;
+
 import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.paging.cursor.PageCursorProvider;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.RouteContextList;
 import org.hornetq.core.server.RoutingContext;
@@ -23,46 +27,46 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 
 /**
- * 
- * <p>The implementation will take care of details such as PageSize.</p>
- * <p>The producers will write directly to PagingStore and that will decide what
- * Page file should be used based on configured size</p>
- * 
+ * <p>
+ * The implementation will take care of details such as PageSize.
+ * </p>
+ * <p>
+ * The producers will write directly to PagingStore, and the store will decide what Page file should
+ * be used based on configured size.
+ * </p>
  * @see PagingManager
-
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
  */
 public interface PagingStore extends HornetQComponent
 {
    SimpleString getAddress();
 
    int getNumberOfPages();
-   
+
    // The current page in which the system is writing files
    int getCurrentWritingPage();
 
    SimpleString getStoreName();
 
    AddressFullMessagePolicy getAddressFullMessagePolicy();
-   
+
    long getFirstPage();
-   
+
    long getTopPage();
 
    long getPageSizeBytes();
 
    long getAddressSize();
-   
+
    long getMaxSize();
-   
+
    void applySetting(AddressSettings addressSettings);
 
    boolean isPaging();
 
    // It will schedule sync to the file storage
    void sync() throws Exception;
-   
+
    // It will perform a real sync on the current IO file
    void ioSync() throws Exception;
 
@@ -71,23 +75,23 @@
    boolean page(ServerMessage message, RoutingContext ctx, RouteContextList listCtx) throws Exception;
 
    Page createPage(final int page) throws Exception;
-   
-   boolean checkPage(final int page) throws Exception;
-   
+
+   boolean checkPageFileExists(final int page) throws Exception;
+
    PagingManager getPagingManager();
-   
+
    PageCursorProvider getCursorProvier();
-   
+
    void processReload() throws Exception;
-   
-   /** 
+
+   /**
     * Remove the first page from the Writing Queue.
-    * The file will still exist until Page.delete is called, 
+    * The file will still exist until Page.delete is called,
     * So, case the system is reloaded the same Page will be loaded back if delete is not called.
     *
     * @throws Exception
-    * 
-    * Note: This should still be part of the interface, even though HornetQ only uses through the 
+    *
+    * Note: This should still be part of the interface, even though HornetQ only uses through the
     */
    Page depage() throws Exception;
 
@@ -103,20 +107,37 @@
    void stopPaging() throws Exception;
 
    void addSize(int size);
-   
+
    void executeRunnableWhenMemoryAvailable(Runnable runnable);
-   
+
    /** This method will hold and producer, but it wait operations to finish before locking (write lock) */
    void lock();
-   
-   /** 
-    * 
+
+   /**
+    *
     * Call this method using the same thread used by the last call of {@link PagingStore#lock()}
-    * 
+    *
     */
     void unlock();
 
     /** This is used mostly by tests.
      *  We will wait any pending runnable to finish its execution */
     void flushExecutors();
+
+   /**
+    * Ids of the page files to synchronize with the backup.
+    * @return the ids of the current pages, i.e. those to be synchronized with the backup
+    * @throws Exception
+    */
+   Collection<Integer> getCurrentIds() throws Exception;
+
+   /**
+    * Sends the pages with given ids to the replicator.
+    * <p>
+    * Sending is done here to avoid exposing the internal {@link SequentialFile}s.
+    * @param replicator
+    * @param pageIds
+    * @throws Exception
+    */
+   void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -150,7 +150,7 @@
             cache = softCache.get(pageId);
             if (cache == null)
             {
-               if (!pagingStore.checkPage((int)pageId))
+               if (!pagingStore.checkPageFileExists((int)pageId))
                {
                   return null;
                }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PageImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -32,7 +32,7 @@
 import org.hornetq.utils.DataConstants;
 
 /**
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
@@ -59,7 +59,7 @@
    private final SequentialFile file;
 
    private final SequentialFileFactory fileFactory;
-   
+
    /**
     * The page cache that will be filled with data as we write more data
     */
@@ -96,7 +96,7 @@
    {
       return pageId;
    }
-   
+
    public void setLiveCache(LivePageCache pageCache)
    {
       this.pageCache = pageCache;
@@ -109,7 +109,7 @@
       size.set((int)file.size());
       // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
       ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-      
+
       file.position(0);
       file.read(buffer2);
 
@@ -181,7 +181,7 @@
       buffer.rewind();
 
       file.writeDirect(buffer, false);
-      
+
       if (pageCache != null)
       {
          pageCache.addLiveMessage(message);
@@ -234,7 +234,7 @@
             if (msg.getMessage().isLargeMessage())
             {
                LargeServerMessage lmsg = (LargeServerMessage)msg.getMessage();
-               
+
                // Remember, cannot call delete directly here
                // Because the large-message may be linked to another message
                // or it may still being delivered even though it has been acked already
@@ -257,7 +257,7 @@
          {
             file.delete();
          }
-         
+
          return true;
       }
       catch (Exception e)
@@ -276,13 +276,14 @@
    {
       return size.intValue();
    }
-   
+
+   @Override
    public String toString()
    {
       return "PageImpl::pageID="  + this.pageId + ", file=" + this.file;
    }
-   
 
+
    /* (non-Javadoc)
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
@@ -339,5 +340,11 @@
       suspiciousRecords = true;
    }
 
+   @Override
+   public SequentialFile getFile()
+   {
+      return file;
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagedMessageImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -24,9 +24,9 @@
 import org.hornetq.utils.DataConstants;
 
 /**
- * 
+ *
  * This class represents a paged message
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  *
@@ -45,11 +45,14 @@
 
    // Public --------------------------------------------------------
 
-   /** Large messages will need to be instatiated lazily during getMessage when the StorageManager is available */
+   /**
+    * Large messages will need to be instantiated lazily during getMessage when the StorageManager
+    * is available
+    */
    private byte[] largeMessageLazyData;
 
    private ServerMessage message;
-   
+
    private long queueIDs[];
 
    private long transactionID = 0;
@@ -74,17 +77,17 @@
    {
       return message;
    }
-   
+
    public void initMessage(StorageManager storage)
    {
       if (largeMessageLazyData != null)
       {
          LargeServerMessage lgMessage = storage.createLargeMessage();
-         message = lgMessage;
          HornetQBuffer buffer = HornetQBuffers.dynamicBuffer(largeMessageLazyData);
-         message.decodeHeadersAndProperties(buffer);
+         lgMessage.decodeHeadersAndProperties(buffer);
          lgMessage.incrementDelayDeletionCount();
          lgMessage.setPaged();
+         message = lgMessage;
          largeMessageLazyData = null;
       }
    }
@@ -93,7 +96,7 @@
    {
       return transactionID;
    }
-   
+
    public long[] getQueueIDs()
    {
       return queueIDs;
@@ -123,11 +126,11 @@
 
          message.decode(buffer);
       }
-      
+
       int queueIDsSize = buffer.readInt();
-      
+
       queueIDs = new long[queueIDsSize];
-      
+
       for (int i = 0 ; i < queueIDsSize; i++)
       {
          queueIDs[i] = buffer.readLong();
@@ -143,18 +146,18 @@
       buffer.writeInt(message.getEncodeSize());
 
       message.encode(buffer);
-      
+
       buffer.writeInt(queueIDs.length);
-      
-      for (int i = 0 ; i < queueIDs.length; i++)
+
+      for (long queueID : queueIDs)
       {
-         buffer.writeLong(queueIDs[i]);
+         buffer.writeLong(queueID);
       }
    }
 
    public int getEncodeSize()
    {
-      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() + 
+      return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + message.getEncodeSize() +
              DataConstants.SIZE_INT + queueIDs.length * DataConstants.SIZE_LONG;
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -18,6 +18,7 @@
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
@@ -31,7 +32,7 @@
 import org.hornetq.core.settings.impl.AddressSettings;
 
 /**
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
@@ -45,6 +46,13 @@
 
    private volatile boolean started = false;
 
+   /**
+    * Lock used at the start of synchronization between a live server and its backup.
+    * Synchronization will lock all {@link PagingStore} instances, and so any operation here that
+    * requires a lock on a {@link PagingStore} instance needs to take a read-lock on
+    * {@link #syncLock} to avoid dead-locks.
+    */
+   private final ReentrantReadWriteLock syncLock = new ReentrantReadWriteLock();
    private final ConcurrentMap<SimpleString, PagingStore> stores = new ConcurrentHashMap<SimpleString, PagingStore>();
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -53,7 +61,8 @@
 
    private final StorageManager storageManager;
 
-   private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
+   private final ConcurrentMap<Long, PageTransactionInfo> transactions =
+            new ConcurrentHashMap<Long, PageTransactionInfo>();
 
    // Static
    // --------------------------------------------------------------------------------------------------------------------------
@@ -76,9 +85,9 @@
    // Public
    // ---------------------------------------------------------------------------------------------------------------------------
 
-   
+
    // Hierarchical changes listener
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
     */
@@ -87,60 +96,58 @@
       reaplySettings();
    }
 
-
-   
    // PagingManager implementation
    // -----------------------------------------------------------------------------------------------------
 
    public void reaplySettings()
    {
-      for (PagingStore store : stores.values())
+    for (PagingStore store : stores.values())
       {
          AddressSettings settings = this.addressSettingsRepository.getMatch(store.getAddress().toString());
          store.applySetting(settings);
       }
    }
-   
+
    public SimpleString[] getStoreNames()
    {
       Set<SimpleString> names = stores.keySet();
       return names.toArray(new SimpleString[names.size()]);
    }
 
-   public synchronized void reloadStores() throws Exception
+   public void reloadStores() throws Exception
    {
-      List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
+      lock();
+      try
+      {
+         List<PagingStore> reloadedStores = pagingStoreFactory.reloadStores(addressSettingsRepository);
 
-      for (PagingStore store : reloadedStores)
+         for (PagingStore store : reloadedStores)
+         {
+            store.start();
+            stores.put(store.getStoreName(), store);
+         }
+      }
+      finally
       {
-         store.start();
-         stores.put(store.getStoreName(), store);
+         unlock();
       }
 
    }
 
-   private synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
+   public void deletePageStore(final SimpleString storeName) throws Exception
    {
-      PagingStore store = stores.get(storeName);
-
-      if (store == null)
+      syncLock.readLock().lock();
+      try
       {
-         store = newStore(storeName);
-
-         store.start();
-
-         stores.put(storeName, store);
+         PagingStore store = stores.remove(storeName);
+         if (store != null)
+         {
+            store.stop();
+         }
       }
-
-      return store;
-   }
-   
-   public void deletePageStore(final SimpleString storeName) throws Exception
-   {
-      PagingStore store = stores.remove(storeName);
-      if (store != null)
+      finally
       {
-         store.stop();
+         syncLock.readLock().unlock();
       }
    }
 
@@ -149,12 +156,11 @@
    {
       PagingStore store = stores.get(storeName);
 
-      if (store == null)
+      if (store != null)
       {
-         store = createPageStore(storeName);
+         return store;
       }
-
-      return store;
+      return newStore(storeName);
    }
 
    /** this will be set by the postOffice itself.
@@ -179,7 +185,7 @@
    {
       return transactions.get(id);
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.paging.PagingManager#getTransactions()
     */
@@ -197,37 +203,53 @@
       return started;
    }
 
-   public synchronized void start() throws Exception
+   public void start() throws Exception
    {
-      if (started)
+      lock();
+      try
       {
-         return;
-      }
+         if (started)
+         {
+            return;
+         }
 
-      pagingStoreFactory.setPagingManager(this);
+         pagingStoreFactory.setPagingManager(this);
 
-      pagingStoreFactory.setStorageManager(storageManager);
+         pagingStoreFactory.setStorageManager(storageManager);
 
-      reloadStores();
+         reloadStores();
 
-      started = true;
+         started = true;
+      }
+      finally
+      {
+         unlock();
+      }
    }
 
-   public synchronized void stop() throws Exception
+   public void stop() throws Exception
    {
-      if (!started)
+      lock();
+      try
       {
-         return;
-      }
+         if (!started)
+         {
+            return;
+         }
 
-      started = false;
+         started = false;
 
-      for (PagingStore store : stores.values())
+         for (PagingStore store : stores.values())
+         {
+            store.stop();
+         }
+
+         pagingStoreFactory.stop();
+      }
+      finally
       {
-         store.stop();
+         unlock();
       }
-
-      pagingStoreFactory.stop();
    }
 
    public void processReload() throws Exception
@@ -245,17 +267,52 @@
 
    // Private -------------------------------------------------------
 
-   protected PagingStore newStore(final SimpleString address) 
+   private PagingStore newStore(final SimpleString address) throws Exception
    {
-      return pagingStoreFactory.newStore(address,
-                                         addressSettingsRepository.getMatch(address.toString()));
+      lock();
+      try {
+         PagingStore store = stores.get(address);
+         if (store == null)
+         {
+            store = pagingStoreFactory.newStore(address, addressSettingsRepository.getMatch(address.toString()));
+            store.start();
+            stores.put(address, store);
+         }
+         return store;
+      }
+      finally
+      {
+         unlock();
+      }
    }
-   
-   protected PagingStoreFactory getStoreFactory()
+
+   private void unlock()
    {
-      return pagingStoreFactory;
+      syncLock.writeLock().unlock();
    }
 
-   // Inner classes -------------------------------------------------
+   private void lock()
+   {
+      syncLock.writeLock().lock();
+   }
 
+   @Override
+   public synchronized void lockAll()
+   {
+      syncLock.writeLock().lock();
+      for (PagingStore store : stores.values())
+      {
+         store.lock();
+      }
+   }
+
+   @Override
+   public void unlockAll()
+   {
+      for (PagingStore store : stores.values())
+      {
+         store.unlock();
+      }
+      syncLock.writeLock().unlock();
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -40,7 +40,7 @@
 import org.hornetq.utils.UUIDGenerator;
 
 /**
- * 
+ *
  * Integration point between Paging and NIO
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -62,9 +62,9 @@
    protected final boolean syncNonTransactional;
 
    private PagingManager pagingManager;
-   
+
    private final ScheduledExecutorService scheduledExecutor;
-   
+
    private final long syncTimeout;
 
    private StorageManager storageManager;
@@ -86,9 +86,9 @@
       this.executorFactory = executorFactory;
 
       this.syncNonTransactional = syncNonTransactional;
-      
+
       this.scheduledExecutor = scheduledExecutor;
-      
+
       this.syncTimeout = syncTimeout;
    }
 
@@ -155,6 +155,7 @@
 
    public void setPostOffice(final PostOffice postOffice)
    {
+      assert this.postOffice == null;
       this.postOffice = postOffice;
    }
 
@@ -233,22 +234,22 @@
    {
       return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
    }
-   
+
    protected PagingManager getPagingManager()
    {
       return pagingManager;
    }
-   
+
    protected StorageManager getStorageManager()
    {
       return storageManager;
    }
-   
+
    protected PostOffice getPostOffice()
    {
       return postOffice;
    }
-   
+
    protected ExecutorFactory getExecutorFactory()
    {
       return executorFactory;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,8 +13,9 @@
 
 package org.hornetq.core.paging.impl;
 
-import java.io.File;
 import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -42,10 +43,8 @@
 import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.impl.LivePageCacheImpl;
 import org.hornetq.core.paging.cursor.impl.PageCursorProviderImpl;
-import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
-import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.RouteContextList;
@@ -56,15 +55,13 @@
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.TransactionOperation;
-import org.hornetq.core.transaction.TransactionOperationAbstract;
 import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Future;
 
 /**
- * 
+ *
  * @see PagingStore
- * 
+ *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
@@ -210,7 +207,7 @@
       pageSize = addressSettings.getPageSizeBytes();
 
       addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-      
+
       if (cursorProvider != null)
       {
          cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
@@ -219,6 +216,7 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public String toString()
    {
       return "PagingStoreImpl(" + this.address + ")";
@@ -375,21 +373,28 @@
 
    public synchronized void stop() throws Exception
    {
-      if (running)
+      lock();
+      try
       {
+         if (running)
+         {
+            cursorProvider.stop();
 
-         cursorProvider.stop();
+            running = false;
 
-         running = false;
+            flushExecutors();
 
-         flushExecutors();
-
-         if (currentPage != null)
-         {
-            currentPage.close();
-            currentPage = null;
+            if (currentPage != null)
+            {
+               currentPage.close();
+               currentPage = null;
+            }
          }
       }
+      finally
+      {
+         unlock();
+      }
    }
 
    public void flushExecutors()
@@ -556,8 +561,8 @@
    {
       return currentPage;
    }
-   
-   public boolean checkPage(final int pageNumber)
+
+   public boolean checkPageFileExists(final int pageNumber)
    {
       String fileName = createFileName(pageNumber);
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1);
@@ -566,7 +571,10 @@
 
    public Page createPage(final int pageNumber) throws Exception
    {
-      String fileName = createFileName(pageNumber);
+      lock();
+      try
+      {
+         String fileName = createFileName(pageNumber);
 
       if (fileFactory == null)
       {
@@ -585,6 +593,12 @@
       file.close();
 
       return page;
+      }
+
+      finally
+      {
+         unlock();
+      }
    }
 
    public void forceAnotherPage() throws Exception
@@ -592,13 +606,13 @@
       openNewPage();
    }
 
-   /** 
-    *  It returns a Page out of the Page System without reading it. 
+   /**
+    *  It returns a Page out of the Page System without reading it.
     *  The method calling this method will remove the page and will start reading it outside of any locks.
     *  This method could also replace the current file by a new file, and that process is done through acquiring a writeLock on currentPageLock
-    *   
-    *  Observation: This method is used internally as part of the regular depage process, but externally is used only on tests, 
-    *               and that's why this method is part of the Testable Interface 
+    *
+    *  Observation: This method is used internally as part of the regular depage process, but externally is used only on tests,
+    *               and that's why this method is part of the Testable Interface
     * */
    public Page depage() throws Exception
    {
@@ -681,7 +695,7 @@
     * @return
     * @throws Exception
     */
-   private Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
+   private final Queue<OurRunnable> onMemoryFreedRunnables = new ConcurrentLinkedQueue<OurRunnable>();
 
    private class MemoryFreedRunnablesExecutor implements Runnable
    {
@@ -866,7 +880,7 @@
 
 
          PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
-         
+
          if (message.isLargeMessage())
          {
             ((LargeServerMessage)message).setPaged();
@@ -880,9 +894,9 @@
             openNewPage();
             currentPageSize.addAndGet(bytesToWrite);
          }
- 
+
          currentPage.write(pagedMessage);
-         
+
          if (tx != null)
          {
             installPageTransaction(tx, listCtx);
@@ -945,15 +959,15 @@
    private static class FinishPageMessageOperation implements TransactionOperation
    {
       public final PageTransactionInfo pageTransaction;
-      
+
       private final StorageManager storageManager;
-      
+
       private final PagingManager pagingManager;
-      
+
       private final Set<PagingStore> usedStores = new HashSet<PagingStore>();
 
       private boolean stored = false;
-      
+
       public void addStore(PagingStore store)
       {
          this.usedStores.add(store);
@@ -1078,9 +1092,9 @@
    }
 
    /**
-    * 
+    *
     * Note: Decimalformat is not thread safe, Use synchronization before calling this method
-    * 
+    *
     * @param pageID
     * @return
     */
@@ -1100,5 +1114,41 @@
       return maxSize > 0 && getAddressSize() > maxSize;
    }
 
+   @Override
+   public Collection<Integer> getCurrentIds() throws Exception
+   {
+      List<Integer> ids = new ArrayList<Integer>();
+      if (fileFactory != null)
+      {
+         for (String fileName : fileFactory.listFiles("page"))
+         {
+            ids.add(getPageIdFromFileName(fileName));
+         }
+      }
+      return ids;
+   }
+
+   @Override
+   public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception
+   {
+      lock();
+      try
+      {
+         for (Integer id : pageIds)
+         {
+            SequentialFile sFile = fileFactory.createSequentialFile(createFileName(id), 1);
+            if (!sFile.exists())
+            {
+               continue;
+            }
+            replicator.syncPages(sFile, id, getAddress());
+         }
+      }
+      finally
+      {
+         unlock();
+      }
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,7 +14,6 @@
 package org.hornetq.core.persistence.impl.journal;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
@@ -350,9 +349,10 @@
    /**
     * XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
     * @param replicationManager
+    * @param pagingManager
     * @throws HornetQException
     */
-   public void startReplication(ReplicationManager replicationManager) throws Exception
+   public void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception
    {
       if (!started)
       {
@@ -374,6 +374,7 @@
       final boolean messageJournalAutoReclaim = localMessageJournal.getAutoReclaim();
       final boolean bindingsJournalAutoReclaim = localBindingsJournal.getAutoReclaim();
       Map<String, Long> largeMessageFilesToSync;
+      Map<SimpleString, Collection<Integer>> pageFilesToSync;
       try
       {
          storageManagerLock.writeLock().lock();
@@ -385,9 +386,18 @@
             localBindingsJournal.writeLock();
             try
             {
-               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-               largeMessageFilesToSync = getLargeMessageInformation();
+               pagingManager.lockAll();
+               try
+               {
+                  messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+                  bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+                  pageFilesToSync = getPageInformationForSync(pagingManager);
+                  largeMessageFilesToSync = getLargeMessageInformation();
+               }
+               finally
+               {
+                  pagingManager.unlockAll();
+               }
             }
             finally
             {
@@ -405,6 +415,7 @@
          sendJournalFile(messageFiles, JournalContent.MESSAGES);
          sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
          sendLargeMessageFiles(largeMessageFilesToSync);
+         sendPagesToBackup(pageFilesToSync, pagingManager);
 
          storageManagerLock.writeLock().lock();
          try
@@ -424,6 +435,42 @@
       }
    }
 
+   /**
+    * @param pageFilesToSync page ids to send through {@code replicator}, keyed by the name of their paging store
+    * @throws Exception propagated from looking up a store in {@code manager} or from {@code sendPages}
+    */
+   private void sendPagesToBackup(Map<SimpleString, Collection<Integer>> pageFilesToSync, PagingManager manager)
+            throws Exception
+   {
+      for (Entry<SimpleString, Collection<Integer>> entry : pageFilesToSync.entrySet())
+      {
+         PagingStore store = manager.getPageStore(entry.getKey());
+         store.sendPages(replicator, entry.getValue());
+      }
+
+   }
+
+   /**
+    * Collects the current page ids of every paging store and sends that listing through {@code replicator}.
+    * @param pagingManager manager whose stores are inspected; each store is also forced onto a new page
+    * @return map from store name to the ids returned by that store's {@code getCurrentIds()}
+    * @throws Exception propagated from the store lookup, the id listing, or {@code forceAnotherPage()}
+    */
+   private Map<SimpleString, Collection<Integer>> getPageInformationForSync(PagingManager pagingManager)
+            throws Exception
+   {
+      Map<SimpleString, Collection<Integer>> info = new HashMap<SimpleString, Collection<Integer>>();
+      for (SimpleString storeName : pagingManager.getStoreNames())
+      {
+         PagingStore store = pagingManager.getPageStore(storeName);
+         info.put(storeName, store.getCurrentIds());
+         // XXX perhaps before? unnecessary?
+         store.forceAnotherPage();
+      }
+      replicator.sendPagingInfo(info);
+      return info;
+   }
+
    private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync) throws Exception
    {
       for (Entry<String, Long> entry : largeMessageFilesToSync.entrySet())
@@ -464,11 +511,7 @@
    }
 
    /**
-    * Send an entire journal file to a replicating server (a backup server that is).
-    * @param jf
-    * @param replicator2
-    * @throws IOException
-    * @throws HornetQException
+    * Send an entire journal file to a replicating backup server.
     */
    private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
    {
@@ -809,7 +852,7 @@
       readLock();
       try
       {
-      messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
+         messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
       }
       finally
       {
@@ -855,9 +898,9 @@
       readLock();
       try
       {
-      pageTransaction.setRecordID(generateUniqueID());
+         pageTransaction.setRecordID(generateUniqueID());
 
-      messageJournal.appendAddRecordTransactional(txID,
+         messageJournal.appendAddRecordTransactional(txID,
                                                   pageTransaction.getRecordID(),
                                                   JournalStorageManager.PAGE_TRANSACTION,
                                                   pageTransaction);
@@ -873,11 +916,10 @@
       readLock();
       try
       {
-      messageJournal.appendUpdateRecordTransactional(txID,
-                                                     pageTransaction.getRecordID(),
-                                                     JournalStorageManager.PAGE_TRANSACTION,
-                                                     new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
-                                                                              depages));
+         messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
+                                                        JournalStorageManager.PAGE_TRANSACTION,
+                                                        new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+                                                                                 depages));
       }
       finally
       {
@@ -890,7 +932,7 @@
       readLock();
       try
       {
-      messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
+         messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
                                         JournalStorageManager.PAGE_TRANSACTION,
                                         new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
                                         syncNonTransactional,
@@ -907,7 +949,7 @@
       readLock();
       try
       {
-      messageJournal.appendUpdateRecordTransactional(txID,
+         messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ADD_REF,
                                                      new RefEncoding(queueID));
@@ -923,7 +965,7 @@
       readLock();
       try
       {
-      messageJournal.appendUpdateRecordTransactional(txID,
+         messageJournal.appendUpdateRecordTransactional(txID,
                                                      messageID,
                                                      JournalStorageManager.ACKNOWLEDGE_REF,
                                                      new RefEncoding(queueID));
@@ -1042,7 +1084,7 @@
       readLock();
       try
       {
-      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+         messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
       }
       finally
       {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -102,6 +102,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -536,6 +537,11 @@
             packet = new ReplicationSyncFileMessage();
             break;
          }
+         case PacketImpl.REPLICATION_CURRENT_PAGES_INFO:
+         {
+            packet = new ReplicationCurrentPagesMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -196,6 +196,7 @@
    public static final byte HA_BACKUP_REGISTRATION = 113;
 
    public static final byte REPLICATION_START_STOP_SYNC = 120;
+   public static final byte REPLICATION_CURRENT_PAGES_INFO = 121;
 
    // Static --------------------------------------------------------
 

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCurrentPagesMessage.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,77 @@
+/**
+ * Replication packet listing the ids of the pages currently held by each page store.
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+public final class ReplicationCurrentPagesMessage extends PacketImpl
+{
+   // page ids keyed by page store name; set by the constructor below or rebuilt by decodeRest
+   private Map<SimpleString, Collection<Integer>> info;
+
+   /**
+    * Creates a packet with no content yet; {@link #decodeRest(HornetQBuffer)} fills it from a buffer.
+    */
+   public ReplicationCurrentPagesMessage()
+   {
+      super(REPLICATION_CURRENT_PAGES_INFO);
+   }
+
+   /**
+    * @param info page ids to send, keyed by the name of the page store they belong to
+    */
+   public ReplicationCurrentPagesMessage(Map<SimpleString, Collection<Integer>> info)
+   {
+      this();
+      this.info = info;
+   }
+
+   @Override
+   public void decodeRest(HornetQBuffer buffer)
+   {
+      info = new HashMap<SimpleString, Collection<Integer>>();
+      int entries = buffer.readInt();
+      for (int i = 0; i < entries; i++)
+      {
+         SimpleString name = buffer.readSimpleString();
+         int nPages = buffer.readInt();
+         List<Integer> ids = new ArrayList<Integer>(nPages);
+         for (int j = 0; j < nPages; j++)
+         {
+            ids.add(Integer.valueOf(buffer.readInt()));
+         }
+         info.put(name, ids);
+      }
+   }
+
+   @Override
+   public void encodeRest(HornetQBuffer buffer)
+   {
+      buffer.writeInt(info.size());
+      for (Entry<SimpleString, Collection<Integer>> entry : info.entrySet())
+      {
+         buffer.writeSimpleString(entry.getKey());
+         Collection<Integer> value = entry.getValue();
+         buffer.writeInt(value.size());
+         for (Integer id : value)
+         {
+            buffer.writeInt(id);
+         }
+      }
+   }
+
+   public Map<SimpleString, Collection<Integer>> getInfo()
+   {
+      return info;
+   }
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationSyncFileMessage.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -1,17 +1,17 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Message is used to:
- * <ol>
- * <li>copy JournalFile data over to the backup during synchronization;
- * <li>send a up-to-date signal to backup;
- * </ol>
+ * Message is used to sync {@link SequentialFile}s to a backup server. The {@link FileType} controls
+ * which extra information is sent.
  */
 public final class ReplicationSyncFileMessage extends PacketImpl
 {
@@ -28,31 +28,89 @@
    private int dataSize;
    private ByteBuffer byteBuffer;
    private byte[] byteArray;
+   private SimpleString pageStoreName;
+   private FileType fileType;
 
+   public enum FileType
+   {
+      JOURNAL(0), PAGE(1), LARGE_MESSAGE(2);
+
+      private final byte code; // byte written to the wire for this constant; fixed at construction
+
+      private FileType(int code)
+      {
+         this.code = (byte)code;
+      }
+
+      /**
+       * @param readByte type code as read from the wire (one of the constants' {@code code} values)
+       * @return the constant whose code equals {@code readByte}; throws {@link InternalError} if none does
+       */
+      public static FileType getFileType(byte readByte)
+      {
+         for (FileType type : EnumSet.allOf(FileType.class))
+         {
+            if (type.code == readByte)
+               return type;
+         }
+         throw new InternalError("Unsupported byte value " + readByte + " for " + FileType.class);
+      }
+   }
+
    public ReplicationSyncFileMessage()
    {
       super(REPLICATION_SYNC_FILE);
    }
 
-   public ReplicationSyncFileMessage(JournalContent content, long id, int size, ByteBuffer buffer)
+   public ReplicationSyncFileMessage(JournalContent content, SimpleString storeName, long id, int size,
+                                     ByteBuffer buffer)
    {
       this();
       this.byteBuffer = buffer;
+      this.pageStoreName = storeName;
       this.dataSize = size;
       this.fileId = id;
       this.journalType = content;
+      determineType();
    }
 
+   private void determineType()
+   {
+      if (journalType != null)
+      {
+         fileType = FileType.JOURNAL;
+      }
+      else if (pageStoreName != null)
+      {
+         fileType = FileType.PAGE;
+      }
+      else
+      {
+         fileType = FileType.LARGE_MESSAGE;
+      }
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeLong(fileId);
       if (fileId == -1)
          return;
-      boolean isJournal = journalType != null;
-      buffer.writeBoolean(isJournal);
-      if (isJournal)
-         buffer.writeByte(journalType.typeByte);
+      buffer.writeByte(fileType.code);
+      switch (fileType)
+      {
+         case JOURNAL:
+         {
+            buffer.writeByte(journalType.typeByte);
+            break;
+         }
+         case PAGE:
+         {
+            buffer.writeSimpleString(pageStoreName);
+            break;
+         }
+      }
+
       buffer.writeInt(dataSize);
       /*
        * sending -1 will close the file in case of a journal, but not in case of a largeMessage
@@ -68,9 +126,25 @@
    public void decodeRest(final HornetQBuffer buffer)
    {
       fileId = buffer.readLong();
-      if (buffer.readBoolean())
+      switch (FileType.getFileType(buffer.readByte()))
       {
-         journalType = JournalContent.getType(buffer.readByte());
+         case JOURNAL:
+         {
+            journalType = JournalContent.getType(buffer.readByte());
+            fileType = FileType.JOURNAL;
+            break;
+         }
+         case PAGE:
+         {
+            pageStoreName = buffer.readSimpleString();
+            fileType = FileType.PAGE;
+            break;
+         }
+         case LARGE_MESSAGE:
+         {
+            fileType = FileType.LARGE_MESSAGE;
+            break;
+         }
       }
       int size = buffer.readInt();
       if (size > 0)
@@ -90,19 +164,18 @@
       return journalType;
    }
 
-   /**
-    * @return
-    */
    public byte[] getData()
    {
       return byteArray;
    }
 
-   /**
-    * @return
-    */
-   public boolean isLargeMessage()
+   public FileType getFileType()
    {
-      return journalType == null;
+      return fileType;
    }
+
+   public SimpleString getPageStore()
+   {
+      return pageStoreName;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.replication;
 
+import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 
 import org.hornetq.api.core.HornetQException;
@@ -114,4 +116,14 @@
     * @throws Exception
     */
    void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws Exception;
+
+   void sendPagingInfo(Map<SimpleString, Collection<Integer>> info);
+
+   /**
+    * @param file page file to send
+    * @param id id of the page stored in {@code file}
+    * @param pageStore address of the paging store the page belongs to
+    * @throws Exception
+    */
+   void syncPages(SequentialFile file, long id, SimpleString pageStore) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,6 +14,7 @@
 package org.hornetq.core.replication.impl;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -47,6 +48,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -205,6 +207,10 @@
          {
             handleReplicationSynchronization((ReplicationSyncFileMessage)packet);
          }
+         else if (type == PacketImpl.REPLICATION_CURRENT_PAGES_INFO)
+         {
+            handleCurrentPagesInfo((ReplicationCurrentPagesMessage)packet);
+         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -224,17 +230,22 @@
       channel.send(response);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#isStarted()
+   /**
+    * @param packet
     */
+   private void handleCurrentPagesInfo(ReplicationCurrentPagesMessage packet)
+   {
+      for (Entry<SimpleString, Collection<Integer>> entry : packet.getInfo().entrySet())
+      {
+         // ignore the actual file list for the moment...
+      }
+   }
+
    public boolean isStarted()
    {
       return started;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#start()
-    */
    public void start() throws Exception
    {
       Configuration config = server.getConfiguration();
@@ -429,7 +440,6 @@
                   SequentialFile sq = lm.getFile();
                   LargeServerMessage mainLM = largeMessagesOnSync.get(id);
                   SequentialFile mainSeqFile = mainLM.getFile();
-                  System.out.println(mainSeqFile);
                   for (;;)
                   {
                      buffer.rewind();
@@ -463,27 +473,41 @@
       Long id = Long.valueOf(msg.getId());
       byte[] data = msg.getData();
       SequentialFile sf;
-      if (msg.isLargeMessage())
+      switch (msg.getFileType())
       {
-         synchronized (largeMessagesOnSync)
+         case LARGE_MESSAGE:
          {
-            LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
-            if (largeMessage == null)
+            synchronized (largeMessagesOnSync)
             {
-               largeMessage = storage.createLargeMessage();
-               largeMessage.setDurable(true);
-               largeMessage.setMessageID(id);
-               largeMessagesOnSync.put(id, largeMessage);
+               LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
+               if (largeMessage == null)
+               {
+                  largeMessage = storage.createLargeMessage();
+                  largeMessage.setDurable(true);
+                  largeMessage.setMessageID(id);
+                  largeMessagesOnSync.put(id, largeMessage);
+               }
+               sf = largeMessage.getFile();
             }
-            sf = largeMessage.getFile();
+            break;
          }
+         case JOURNAL:
+         {
+            JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
+            sf = journalFile.getFile();
+            break;
+         }
+         case PAGE:
+         {
+            Page page = getPage(msg.getPageStore(), (int)msg.getId());
+
+            sf = page.getFile();
+            break;
+         }
+         default:
+            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled file type " + msg.getFileType());
       }
-      else
-      {
-         JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(id);
-         sf = journalFile.getFile();
 
-      }
       if (data == null)
       {
          sf.close();
@@ -751,7 +775,6 @@
             page.close();
          }
       }
-
    }
 
    /**

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,7 +14,9 @@
 package org.hornetq.core.replication.impl;
 
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.LinkedHashSet;
+import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -42,6 +44,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCurrentPagesMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -508,24 +511,32 @@
    {
       SequentialFile file = jf.getFile().copy();
       log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
-      sendLargeFile(content, jf.getFileID(), file, Long.MAX_VALUE);
+      sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
    }
 
    @Override
    public void syncLargeMessageFile(SequentialFile file, long size, long id) throws Exception
    {
-      sendLargeFile(null, id, file, size);
+      sendLargeFile(null, null, id, file, size);
    }
 
+   @Override
+   public void syncPages(SequentialFile file, long id, SimpleString queueName) throws Exception
+   {
+      sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
+   }
+
    /**
     * Sends large files in reasonably sized chunks to the backup during replication synchronization.
-    * @param content journal type or {@code null} for large-messages
+    * @param content journal type or {@code null} for large-messages and pages
+    * @param pageStore page store name for pages, or {@code null} otherwise
     * @param id journal file id or (large) message id
     * @param file
     * @param maxBytesToSend maximum number of bytes to read and send from the file
     * @throws Exception
     */
-   private void sendLargeFile(JournalContent content, final long id, SequentialFile file, long maxBytesToSend)
+   private void sendLargeFile(JournalContent content, SimpleString pageStore, final long id, SequentialFile file,
+            long maxBytesToSend)
             throws Exception
    {
       if (!file.isOpen())
@@ -554,7 +565,7 @@
          buffer.rewind();
 
          // sending -1 or 0 bytes will close the file at the backup
-         sendReplicatePacket(new ReplicationSyncFileMessage(content, id, bytesRead, buffer));
+         sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
          if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
             break;
       }
@@ -572,4 +583,10 @@
       ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
       sendReplicatePacket(msg);
    }
+
+   @Override
+   public void sendPagingInfo(Map<SimpleString, Collection<Integer>> info)
+   {
+      sendReplicatePacket(new ReplicationCurrentPagesMessage(info)); // NOTE(review): info presumably maps store name -> page ids; confirm
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -58,7 +58,6 @@
 import org.hornetq.core.deployers.impl.SecurityDeployer;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.impl.SyncSpeedTest;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
@@ -1564,7 +1563,7 @@
 
       pagingManager.reloadStores();
 
-      JournalLoadInformation[] journalInfo = loadJournals();
+      loadJournals();
 
       final ServerInfo dumper = new ServerInfo(this, pagingManager);
 
@@ -1646,15 +1645,13 @@
       }
    }
 
-   private JournalLoadInformation[] loadJournals() throws Exception
+   private void loadJournals() throws Exception
    {
-      JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
-
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
       List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
 
-      journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+      storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
 
       recoverStoredConfigs();
 
@@ -1685,8 +1682,6 @@
 
          managementService.registerAddress(queueBindingInfo.getAddress());
          managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
-
-
       }
 
       for (GroupingInfo groupingInfo : groupingInfos)
@@ -1701,7 +1696,7 @@
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
-      journalInfo[1] = storageManager.loadMessageJournal(postOffice,
+      storageManager.loadMessageJournal(postOffice,
                                                          pagingManager,
                                                          resourceManager,
                                                          queues,
@@ -1720,7 +1715,6 @@
          }
       }
 
-      return journalInfo;
    }
 
    /**
@@ -2012,11 +2006,10 @@
       }
 
       JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
-
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
       replicationManager.start();
 
-      journalStorageManager.startReplication(replicationManager);
+      journalStorageManager.startReplication(replicationManager, pagingManager);
    }
 
    /**

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -19,6 +19,7 @@
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
 import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
 import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
 import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
 
 /**
@@ -118,7 +119,8 @@
    @Override
    public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
+      writeRecord(deleteRecordTX, false, null);
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -13,6 +13,7 @@
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
@@ -27,7 +28,7 @@
    private ClientSession session;
    private ClientProducer producer;
    private BackupSyncDelay syncDelay;
-   protected static final int N_MSGS = 10;
+   protected int n_msgs = 10;
 
    @Override
    protected void setUp() throws Exception
@@ -59,17 +60,22 @@
       for (int i = 0; i < totalRounds; i++)
       {
          messageJournal.forceMoveNextFile();
-         sendMessages(session, producer, N_MSGS);
+         sendMessages(session, producer, n_msgs);
       }
 
       backupServer.start();
-
+      syncDelay.deliverUpToDateMsg();
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
 
       // SEND more messages, now with the backup replicating
-      sendMessages(session, producer, N_MSGS);
+      sendMessages(session, producer, n_msgs);
       Set<Long> liveIds = getFileIds(messageJournal);
-
+      PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
+      if (ps.getPageSizeBytes() == PAGE_SIZE)
+      {
+         assertTrue("isStarted", ps.isStarted());
+         assertFalse("start paging should return false, because we expect paging to be running", ps.startPaging());
+      }
       finishSyncAndFailover();
 
       JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
@@ -79,7 +85,7 @@
       // "+ 2": there two other calls that send N_MSGS.
       for (int i = 0; i < totalRounds + 2; i++)
       {
-         receiveMsgsInRange(0, N_MSGS);
+         receiveMsgsInRange(0, n_msgs);
       }
       assertNoMoreMessages();
    }
@@ -108,13 +114,13 @@
       backupServer.start();
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
 
-      sendMessages(session, producer, N_MSGS);
+      sendMessages(session, producer, n_msgs);
       session.commit();
-      receiveMsgsInRange(0, N_MSGS);
+      receiveMsgsInRange(0, n_msgs);
 
       finishSyncAndFailover();
 
-      receiveMsgsInRange(0, N_MSGS);
+      receiveMsgsInRange(0, n_msgs);
       assertNoMoreMessages();
    }
 
@@ -131,16 +137,16 @@
    {
       createProducerSendSomeMessages();
       startBackupCrashLive();
-      receiveMsgsInRange(0, N_MSGS);
+      receiveMsgsInRange(0, n_msgs);
       assertNoMoreMessages();
    }
 
    public void testMessageSync() throws Exception
    {
       createProducerSendSomeMessages();
-      receiveMsgsInRange(0, N_MSGS / 2);
+      receiveMsgsInRange(0, n_msgs / 2);
       startBackupCrashLive();
-      receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+      receiveMsgsInRange(n_msgs / 2, n_msgs);
       assertNoMoreMessages();
    }
 
@@ -159,7 +165,7 @@
       session = sessionFactory.createSession(true, true);
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
       producer = session.createProducer(FailoverTestBase.ADDRESS);
-      sendMessages(session, producer, N_MSGS);
+      sendMessages(session, producer, n_msgs);
       session.commit();
    }
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -40,8 +40,8 @@
       File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
       System.out.println("Dir " + dir.getAbsolutePath() + " " + dir.exists());
       // Set<Long> idsOnBkp = getAllMessageFileIds(dir);
-      receiveMsgsInRange(0, N_MSGS / 2);
-      assertEquals("we really ought to delete these after delivery", N_MSGS / 2, getAllMessageFileIds(dir).size());
+      receiveMsgsInRange(0, n_msgs / 2);
+      assertEquals("we really ought to delete these after delivery", n_msgs / 2, getAllMessageFileIds(dir).size());
    }
 
    private Set<Long> getAllMessageFileIds(File dir)

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncPagingTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,34 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+/** Runs the tests inherited from BackupSyncJournalTest with ADDRESS set to the PAGE address-full policy (PAGE_MAX max size, PAGE_SIZE page size). */
+public class BackupSyncPagingTest extends BackupSyncJournalTest
+{
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      n_msgs = 100; // raised from the inherited default of 10 before the base setUp() runs
+      super.setUp();
+   }
+
+   @Override
+   protected HornetQServer createInVMFailoverServer(final boolean realFiles, final Configuration configuration,
+            final NodeManager nodeManager)
+   {
+      Map<String, AddressSettings> conf = new HashMap<String, AddressSettings>();
+      AddressSettings as = new AddressSettings();
+      as.setMaxSizeBytes(PAGE_MAX);
+      as.setPageSizeBytes(PAGE_SIZE);
+      as.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      conf.put(ADDRESS.toString(), as);
+      return createInVMFailoverServer(realFiles, configuration, PAGE_SIZE, PAGE_MAX, conf, nodeManager);
+   }
+}

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -68,6 +68,9 @@
    protected static final int MIN_LARGE_MESSAGE = 1024;
    private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
 
+   protected static final int PAGE_MAX = 2 * 1024;
+   protected static final int PAGE_SIZE = 1024;
+
    // Attributes ----------------------------------------------------
 
    protected TestableServer liveServer;

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -46,12 +46,8 @@
 
    // Constants -----------------------------------------------------
 
-   private static final int PAGE_MAX = 100 * 1024;
+   private static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
 
-   private static final int PAGE_SIZE = 10 * 1024;
-
-   static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
-
    // Attributes ----------------------------------------------------
    private ServerLocator locator;
 
@@ -91,7 +87,7 @@
       internalTestPage(true, false);
    }
 
-   public void testPageTransactionedFailBeforeconsume() throws Exception
+   public void testPageTransactionedFailBeforeConsume() throws Exception
    {
       internalTestPage(true, true);
    }
@@ -130,9 +126,12 @@
          if (failBeforeConsume)
          {
             crash(session);
+            waitForBackup(sf, 60);
          }
 
 
+
+
          session.close();
 
          session = sf.createSession(!transacted, !transacted, 0);
@@ -217,11 +216,7 @@
    @Override
    protected HornetQServer createServer(final boolean realFiles, final Configuration configuration)
    {
-      return createInVMFailoverServer(true,
-                                      configuration,
-                                      PagingFailoverTest.PAGE_SIZE,
-                                      PagingFailoverTest.PAGE_MAX,
-                                      new HashMap<String, AddressSettings>(),
+      return createInVMFailoverServer(true, configuration, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>(),
                                       nodeManager);
    }
 

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,9 +14,7 @@
 package org.hornetq.tests.unit.core.paging.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -53,6 +51,7 @@
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.hornetq.tests.unit.util.FakePagingManager;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.UnitTestCase;
 import org.hornetq.utils.ExecutorFactory;
@@ -939,155 +938,6 @@
 
    // Inner classes -------------------------------------------------
 
-   class FakePagingManager implements PagingManager
-   {
-
-      public void activate()
-      {
-      }
-
-      public long addSize(final long size)
-      {
-         return 0;
-      }
-
-      public void addTransaction(final PageTransactionInfo pageTransaction)
-      {
-      }
-
-      public PagingStore createPageStore(final SimpleString destination) throws Exception
-      {
-         return null;
-      }
-
-      public long getTotalMemory()
-      {
-         return 0;
-      }
-
-      public SimpleString[] getStoreNames()
-      {
-         return null;
-      }
-
-      public long getMaxMemory()
-      {
-         return 0;
-      }
-
-      public PagingStore getPageStore(final SimpleString address) throws Exception
-      {
-         return null;
-      }
-
-      public void deletePageStore(SimpleString storeName) throws Exception
-      {
-      }
-
-      public PageTransactionInfo getTransaction(final long transactionID)
-      {
-         return null;
-      }
-
-      public boolean isBackup()
-      {
-         return false;
-      }
-
-      public boolean isGlobalPageMode()
-      {
-         return false;
-      }
-
-      public boolean isPaging(final SimpleString destination) throws Exception
-      {
-         return false;
-      }
-
-      public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
-      {
-         return false;
-      }
-
-      public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
-      {
-         return false;
-      }
-
-      public void reloadStores() throws Exception
-      {
-      }
-
-      public void removeTransaction(final long transactionID)
-      {
-
-      }
-
-      public void setGlobalPageMode(final boolean globalMode)
-      {
-      }
-
-      public void setPostOffice(final PostOffice postOffice)
-      {
-      }
-
-      public void resumeDepages()
-      {
-      }
-
-      public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
-      {
-      }
-
-      public boolean isStarted()
-      {
-         return false;
-      }
-
-      public void start() throws Exception
-      {
-      }
-
-      public void stop() throws Exception
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
-       */
-      public boolean isGlobalFull()
-      {
-         return false;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#getTransactions()
-       */
-      public Map<Long, PageTransactionInfo> getTransactions()
-      {
-         // TODO Auto-generated method stub
-         return null;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#processReload()
-       */
-      public void processReload()
-      {
-         // TODO Auto-generated method stub
-
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
-       */
-      public void onChange()
-      {
-      }
-
-   }
-
-
    class FakeStoreFactory implements PagingStoreFactory
    {
 

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -14,10 +14,8 @@
 package org.hornetq.tests.unit.core.postoffice.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -28,18 +26,15 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.paging.PageTransactionInfo;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
 import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.postoffice.impl.DuplicateIDCacheImpl;
 import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.transaction.impl.ResourceManagerImpl;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
+import org.hornetq.tests.unit.util.FakePagingManager;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.ExecutorFactory;
@@ -186,160 +181,4 @@
       }
 
    }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-   static class FakePagingManager implements PagingManager
-   {
-
-      public void activate()
-      {
-      }
-
-      public long addSize(final long size)
-      {
-         return 0;
-      }
-
-      public void addTransaction(final PageTransactionInfo pageTransaction)
-      {
-      }
-
-      public PagingStore createPageStore(final SimpleString destination) throws Exception
-      {
-         return null;
-      }
-
-      public long getTotalMemory()
-      {
-         return 0;
-      }
-
-      public SimpleString[] getStoreNames()
-      {
-         return null;
-      }
-
-      public long getMaxMemory()
-      {
-         return 0;
-      }
-
-      public PagingStore getPageStore(final SimpleString address) throws Exception
-      {
-         return null;
-      }
-
-      public void deletePageStore(SimpleString storeName) throws Exception
-      {
-      }
-
-      public PageTransactionInfo getTransaction(final long transactionID)
-      {
-         return null;
-      }
-
-      public boolean isBackup()
-      {
-         return false;
-      }
-
-      public boolean isGlobalPageMode()
-      {
-         return false;
-      }
-
-      public boolean isPaging(final SimpleString destination) throws Exception
-      {
-         return false;
-      }
-
-      public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
-      {
-         return false;
-      }
-
-      public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection) throws Exception
-      {
-         return false;
-      }
-
-      public void reloadStores() throws Exception
-      {
-      }
-
-      public void removeTransaction(final long transactionID)
-      {
-
-      }
-
-      public void setGlobalPageMode(final boolean globalMode)
-      {
-      }
-
-      public void setPostOffice(final PostOffice postOffice)
-      {
-      }
-
-      public void resumeDepages()
-      {
-      }
-
-      public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
-      {
-      }
-
-      public boolean isStarted()
-      {
-         return false;
-      }
-
-      public void start() throws Exception
-      {
-      }
-
-      public void stop() throws Exception
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
-       */
-      public boolean isGlobalFull()
-      {
-         return false;
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#getTransactions()
-       */
-      public Map<Long, PageTransactionInfo> getTransactions()
-      {
-         return null;
-      }
-
-      
-      
-      
-      /* (non-Javadoc)
-       * @see org.hornetq.core.paging.PagingManager#processReload()
-       */
-      public void processReload()
-      {
-      }
-
-      /* (non-Javadoc)
-       * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
-       */
-      public void onChange()
-      {
-      }
-
-   }
-
 }

Added: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/util/FakePagingManager.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -0,0 +1,173 @@
+package org.hornetq.tests.unit.util;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.ServerMessage;
+/** No-op {@link PagingManager} fake shared by unit tests: every command does nothing and every query returns 0, false or null. */
+public final class FakePagingManager implements PagingManager
+{
+
+   public void activate()
+   {
+   }
+
+   public long addSize(final long size)
+   {
+      return 0;
+   }
+
+   public void addTransaction(final PageTransactionInfo pageTransaction)
+   {
+   }
+
+   public PagingStore createPageStore(final SimpleString destination) throws Exception
+   {
+      return null;
+   }
+
+   public long getTotalMemory()
+   {
+      return 0;
+   }
+
+   public SimpleString[] getStoreNames()
+   {
+      return null;
+   }
+
+   public long getMaxMemory()
+   {
+      return 0;
+   }
+
+   public PagingStore getPageStore(final SimpleString address) throws Exception
+   {
+      return null;
+   }
+
+   public void deletePageStore(SimpleString storeName) throws Exception
+   {
+   }
+
+   public PageTransactionInfo getTransaction(final long transactionID)
+   {
+      return null;
+   }
+
+   public boolean isBackup()
+   {
+      return false;
+   }
+
+   public boolean isGlobalPageMode()
+   {
+      return false;
+   }
+
+   public boolean isPaging(final SimpleString destination) throws Exception
+   {
+      return false;
+   }
+
+   public boolean page(final ServerMessage message, final boolean duplicateDetection) throws Exception
+   {
+      return false;
+   }
+
+   public boolean page(final ServerMessage message, final long transactionId, final boolean duplicateDetection)
+            throws Exception
+   {
+      return false;
+   }
+
+   public void reloadStores() throws Exception
+   {
+   }
+
+   public void removeTransaction(final long transactionID)
+   {
+
+   }
+
+   public void setGlobalPageMode(final boolean globalMode)
+   {
+   }
+
+   public void setPostOffice(final PostOffice postOffice)
+   {
+   }
+
+   public void resumeDepages()
+   {
+   }
+
+   public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
+   {
+   }
+
+   public boolean isStarted()
+   {
+      return false;
+   }
+
+   public void start() throws Exception
+   {
+   }
+
+   public void stop() throws Exception
+   {
+   }
+
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.paging.PagingManager#isGlobalFull()
+    */
+   public boolean isGlobalFull()
+   {
+      return false;
+   }
+
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.paging.PagingManager#getTransactions()
+    */
+   public Map<Long, PageTransactionInfo> getTransactions()
+   {
+      return null;
+   }
+
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.paging.PagingManager#processReload()
+    */
+   public void processReload()
+   {
+   }
+
+   /*
+    * (non-Javadoc)
+    * @see org.hornetq.core.settings.HierarchicalRepositoryChangeListener#onChange()
+    */
+   public void onChange()
+   {
+   }
+
+   @Override
+   public void lockAll()
+   {
+      // no-op
+   }
+
+   @Override
+   public void unlockAll()
+   {
+      // no-op
+   }
+
+}

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-09-07 12:45:04 UTC (rev 11298)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-09-07 13:38:25 UTC (rev 11299)
@@ -509,7 +509,7 @@
     */
    protected void assertMessageBody(final int i, final ClientMessage message)
    {
-      Assert.assertEquals("message" + i, message.getBodyBuffer().readString());
+      Assert.assertEquals(message.toString(), "message" + i, message.getBodyBuffer().readString());
    }
 
    /**



More information about the hornetq-commits mailing list