[jboss-cvs] JBoss Messaging SVN: r5453 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Dec 2 16:59:25 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-02 16:59:25 -0500 (Tue, 02 Dec 2008)
New Revision: 5453

Removed:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
A few tweaks to Paging...
I wanted to make this commit before my next change on Executors

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -77,12 +77,6 @@
    void setPostOffice(PostOffice postOffice);
 
    /**
-    * @param pagingStoreImpl 
-    * @return false if the listener can't handle more pages
-    */
-   boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PagedMessage[] data) throws Exception;
-
-   /**
     * To be used by transactions only.
     * If you're sure you will page if isPaging, just call the method page and look at its return. 
     * @param destination
@@ -108,6 +102,11 @@
     * Point to inform/restoring Transactions used when the messages were added into paging
     * */
    void addTransaction(PageTransactionInfo pageTransaction);
+   
+   /**
+    * Returns the PageTransactionInfo registered via addTransaction for the given transaction ID, or null if none is registered
+    * */
+   PageTransactionInfo getTransaction(long transactionID);
 
    /**
     * 
@@ -130,14 +129,33 @@
    void sync(Collection<SimpleString> destinationsToSync) throws Exception;
 
    /**
-    * When we stop depaging, The Last page record needs to removed.
-    * Or else the record could live forever on the journal. 
-    * @throws Exception 
-    * */
-   void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
+    * @return
+    */
+   long getDefaultPageSize();
 
    /**
+    * @param transactionID
+    */
+   void removeTransaction(long transactionID);
+
+   /**
     * @return
     */
-   long getDefaultPageSize();
+   long getMaxGlobalSize();
+
+   /**
+    * @return
+    */
+   long getGlobalSize();
+
+   /**
+    * @param size
+    * @return
+    */
+   long addGlobalSize(long size);
+
+   /**
+    * 
+    */
+   void startGlobalDepage();
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -46,18 +46,12 @@
    /** Maximum number of bytes allowed in memory */
    long getMaxSizeBytes();
 
-   boolean isPrintedDropMessagesWarning();
-
-   void setPrintedDropMessagesWarning(boolean droppedMessages);
-
    boolean isDropWhenMaxSize();
 
    long getPageSizeBytes();
 
    long getAddressSize();
 
-   long addAddressSize(long add);
-
    /** @return true if paging was started, or false if paging was already started before this call */
    boolean startPaging() throws Exception;
 
@@ -68,16 +62,7 @@
    public boolean readPage() throws Exception;
 
    boolean page(PagedMessage message) throws Exception;
-
-   /** 
-    * Remove the first page from the Writing Queue.
-    * The file will still exist until Page.delete is called, 
-    * So, case the system is reloaded the same Page will be loaded back if delete is not called.
-    * @return
-    * @throws Exception 
-    */
-   Page depage() throws Exception;
-
+   
    /**
     * 
     * @return false if a thread was already started, or if not in page mode
@@ -88,4 +73,11 @@
    LastPageRecord getLastPageRecord();
 
    void setLastPageRecord(LastPageRecord record);
+
+   /**
+    * @param memoryEstimate
+    * @return
+    * @throws Exception 
+    */
+   long addSize(long memoryEstimate) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -24,6 +24,8 @@
 
 import java.util.concurrent.Executor;
 
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
@@ -42,4 +44,8 @@
    void stop() throws InterruptedException;
 
    void setPagingManager(PagingManager manager);
+   
+   void setStorageManager(StorageManager storageManager);
+
+   void setPostOffice(PostOffice office);
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -22,10 +22,7 @@
 
 package org.jboss.messaging.core.paging.impl;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,14 +30,12 @@
 
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -80,8 +75,6 @@
 
    private final long defaultPageSize;
 
-   private PostOffice postOffice;
-
    private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
 
    // Static
@@ -177,148 +170,9 @@
     *  (There is a one-to-one relationship here) */
    public void setPostOffice(final PostOffice postOffice)
    {
-      this.postOffice = postOffice;
+      pagingSPI.setPostOffice(postOffice);
    }
 
-   public void clearLastPageRecord(final LastPageRecord lastRecord) throws Exception
-   {
-      trace("Clearing lastRecord information " + lastRecord.getLastId());
-      
-      storageManager.storeDelete(lastRecord.getRecordId());
-   }
-
-   /**
-    * This method will remove files from the page system and and route them, doing it transactionally
-    * 
-    * A Transaction will be opened only if persistent messages are used.
-    * 
-    * If persistent messages are also used, it will update eventual PageTransactions
-    */
-   
-   //TODO - this method should be moved to PagingStoreImpl
-   public boolean onDepage(final int pageId,
-                           final SimpleString destination,
-                           final PagingStore pagingStore,
-                           final PagedMessage[] data) throws Exception
-   {
-      trace("Depaging....");
-
-      // Depage has to be done atomically, in case of failure it should be
-      // back to where it was
-      final long depageTransactionID = storageManager.generateUniqueID();
-
-      LastPageRecord lastPage = pagingStore.getLastPageRecord();
-
-      if (lastPage == null)
-      {
-         lastPage = new LastPageRecordImpl(pageId, destination);
-         
-         pagingStore.setLastPageRecord(lastPage);
-      }
-      else
-      {
-         if (pageId <= lastPage.getLastId())
-         {
-            log.warn("Page " + pageId + " was already processed, ignoring the page");
-            return true;
-         }
-      }
-
-      lastPage.setLastId(pageId);
-      
-      storageManager.storeLastPage(depageTransactionID, lastPage);
-
-      HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
-      final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
-      for (PagedMessage msg : data)
-      {
-         ServerMessage pagedMessage = null;
-
-         pagedMessage = (ServerMessage)msg.getMessage(storageManager);
-
-         final long transactionIdDuringPaging = msg.getTransactionID();
-         
-         if (transactionIdDuringPaging >= 0)
-         {
-            final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
-
-            // http://wiki.jboss.org/wiki/JBossMessaging2Paging
-            // This is the Step D described on the "Transactions on Paging"
-            // section
-            if (pageTransactionInfo == null)
-            {
-               if (isTrace)
-               {
-                  trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + pagedMessage);
-               }
-               continue;
-            }
-
-            // This is to avoid a race condition where messages are depaged
-            // before the commit arrived
-            if (!pageTransactionInfo.waitCompletion())
-            {
-               trace("Rollback was called after prepare, ignoring message " + pagedMessage);
-               continue;
-            }
-
-            // Update information about transactions
-            if (pagedMessage.isDurable())
-            {
-               pageTransactionInfo.decrement();
-               pageTransactionsToUpdate.add(pageTransactionInfo);
-            }
-         }
-
-         refsToAdd.addAll(postOffice.route(pagedMessage));
-
-         if (pagedMessage.getDurableRefCount() != 0)
-         {
-            storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
-         }
-      }
-
-      for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
-      {
-         if (pageWithTransaction.getNumberOfMessages() == 0)
-         {
-            // http://wiki.jboss.org/wiki/JBossMessaging2Paging
-            // numberOfReads==numberOfWrites -> We delete the record
-            storageManager.storeDeletePageTransaction(depageTransactionID, pageWithTransaction.getRecordID());
-            transactions.remove(pageWithTransaction.getTransactionID());
-         }
-         else
-         {
-            storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
-         }
-      }
-
-      storageManager.commit(depageTransactionID);
-
-      trace("Depage committed");
-
-      for (MessageReference ref : refsToAdd)
-      {
-         ref.getQueue().addLast(ref);
-      }
-
-      if (globalMode.get())
-      {
-         // We use the Default Page Size when in global mode for the calculation of the Watermark
-         return globalSize.get() < maxGlobalSize - defaultPageSize && pagingStore.getMaxSizeBytes() <= 0 ||
-                pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
-      }
-      else
-      {
-         // If Max-size is not configured (-1) it will aways return true, as
-         // this method was probably called by global-depage
-         return pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
-      }
-
-   }
-
    public long getDefaultPageSize()
    {
       return defaultPageSize;
@@ -338,12 +192,12 @@
 
    public void messageDone(final ServerMessage message) throws Exception
    {
-      addSize(message.getDestination(), message.getMemoryEstimate() * -1);
+      getPageStore(message.getDestination()).addSize(message.getMemoryEstimate() * -1);
    }
 
    public long addSize(final ServerMessage message) throws Exception
    {
-      return addSize(message.getDestination(), message.getMemoryEstimate());
+      return getPageStore(message.getDestination()).addSize(message.getMemoryEstimate());
    }
 
    public boolean page(final ServerMessage message, final long transactionId) throws Exception
@@ -360,6 +214,17 @@
    {
       transactions.put(pageTransaction.getTransactionID(), pageTransaction);
    }
+   
+   public void removeTransaction(final long id)
+   {
+      transactions.remove(id);
+   }
+   
+   public PageTransactionInfo getTransaction(final long id)
+   {
+      return transactions.get(id);
+   }
+   
 
    public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
    {
@@ -386,6 +251,8 @@
       
       pagingSPI.setPagingManager(this);
       
+      pagingSPI.setStorageManager(storageManager);
+      
       started = true;
    }
 
@@ -405,120 +272,52 @@
          store.stop();
       }
    }
+   
+   public void startGlobalDepage()
+   {
+      if (globalDepageRunning.compareAndSet(false, true))
+      {
+         Runnable globalDepageRunnable = new GlobalDepager();
+         pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
+      }
+   }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private PagingStore newStore(final SimpleString destinationName)
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
+    */
+   public long getGlobalSize()
    {
-      return pagingSPI.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
+      return this.globalSize.get();
    }
+   
 
-   //TODO - this method should be moved to PagingStoreImpl
-   private long addSize(final SimpleString destination, final long size) throws Exception
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.paging.PagingManager#addGlobalSize(long)
+    */
+   public long addGlobalSize(long size)
    {
-      final PagingStore store = getPageStore(destination);
+      return globalSize.addAndGet(size);
+   }
 
-      final long maxSize = store.getMaxSizeBytes();
 
-      final long pageSize = store.getPageSizeBytes();
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.paging.PagingManager#getMaxGlobalSize()
+    */
+   public long getMaxGlobalSize()
+   {
+      return maxGlobalSize;
+   }
+   
 
-      if (store.isDropWhenMaxSize() && size > 0)
-      {
-         // if destination configured to drop messages && size is over the
-         // limit, we return -1 which means drop the message
-         if (store.getAddressSize() + size > maxSize || maxGlobalSize > 0 && globalSize.get() + size > maxGlobalSize)
-         {
-            if (!store.isPrintedDropMessagesWarning())
-            {
-               store.setPrintedDropMessagesWarning(true);
-               
-               log.warn("Messages are being dropped on adress " + store.getStoreName());
-            }
+   // Package protected ---------------------------------------------
 
-            return -1l;
-         }
-         else
-         {
-            return store.addAddressSize(size);
-         }
-      }
-      else
-      {
-         long currentGlobalSize = globalSize.addAndGet(size);
+   // Protected -----------------------------------------------------
 
-         final long addressSize = store.addAddressSize(size);
+   // Private -------------------------------------------------------
 
-         if (size > 0)
-         {
-            if (maxGlobalSize > 0 && currentGlobalSize > maxGlobalSize)
-            {
-               globalMode.set(true);
-               
-               if (store.startPaging())
-               {
-                  if (isTrace)
-                  {
-                     trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
-                  }
-               }
-            }
-            else if (maxSize > 0 && addressSize > maxSize)
-            {
-               if (store.startPaging())
-               {
-                  if (isTrace)
-                  {
-                     trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
-                  }
-               }
-            }
-         }
-         else
-         {
-            // When in Global mode, we use the default page size as the minimal
-            // watermark to start depage
-
-            if (isTrace)
-            {
-               log.trace("globalMode.get = " + globalMode.get() +
-                         " currentGlobalSize = " +
-                         currentGlobalSize +
-                         " defaultPageSize = " +
-                         defaultPageSize +
-                         " maxGlobalSize = " +
-                         maxGlobalSize +
-                         "maxGlobalSize - defaultPageSize = " +
-                         (maxGlobalSize - defaultPageSize));
-            }
-
-            if (globalMode.get() && currentGlobalSize < maxGlobalSize - defaultPageSize)
-            {
-               startGlobalDepage();
-            }
-            else if (maxSize > 0 && addressSize < maxSize - pageSize)
-            {
-               if (store.startDepaging())
-               {
-                  log.info("Starting depaging Thread, size = " + addressSize);
-               }
-            }
-         }
-
-         return addressSize;
-      }
-   }
-
-   private void startGlobalDepage()
+   private PagingStore newStore(final SimpleString destinationName)
    {
-      if (globalDepageRunning.compareAndSet(false, true))
-      {
-         Runnable globalDepageRunnable = new GlobalDepager();
-         pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
-      }
+      return pagingSPI.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
    }
 
    // Inner classes -------------------------------------------------
@@ -574,4 +373,5 @@
          }
       }
    }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -33,6 +33,8 @@
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.SimpleString;
@@ -54,6 +56,10 @@
    private final ExecutorService executor;
 
    private PagingManager pagingManager;
+   
+   private StorageManager storageManager;
+   
+   private PostOffice postOffice;
 
    // Static --------------------------------------------------------
 
@@ -91,16 +97,28 @@
       destinationFile.mkdirs();
 
       return new PagingStoreImpl(pagingManager,
+                                 storageManager,
+                                 postOffice,
                                  newFileFactory(destinationDirectory),
                                  destinationName,
                                  settings,
                                  executor);
    }
 
-   public void setPagingManager(final PagingManager manager)
+   public void setPagingManager(final PagingManager pagingManager)
    {
-      pagingManager = manager;
+      this.pagingManager = pagingManager;
    }
+   
+   public void setStorageManager(final StorageManager storageManager)
+   {
+      this.storageManager = storageManager; 
+   }
+   
+   public void setPostOffice(final PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -23,6 +23,8 @@
 package org.jboss.messaging.core.paging.impl;
 
 import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -37,9 +39,14 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
@@ -57,6 +64,10 @@
 
    // Attributes ----------------------------------------------------
 
+   private final StorageManager storageManager;
+   
+   private final PostOffice postOffice;
+
    private final DecimalFormat format = new DecimalFormat("000000000");
 
    private final AtomicInteger pageUsedSize = new AtomicInteger(0);
@@ -103,9 +114,24 @@
 
    // Static --------------------------------------------------------
 
+   // private static final boolean isTrace = log.isTraceEnabled();
+   private static final boolean isTrace = true;
+
+   // This is just a debugging helper.
+   // While debugging you can switch log.trace to log.info here and change
+   // the isTrace variable above accordingly
+   private static void trace(final String message)
+   {
+      // log.trace(message);
+      log.info(message);
+   }
+
+
    // Constructors --------------------------------------------------
 
    public PagingStoreImpl(final PagingManager pagingManager,
+                          final StorageManager storageManager,
+                          final PostOffice postOffice,
                           final SequentialFileFactory fileFactory,
                           final SimpleString storeName,
                           final QueueSettings queueSettings,
@@ -116,6 +142,10 @@
          throw new IllegalStateException("Paging Manager can't be null");
       }
       
+      this.storageManager = storageManager;
+      
+      this.postOffice = postOffice;
+      
       this.fileFactory = fileFactory;
       
       this.storeName = storeName;
@@ -142,28 +172,11 @@
 
    // PagingStore implementation ------------------------------------
 
-   //TODO - this methods shouldn't be necessary if move functionality from
-   //PagingManagerImpl to PagingStoreImpl
-   public boolean isPrintedDropMessagesWarning()
-   {
-      return printedDropMessagesWarning;
-   }
-
-   public void setPrintedDropMessagesWarning(final boolean droppedMessages)
-   {
-      this.printedDropMessagesWarning = droppedMessages;
-   }
-
    public long getAddressSize()
    {
       return sizeInBytes.get();
    }
 
-   public long addAddressSize(final long delta)
-   {
-      return sizeInBytes.addAndGet(delta);
-   }
-
    /** Maximum number of bytes allowed in memory */
    public long getMaxSizeBytes()
    {
@@ -217,7 +230,7 @@
       {
          if (lastPageRecord != null)
          {
-            pagingManager.clearLastPageRecord(lastPageRecord);
+            clearLastPageRecord(lastPageRecord);
          }
          
          lastPageRecord = null;
@@ -229,7 +242,7 @@
       
       PagedMessage messages[] = page.read();
       
-      boolean addressNotFull = pagingManager.onDepage(page.getPageId(), storeName, PagingStoreImpl.this, messages);
+      boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
       
       page.delete();
 
@@ -241,7 +254,6 @@
     *  The method calling this method will remove the page and will start reading it outside of any locks. 
     *  
     * */
-   //FIXME - why is this public?
    public Page depage() throws Exception
    {
       writeLock.lock();
@@ -303,6 +315,102 @@
 
    }
 
+   
+   public long addSize(final long size) throws Exception
+   {
+      final long maxSize = getMaxSizeBytes();
+
+      final long pageSize = getPageSizeBytes();
+
+      if (isDropWhenMaxSize() && size > 0)
+      {
+         // if destination configured to drop messages && size is over the
+         // limit, we return -1 which means drop the message
+         if (getAddressSize() + size > maxSize || pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() + size > pagingManager.getMaxGlobalSize())
+         {
+            if (!printedDropMessagesWarning)
+            {
+               printedDropMessagesWarning = true;
+               
+               log.warn("Messages are being dropped on adress " + getStoreName());
+            }
+
+            return -1l;
+         }
+         else
+         {
+            return addAddressSize(size);
+         }
+      }
+      else
+      {
+         final long currentGlobalSize = pagingManager.addGlobalSize(size);
+         
+         final long maxGlobalSize = pagingManager.getMaxGlobalSize();
+
+         final long addressSize = addAddressSize(size);
+         
+         if (size > 0)
+         {
+            if (maxGlobalSize > 0 && currentGlobalSize > maxGlobalSize)
+            {
+               pagingManager.setGlobalPageMode(true);
+               
+               if (startPaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
+                  }
+               }
+            }
+            else if (maxSize > 0 && addressSize > maxSize)
+            {
+               if (startPaging())
+               {
+                  if (isTrace)
+                  {
+                     trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
+                  }
+               }
+            }
+         }
+         else
+         {
+            // When in Global mode, we use the default page size as the minimal
+            // watermark to start depage
+
+            if (isTrace)
+            {
+               log.trace("globalMode.get = " + pagingManager.isGlobalPageMode() +
+                         " currentGlobalSize = " +
+                         currentGlobalSize +
+                         " defaultPageSize = " +
+                         pagingManager.getDefaultPageSize() +
+                         " maxGlobalSize = " +
+                         maxGlobalSize +
+                         "maxGlobalSize - defaultPageSize = " +
+                         (maxGlobalSize - pagingManager.getDefaultPageSize()));
+            }
+
+            if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+            {
+               pagingManager.startGlobalDepage();
+            }
+            else if (maxSize > 0 && addressSize < maxSize - pageSize)
+            {
+               if (startDepaging())
+               {
+                  log.info("Starting depaging Thread, size = " + addressSize);
+               }
+            }
+         }
+
+         return addressSize;
+      }
+   }
+
+   
    public boolean page(final PagedMessage message) throws Exception
    {
       // Max-size is set, but reject is activated, what means.. never page on
@@ -564,6 +672,8 @@
          writeLock.unlock();
       }
    }
+   
+   
 
    // TestSupportPageStore ------------------------------------------
 
@@ -577,7 +687,144 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
+   
+   
+   /**
+    * This method will remove files from the page system and route them, doing it transactionally
+    * 
+    * A Transaction will be opened only if persistent messages are used.
+    * 
+    * If persistent messages are also used, it will update eventual PageTransactions
+    */
+   
+   private boolean onDepage(final int pageId,
+                           final SimpleString destination,
+                           final PagedMessage[] data) throws Exception
+   {
+      trace("Depaging....");
 
+      // Depage has to be done atomically, in case of failure it should be
+      // back to where it was
+      final long depageTransactionID = storageManager.generateUniqueID();
+
+      LastPageRecord lastPage = getLastPageRecord();
+
+      if (lastPage == null)
+      {
+         lastPage = new LastPageRecordImpl(pageId, destination);
+         
+         setLastPageRecord(lastPage);
+      }
+      else
+      {
+         if (pageId <= lastPage.getLastId())
+         {
+            log.warn("Page " + pageId + " was already processed, ignoring the page");
+            return true;
+         }
+      }
+
+      lastPage.setLastId(pageId);
+      
+      storageManager.storeLastPage(depageTransactionID, lastPage);
+
+      HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
+      final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+      for (PagedMessage msg : data)
+      {
+         ServerMessage pagedMessage = null;
+
+         pagedMessage = (ServerMessage)msg.getMessage(storageManager);
+
+         final long transactionIdDuringPaging = msg.getTransactionID();
+         
+         if (transactionIdDuringPaging >= 0)
+         {
+            final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging); 
+
+            // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+            // This is the Step D described on the "Transactions on Paging"
+            // section
+            if (pageTransactionInfo == null)
+            {
+               if (isTrace)
+               {
+                  trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + pagedMessage);
+               }
+               continue;
+            }
+
+            // This is to avoid a race condition where messages are depaged
+            // before the commit arrived
+            if (!pageTransactionInfo.waitCompletion())
+            {
+               trace("Rollback was called after prepare, ignoring message " + pagedMessage);
+               continue;
+            }
+
+            // Update information about transactions
+            if (pagedMessage.isDurable())
+            {
+               pageTransactionInfo.decrement();
+               pageTransactionsToUpdate.add(pageTransactionInfo);
+            }
+         }
+
+         refsToAdd.addAll(postOffice.route(pagedMessage));
+
+         if (pagedMessage.getDurableRefCount() != 0)
+         {
+            storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
+         }
+      }
+
+      for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+      {
+         if (pageWithTransaction.getNumberOfMessages() == 0)
+         {
+            // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+            // numberOfReads==numberOfWrites -> We delete the record
+            storageManager.storeDeletePageTransaction(depageTransactionID, pageWithTransaction.getRecordID());
+            pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
+         }
+         else
+         {
+            storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+         }
+      }
+
+      storageManager.commit(depageTransactionID);
+
+      trace("Depage committed");
+
+      for (MessageReference ref : refsToAdd)
+      {
+         ref.getQueue().addLast(ref);
+      }
+
+      if (pagingManager.isGlobalPageMode())
+      {
+         // We use the Default Page Size when in global mode for the calculation of the Watermark
+         return pagingManager.getGlobalSize() < pagingManager.getMaxGlobalSize() - pagingManager.getDefaultPageSize() && getMaxSizeBytes() <= 0 ||
+                getAddressSize() < getMaxSizeBytes();
+      }
+      else
+      {
+         // If Max-size is not configured (-1) it will always return true, as
+         // this method was probably called by global-depage
+         return getMaxSizeBytes() <= 0 || getAddressSize() < getMaxSizeBytes();
+      }
+
+   }
+
+
+   private long addAddressSize(final long delta)
+   {
+      return sizeInBytes.addAndGet(delta);
+   }
+
    private synchronized void clearDequeueThread()
    {
       dequeueThread = null;
@@ -615,6 +862,13 @@
       }
    }
 
+   public void clearLastPageRecord(final LastPageRecord lastRecord) throws Exception
+   {
+      trace("Clearing lastRecord information " + lastRecord.getLastId());
+      
+      storageManager.storeDelete(lastRecord.getRecordId());
+   }
+
    private Page createPage(final int page) throws Exception
    {
       String fileName = createFileName(page);

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.core.paging.impl;
 
+import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagingStore;
 
 /**
@@ -31,5 +32,16 @@
  */
 public interface TestSupportPageStore extends PagingStore
 {
+   /** 
+    * Remove the first page from the Writing Queue.
+    * The file will still exist until Page.delete is called, 
+    * So, in case the system is reloaded, the same Page will be loaded back if delete is not called.
+    * @return
+    * @throws Exception
+    * 
+    * Note: This should still be part of the interface, even though JBossMessaging only uses through the 
+    */
+   Page depage() throws Exception;
+
    void forceAnotherPage() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -25,7 +25,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -48,11 +47,11 @@
 import org.jboss.messaging.core.server.SendLock;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.SendLockImpl;
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * A PostOfficeImpl
@@ -495,36 +494,21 @@
 
          queues.put(binding.getQueue().getPersistenceID(), binding.getQueue());
       }
-      // TODO: This is related to http://www.jboss.com/index.html?module=bb&op=viewtopic&t=145597
-
-      //FIXME This is incorrect - you cannot assume there is an allowable address in existence
-      //for every address in the post office.
-      //This code is unnecessary if paging stores are loaded lazily
-      HashSet<SimpleString> addresses = new HashSet<SimpleString>();
-
-      for (Binding binding : bindings)
+      
+      for (SimpleString destination : addressManager.getMappings().keySet())
       {
-         addresses.add(binding.getAddress());
-      }
-
-      for (SimpleString destination : dests)
-      {
-         addresses.add(destination);
-      }
-
-      for (SimpleString destination : addresses)
-      {
          pagingManager.createPageStore(destination);
       }
 
-      // End TODO -------------------------------------
-
-
       storageManager.loadMessages(this, queues, resourceManager);
 
-      for (SimpleString destination : addresses)
+
+      // If Paging was interrupted due to a server stop, during restart we need to resume depaging those addresses
+      for (SimpleString destination : addressManager.getMappings().keySet())
       {
          PagingStore store = pagingManager.getPageStore(destination);
+         
+         // FIXME this should be changed as soon as we change the threading model
          if (!pagingManager.isGlobalPageMode())
          {
             if (store.isPaging() && store.getMaxSizeBytes() < 0)
@@ -537,6 +521,7 @@
             }
          }
       }
+
    }
 
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -31,6 +31,7 @@
 import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -65,7 +66,7 @@
    {
       HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
       queueSettings.setDefault(new QueueSettings());
-      
+
       PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(journalDir),
                                                             null,
                                                             queueSettings,
@@ -74,7 +75,7 @@
 
       managerImpl.start();
 
-      PagingStore store = managerImpl.createPageStore(new SimpleString("simple-test"));
+      TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
 
@@ -120,7 +121,7 @@
                                                             -1,
                                                             1024 * 1024);
       managerImpl.start();
-      
+
       managerImpl.createPageStore(new SimpleString("simple-test"));
 
       ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));

Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.paging.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.easymock.EasyMock;
-import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PagedMessage;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
-import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
-import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
-
-public class PageManagerImplTest extends UnitTestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   private static HierarchicalRepository<QueueSettings> repoSettings = new HierarchicalObjectRepository<QueueSettings>();
-   static
-   {
-      repoSettings.setDefault(new QueueSettings());
-   }
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   public void testOnDepage() throws Exception
-   {
-      long time = System.currentTimeMillis() + 10000;
-      List<MessageReference> refs = new ArrayList<MessageReference>();
-      MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
-      refs.add(ref);
-      Queue queue = EasyMock.createStrictMock(Queue.class);
-      HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
-      queueSettings.setDefault(new QueueSettings());
-      PostOffice po = EasyMock.createStrictMock(PostOffice.class);
-      PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      PagingStore store = EasyMock.createNiceMock(PagingStore.class);
-      StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
-      PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1, 1024 * 1024);
-      manager.setPostOffice(po);
-      ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
-
-      EasyMock.expect(storageManager.generateUniqueID()).andReturn(1l);
-      EasyMock.expect(po.route(message)).andReturn(refs);
-      EasyMock.expect(message.getDurableRefCount()).andReturn(1);
-      storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
-      storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
-      storageManager.commit(EasyMock.anyLong());
-      EasyMock.expect(ref.getQueue()).andReturn(queue);
-      EasyMock.expect(queue.addLast(ref)).andReturn(null);
-      EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
-      SimpleString queueName = new SimpleString("aq");
-      PagedMessageImpl pageMessage = new PagedMessageImpl(message);
-
-      manager.onDepage(0, queueName, store, new PagedMessage[] {pageMessage} );
-      EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -61,7 +61,13 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
-      PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+      PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                  createStorageManagerMock(),
+                                                  createPostOfficeMock(),
+                                                  factory,
+                                                  destinationTestName,
+                                                  new QueueSettings(),
+                                                  executor);
 
       storeImpl.start();
 
@@ -78,7 +84,13 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
-      PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+      PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                  createStorageManagerMock(),
+                                                  createPostOfficeMock(),
+                                                  factory,
+                                                  destinationTestName,
+                                                  new QueueSettings(),
+                                                  executor);
 
       storeImpl.start();
 
@@ -105,7 +117,13 @@
 
       storeImpl.sync();
 
-      storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+      storeImpl = new PagingStoreImpl(createMockManager(),
+                                      createStorageManagerMock(),
+                                      createPostOfficeMock(),
+                                      factory,
+                                      destinationTestName,
+                                      new QueueSettings(),
+                                      executor);
 
       storeImpl.start();
 
@@ -113,12 +131,17 @@
 
    }
 
-   
    public void testDepageOnCurrentPage() throws Exception
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
-      PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                  createStorageManagerMock(),
+                                                  createPostOfficeMock(),
+                                                  factory,
+                                                  destinationTestName,
+                                                  new QueueSettings(),
+                                                  executor);
 
       storeImpl.start();
 
@@ -174,6 +197,8 @@
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
       TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                           createStorageManagerMock(),
+                                                           createPostOfficeMock(),
                                                            factory,
                                                            destinationTestName,
                                                            new QueueSettings(),
@@ -292,9 +317,9 @@
       testConcurrentPaging(factory, 10);
 
    }
-   
+
    public void testFoo()
-   {      
+   {
    }
 
    // Protected ----------------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-12-02 21:59:25 UTC (rev 5453)
@@ -42,6 +42,8 @@
 import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -106,6 +108,8 @@
       settings.setPageSizeBytes(MAX_SIZE);
 
       final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+                                                                 createStorageManagerMock(),
+                                                                 createPostOfficeMock(),
                                                                  factory,
                                                                  new SimpleString("test"),
                                                                  settings,
@@ -259,7 +263,7 @@
          fileTmp.close();
       }
 
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(), factory, new SimpleString("test"), settings, executor);
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(), createStorageManagerMock(), createPostOfficeMock(), factory, new SimpleString("test"), settings, executor);
       storeImpl2.start();
 
       int numberOfPages = storeImpl2.getNumberOfPages();
@@ -357,6 +361,21 @@
       EasyMock.replay(mockManager);
       return mockManager;
    }
+   
+   protected StorageManager createStorageManagerMock()
+   {
+      StorageManager storageManager = EasyMock.createNiceMock(StorageManager.class);
+      EasyMock.replay(storageManager);
+      return storageManager;
+   }
+   
+   
+   protected PostOffice createPostOfficeMock()
+   {
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+      EasyMock.replay(postOffice);
+      return postOffice;
+   }
 
    // Private -------------------------------------------------------
 




More information about the jboss-cvs-commits mailing list