[jboss-cvs] JBoss Messaging SVN: r4890 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 28 15:59:29 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-28 15:59:28 -0400 (Thu, 28 Aug 2008)
New Revision: 4890

Removed:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Log:
Merging Pager and PagingManager into a single class

Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -1,103 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import java.util.Collection;
-
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * 
- * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
- * 
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public interface Pager
-{
-   
-   /**
-    * @param pagingStoreImpl 
-    * @return false if the listener can't handle more pages
-    */
-   boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
-   
-   /**
-    * To be used by transactions only.
-    * If you're sure you will page if isPaging, just call the method page and look at its return. 
-    * @param destination
-    * @return
-    */
-   boolean isPaging(SimpleString destination) throws Exception;
-   
-   /**
-    * Page, only if destination is in page mode.
-    * @param message
-    * @return false if destination is not on page mode
-    */
-   boolean page(ServerMessage message) throws Exception;
-   
-   /**
-    * Page, only if destination is in page mode.
-    * @param message
-    * @return false if destination is not on page mode
-    */
-   boolean page(ServerMessage message, long transactionId) throws Exception;
-   
-   /**
-    * Point to inform/restoring Transactions used when the messages were added into paging
-    * */
-   void addTransaction(PageTransactionInfo pageTransaction);
-   
-   
-   void completeTransaction(long transactionId);
-   
-   
-   /**
-    * 
-    * Duplication detection for paging processing
-    *  */
-   void loadLastPage(LastPageRecord lastPage) throws Exception;
-   
-   /** 
-    * 
-    * To be called when there are no more references to the message
-    * @param message
-    */
-   void messageDone(ServerMessage message) throws Exception;
-   
-   /** To be called when a rollback is called after messageDone was called */
-   long addSize(ServerMessage message) throws Exception;
-
-   void sync(Collection<SimpleString> destinationsToSync) throws Exception;
-
-   /**
-    * When we stop depaging, The Last page record needs to removed.
-    * Or else the record could live forever on the journal. 
-    * @throws Exception 
-    * */
-   void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
-   
-   
-}

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -22,7 +22,11 @@
 
 package org.jboss.messaging.core.paging;
 
+import java.util.Collection;
+
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.util.SimpleString;
 
 /**
@@ -30,9 +34,85 @@
  * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public interface PagingManager extends MessagingComponent
 {
-   public PagingStore getPageStore(SimpleString storeName) throws Exception;
+
+   /** To return the PageStore associated with the address */
+   public PagingStore getPageStore(SimpleString address) throws Exception;
+   
+   /** An injection point for the PostOffice to inject itself */
+   void setPostOffice(PostOffice postOffice);
+   
+   /**
+    * @param pagingStoreImpl 
+    * @return false if the listener can't handle more pages
+    */
+   boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
+   
+   /**
+    * To be used by transactions only.
+    * If you intend to page whenever isPaging is true, just call the method page and check its return value.
+    * @param destination
+    * @return
+    */
+   boolean isPaging(SimpleString destination) throws Exception;
+   
+   /**
+    * Page, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not on page mode
+    */
+   boolean page(ServerMessage message) throws Exception;
+   
+   /**
+    * Page, only if destination is in page mode.
+    * @param message
+    * @return false if destination is not on page mode
+    */
+   boolean page(ServerMessage message, long transactionId) throws Exception;
+   
+   /**
+    * Entry point for registering (or restoring) the transactions that were used when messages were added into paging
+    * */
+   void addTransaction(PageTransactionInfo pageTransaction);
+   
+   
+   /**
+    * Use this method to inform when a transaction was completed.
+    * @param transactionId
+    */
+   void completeTransaction(long transactionId);
+   
+   
+   /**
+    * 
+    * Duplication detection for paging processing
+    *  */
+   void loadLastPage(LastPageRecord lastPage) throws Exception;
+   
+   /** 
+    * 
+    * To be called when there are no more references to the message
+    * @param message
+    */
+   void messageDone(ServerMessage message) throws Exception;
+   
+   /** To be called when a rollback is called after messageDone was called */
+   long addSize(ServerMessage message) throws Exception;
+
+   /** Sync current-pages on disk for these destinations */
+   void sync(Collection<SimpleString> destinationsToSync) throws Exception;
+
+   /**
+    * When we stop depaging, the last page record needs to be removed,
+    * or else the record could live forever in the journal.
+    * @throws Exception 
+    * */
+   void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
+   
+   
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -75,7 +75,7 @@
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
-   boolean startDequeueThread(Pager listener) throws Exception;
+   boolean startDequeueThread(PagingManager listener) throws Exception;
 
    LastPageRecord getLastRecord();
 

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -23,23 +23,40 @@
 
 package org.jboss.messaging.core.paging.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
 /**
+ *  <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
 public class PagingManagerImpl implements PagingManager
 {
+
+   
    // Constants -----------------------------------------------------
    
    // Attributes ----------------------------------------------------
@@ -52,18 +69,43 @@
    
    private final PagingStoreFactory pagingSPI;
    
-   // Static --------------------------------------------------------
+   private final StorageManager storageManager;
+
+   private PostOffice postOffice;
    
-   // Constructors --------------------------------------------------
+   private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
    
-   public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+
+   
+   // Static --------------------------------------------------------------------------------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+   
+   //private static final boolean isTrace = log.isTraceEnabled();
+   private static final boolean isTrace = true;
+   
+   // This is just a debugging helper.
+   // While debugging you can switch log.trace to log.info here and hard-code the isTrace variable above.
+   private static void trace(String message)
    {
+      //log.trace(message);
+      log.info(message);
+   }
+   
+   
+   // Constructors --------------------------------------------------------------------------------------------------------------------
+   
+   public PagingManagerImpl(final PagingStoreFactory pagingSPI, StorageManager storageManager, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+   {
       this.pagingSPI = pagingSPI;
       this.queueSettingsRepository = queueSettingsRepository;
+      this.storageManager = storageManager;
    }
    
-   // Public --------------------------------------------------------
+   // Public ---------------------------------------------------------------------------------------------------------------------------
    
+   // PagingManager implementation -----------------------------------------------------------------------------------------------------
+   
    public PagingStore getPageStore(final SimpleString storeName) throws Exception
    {
       validateStarted();
@@ -88,6 +130,181 @@
       
    }
    
+   /** This will be set by the PostOffice itself.
+    *  There is no way to set this in the constructor, as the PagingManager is constructed before the PostOffice.
+    *  (There is a one-to-one relationship here.) */
+   public void setPostOffice(PostOffice postOffice)
+   {
+      this.postOffice = postOffice;
+   }
+
+   public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
+   {
+      trace("Clearing lastRecord information " + lastRecord.getLastId());
+      storageManager.storeDelete(lastRecord.getRecordId());
+   }
+   
+   /**
+    * This method will remove files from the page system and add them into the journal, doing it transactionally
+    * 
+    * A Transaction will be opened only if persistent messages are used.
+    * If persistent messages are used, it will also update any related PageTransactions
+    */
+   public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
+   {
+      log.info("Depaging....");
+      
+      /// Depaging has to be done atomically; in case of failure the state should go back to where it was
+      final long depageTransactionID = storageManager.generateTransactionID();
+      
+      LastPageRecord lastPage = pagingStore.getLastRecord(); 
+      
+      if (lastPage == null)
+      {
+         lastPage = new LastPageRecordImpl(pageId, destination);
+         pagingStore.setLastRecord(lastPage);
+      }
+      else
+      {
+         if (pageId <= lastPage.getLastId())
+         {
+            log.warn("Page " + pageId + " was already processed, ignoring the page");
+            return true;
+         }
+      }
+
+      lastPage.setLastId(pageId);
+      storageManager.storeLastPage(depageTransactionID, lastPage);
+      
+      HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
+      final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+      
+      for (PageMessage msg: data)
+      {
+         final long transactionIdDuringPaging = msg.getTransactionID();
+         if (transactionIdDuringPaging > 0)
+         {
+            final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
+            
+            // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+            // This is the Step D described on the "Transactions on Paging" section
+            if (pageTransactionInfo == null)
+            {
+               if (isTrace)
+               {
+                  trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+               }
+               continue;
+            }
+            
+            // This is to avoid a race condition where messages are depaged before the commit arrived
+            pageTransactionInfo.waitCompletion();
+
+            /// Update information about transactions
+            if (msg.getMessage().isDurable())
+            {
+               pageTransactionInfo.decrement();
+               pageTransactionsToUpdate.add(pageTransactionInfo);
+            }
+         }
+         
+         msg.getMessage().setMessageID(storageManager.generateMessageID());
+
+         refsToAdd.addAll(postOffice.route(msg.getMessage()));
+         
+         if (msg.getMessage().getDurableRefCount() != 0)
+         {
+            storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+         }
+      }
+      
+      
+      for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
+      {
+         if (pageWithTransaction.getNumberOfMessages() == 0)
+         { 
+            // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+            // numberOfReads==numberOfWrites -> We delete the record
+            storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
+            this.transactions.remove(pageWithTransaction.getTransactionID());
+         }
+         else
+         {
+            storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+         }
+      }
+      
+      storageManager.commit(depageTransactionID);
+
+      for (MessageReference ref : refsToAdd)
+      {
+         ref.getQueue().addLast(ref);
+      }
+      
+      return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes(); 
+   }
+   
+   public void loadLastPage(LastPageRecord lastPage) throws Exception
+   {
+      System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
+      this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
+   }
+
+   public boolean isPaging(SimpleString destination) throws Exception
+   {
+      return this.getPageStore(destination).isPaging();
+   }
+   
+   public void messageDone(ServerMessage message) throws Exception
+   {
+      addSize(message.getDestination(), message.getEncodeSize() * -1);
+   }
+   
+   public long addSize(final ServerMessage message) throws Exception
+   {
+      return addSize(message.getDestination(), message.getEncodeSize());      
+   }
+   
+   public boolean page(ServerMessage message, long transactionId)
+         throws Exception
+   {
+      return this.getPageStore(message.getDestination()).page(new PageMessageImpl(message, transactionId));
+   }
+
+
+   public boolean page(ServerMessage message) throws Exception
+   {
+      return this.getPageStore(message.getDestination()).page(new PageMessageImpl(message));
+   }
+
+   
+   public void addTransaction(PageTransactionInfo pageTransaction)
+   {
+      this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
+   }
+
+   public void completeTransaction(long transactionId)
+   {
+      PageTransactionInfo pageTrans = this.transactions.get(transactionId);
+      
+      // If nothing was paged, we just remove the information to avoid memory leaks
+      if (pageTrans.getNumberOfMessages() == 0)
+      {
+         this.transactions.remove(pageTrans);
+      }
+   }
+   
+   public void sync(Collection<SimpleString> destinationsToSync) throws Exception
+   {
+      for (SimpleString destination: destinationsToSync)
+      {
+         this.getPageStore(destination).sync();
+      }
+   }
+   
+   // MessagingComponent implementation ------------------------------------------------------------------------------------------------
+   
    public boolean isStarted()
    {
       return started;
@@ -128,7 +345,45 @@
          throw new IllegalStateException("PagingManager is not started");
       }
    }
+
    
+   private long addSize(final SimpleString destination, final long size) throws Exception
+   {
+      final PagingStore store = this.getPageStore(destination);
+      
+      final long addressSize = store.addQueueSize(size);
+      
+      final long maxSize = store.getMaxSizeBytes();
+
+      if (size > 0)
+      {
+         
+         if (maxSize > 0 && addressSize > maxSize)
+         {
+            if (store.startPaging())
+            {
+               if (isTrace)
+               {
+                  trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+               }
+            }
+         }
+      }
+      else
+      {
+         if ( maxSize > 0 && addressSize < maxSize)
+         {
+            if (store.startDequeueThread(this))
+            {
+               log.info("Starting depaging Thread, size = " + addressSize);
+            }
+         }
+      }
+      
+      return addressSize;
+   }
+
+   
    // Inner classes -------------------------------------------------
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -37,7 +37,7 @@
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
@@ -294,7 +294,7 @@
       }
    }
    
-   public boolean startDequeueThread(final Pager listener) throws Exception
+   public boolean startDequeueThread(final PagingManager listener) throws Exception
    {
       lock.readLock().lock();
       try
@@ -424,16 +424,26 @@
    {
       validateInit();
 
+      
+      // First check without any global locks.
+      // (Faster)
+      lock.readLock().lock();
+      try
+      {
+         if (currentPage != null)
+         {
+            return false;
+         }
+      }
+      finally
+      {
+         lock.readLock().unlock();
+      }
+      
+      
+      // if the first check failed, we do it again under a global lock (positioningGlobalLock) this time
       positioningGlobalLock.acquire();
       
-      // StartPaging would change positioning (by changing currentPage), because of that it needs to be in a synchronized block.
-      // Case this lock becomes a contention, we will need to implement the dual-lock antipattern (which I tried to avoid):
-      //      if (currentPage == null)
-      //      {
-      //           synchronizedBlockLock.acquire();
-      //           if (currentPage == null) // this dual-verification should be fine as currentPage is volatile
-      //              etc, etc... 
-                      
       try
       {
          if (currentPage == null)
@@ -560,9 +570,9 @@
    
    class DequeueThread extends Thread
    {
-      final Pager listener;
+      final PagingManager listener;
       
-      public DequeueThread(final Pager listener)
+      public DequeueThread(final PagingManager listener)
       {
          this.listener = listener;
       }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -49,7 +49,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.LastPageRecord;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
@@ -326,9 +326,9 @@
                
                pageTransactionInfo.setRecordID(record.id);
                
-               Pager pager = postOffice.getPager();
+               PagingManager pagingManager = postOffice.getPagingManager();
                
-               pager.addTransaction(pageTransactionInfo);
+               pagingManager.addTransaction(pageTransactionInfo);
 
                break;
 			   }
@@ -341,10 +341,10 @@
 			      recordImpl.setRecordId(record.id);
 			      
 			      recordImpl.decode(buff);
-			      
-               Pager pager = postOffice.getPager();
                
-               pager.loadLastPage(recordImpl);
+               PagingManager pagingManager = postOffice.getPagingManager();
+               
+               pagingManager.loadLastPage(recordImpl);
 			      
 			      break;
 			   }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -27,7 +27,7 @@
 import java.util.Set;
 
 import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.server.MessageReference;
 import org.jboss.messaging.core.server.MessagingComponent;
@@ -83,7 +83,7 @@
 
    Set<SimpleString> listAllDestinations();
    
-   Pager getPager();
+   PagingManager getPagingManager();
    
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -23,10 +23,8 @@
 package org.jboss.messaging.core.postoffice.impl;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -38,14 +36,8 @@
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.paging.Pager;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -100,13 +92,12 @@
    
    private final PagingManager pagingManager;
    
-   private final Pager pager = new PagerImpl();
-   
    private volatile boolean started;
     
    public PostOfficeImpl(final StorageManager storageManager,
                          final PagingManager pagingManager,
-   		                final QueueFactory queueFactory, final boolean checkAllowable)
+   		                final QueueFactory queueFactory, 
+   		                final boolean checkAllowable)
    {
       this.storageManager = storageManager;
       
@@ -121,6 +112,8 @@
    
    public void start() throws Exception
    {
+      this.pagingManager.setPostOffice(this);
+
       pagingManager.start();
       
       loadBindings();
@@ -241,7 +234,7 @@
    public List<MessageReference> route(final ServerMessage message) throws Exception
    {
       
-      pager.addSize(message);
+      pagingManager.addSize(message);
       
       SimpleString address = message.getDestination();
       
@@ -298,9 +291,9 @@
 //      }
 //   }
    
-   public Pager getPager()
+   public PagingManager getPagingManager()
    {
-      return this.pager;
+      return this.pagingManager;
    }
 
 
@@ -417,221 +410,8 @@
       for (SimpleString destination: dests)
       {
          PagingStore store = pagingManager.getPageStore(destination);
-         store.startDequeueThread(pager);
+         store.startDequeueThread(pagingManager);
       }
    }
 
-   // TODO this probably will become a separate class?
-   private class PagerImpl implements Pager
-   {
-      
-      private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
-
-      public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
-      {
-         trace("Clearing lastRecord information " + lastRecord.getLastId());
-         storageManager.storeDelete(lastRecord.getRecordId());
-      }
-      
-      /**
-       * This method will remove files from the page system and add them into the journal, doing it transactionally
-       * 
-       * A Transaction will be opened only if persistent messages are used.
-       * If persistent messages are also used, it will update eventual PageTransactions
-       */
-      public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
-      {
-         log.info("Depaging....");
-         
-         /// Depage has to be done atomically, in case of failure it should be back to where it was
-         final long depageTransactionID = storageManager.generateTransactionID();
-         
-         LastPageRecord lastPage = pagingStore.getLastRecord(); 
-         
-         if (lastPage == null)
-         {
-            lastPage = new LastPageRecordImpl(pageId, destination);
-            pagingStore.setLastRecord(lastPage);
-         }
-         else
-         {
-            if (pageId <= lastPage.getLastId())
-            {
-               log.warn("Page " + pageId + " was already processed, ignoring the page");
-               return true;
-            }
-         }
-
-         lastPage.setLastId(pageId);
-         storageManager.storeLastPage(depageTransactionID, lastPage);
-         
-         HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
-         final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-         
-         for (PageMessage msg: data)
-         {
-            final long transactionIdDuringPaging = msg.getTransactionID();
-            if (transactionIdDuringPaging > 0)
-            {
-               final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
-               
-               // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
-               // This is the Step D described on the "Transactions on Paging" section
-               if (pageTransactionInfo == null)
-               {
-                  if (isTrace)
-                  {
-                     trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
-                  }
-                  continue;
-               }
-               
-               // This is to avoid a race condition where messages are depaged before the commit arrived
-               pageTransactionInfo.waitCompletion();
-
-               /// Update information about transactions
-               if (msg.getMessage().isDurable())
-               {
-                  pageTransactionInfo.decrement();
-                  pageTransactionsToUpdate.add(pageTransactionInfo);
-               }
-            }
-            
-            msg.getMessage().setMessageID(storageManager.generateMessageID());
-
-            refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
-            
-            if (msg.getMessage().getDurableRefCount() != 0)
-            {
-               storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
-            }
-         }
-         
-         
-         for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
-         {
-            if (pageWithTransaction.getNumberOfMessages() == 0)
-            { 
-               // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
-               // numberOfReads==numberOfWrites -> We delete the record
-               storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
-               this.transactions.remove(pageWithTransaction.getTransactionID());
-            }
-            else
-            {
-               storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
-            }
-         }
-         
-         storageManager.commit(depageTransactionID);
-
-         for (MessageReference ref : refsToAdd)
-         {
-            ref.getQueue().addLast(ref);
-         }
-         
-         return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes(); 
-      }
-      
-      public void loadLastPage(LastPageRecord lastPage) throws Exception
-      {
-         System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
-         pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
-      }
-
-      public boolean isPaging(SimpleString destination) throws Exception
-      {
-         return pagingManager.getPageStore(destination).isPaging();
-      }
-      
-      public void messageDone(ServerMessage message) throws Exception
-      {
-         addSize(message.getDestination(), message.getEncodeSize() * -1);
-      }
-      
-      /** To be called when a rollback is called after messageDone was called */
-      public long addSize(final ServerMessage message) throws Exception
-      {
-         return addSize(message.getDestination(), message.getEncodeSize());      
-      }
-      
-      public boolean page(ServerMessage message, long transactionId)
-            throws Exception
-      {
-         return pagingManager.getPageStore(message.getDestination()).page(new PageMessageImpl(message, transactionId));
-      }
-
-
-      public boolean page(ServerMessage message) throws Exception
-      {
-         return pagingManager.getPageStore(message.getDestination()).page(new PageMessageImpl(message));
-      }
-
-      
-      public void addTransaction(PageTransactionInfo pageTransaction)
-      {
-         this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
-      }
-
-      public void completeTransaction(long transactionId)
-      {
-         PageTransactionInfo pageTrans = this.transactions.get(transactionId);
-         
-         // If nothing was paged.. we just remove the information to avoid memory leaks
-         if (pageTrans.getNumberOfMessages() == 0)
-         {
-            this.transactions.remove(pageTrans);
-         }
-      }
-      
-      public void sync(Collection<SimpleString> destinationsToSync) throws Exception
-      {
-         for (SimpleString destination: destinationsToSync)
-         {
-            pagingManager.getPageStore(destination).sync();
-         }
-      }
-      
-      
-      
-      private long addSize(final SimpleString destination, final long size) throws Exception
-      {
-         final PagingStore store = pagingManager.getPageStore(destination);
-         
-         final long addressSize = store.addQueueSize(size);
-         
-         final long maxSize = store.getMaxSizeBytes();
-
-         if (size > 0)
-         {
-            
-            if (maxSize > 0 && addressSize > maxSize)
-            {
-               if (store.startPaging())
-               {
-                  if (isTrace)
-                  {
-                     trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
-                  }
-               }
-            }
-         }
-         else
-         {
-            if ( maxSize > 0 && addressSize < maxSize)
-            {
-               if (store.startDequeueThread(this))
-               {
-                  log.info("Starting depaging Thread, size = " + addressSize);
-               }
-            }
-         }
-         
-         return addressSize;
-      }
-      
-
-
-   }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -169,7 +169,7 @@
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));                  
       queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
       
-      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), queueSettingsRepository);
+      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), storageManager, queueSettingsRepository);
       
       postOffice = new PostOfficeImpl(storageManager, pagingManager, 
             queueFactory, configuration.isRequireDestinations());

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -39,7 +39,7 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.filter.impl.FilterImpl;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -130,7 +130,7 @@
    
    private final PostOffice postOffice;
    
-   private final Pager pager;
+   private final PagingManager pager;
 
    private final SecurityStore securityStore;
    
@@ -164,7 +164,7 @@
       
       this.postOffice = postOffice;
       
-      this.pager = postOffice.getPager();
+      this.pager = postOffice.getPagingManager();
       
       this.queueSettingsRepository = queueSettingsRepository;
       

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -33,7 +33,7 @@
 
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -59,7 +59,7 @@
 
    private final PostOffice postOffice;
    
-   private final Pager pager;
+   private final PagingManager pagingManager;
 
    private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
 
@@ -83,16 +83,16 @@
                           final PostOffice postOffice)
    {
       this.storageManager = storageManager;
-
+      
       this.postOffice = postOffice;
       
       if (postOffice == null)
       {
-         pager = null;
+         pagingManager = null;
       }
       else
       {
-         this.pager = postOffice.getPager();
+         this.pagingManager = postOffice.getPagingManager();
       }
 
       this.xid = null;
@@ -106,9 +106,17 @@
       this.storageManager = storageManager;
 
       this.postOffice = postOffice;
-      
-      this.pager = postOffice.getPager();
 
+      if (postOffice == null)
+      {
+         pagingManager = null;
+      }
+      else
+      {
+         this.pagingManager = postOffice.getPagingManager();
+      }
+
+
       this.xid = xid;
 
       this.id = storageManager.generateTransactionID();
@@ -129,7 +137,7 @@
          throw new IllegalStateException("Transaction is in invalid state " + state);
       }
       
-      if (pager.isPaging(message.getDestination()))
+      if (pagingManager.isPaging(message.getDestination()))
       {
          pagedMessages.add(message);
       }
@@ -153,9 +161,9 @@
       
       if (message.decrementRefCount() == 0)
       {
-         if (pager != null)
+         if (pagingManager != null)
          {
-            pager.messageDone(message);
+            pagingManager.messageDone(message);
          }
       }
 
@@ -320,7 +328,7 @@
          // Putting back the size to control paging
          if (message.incrementRefCount() == 1)
          {
-            pager.addSize(message);
+            pagingManager.addSize(message);
          }
          
          message.incrementRefCount();
@@ -441,7 +449,7 @@
          {
             pageTransaction = new PageTransactionInfoImpl(this.id);
             // To avoid a race condition where depage happens before the transaction is completed, we need to inform the pager about this transaction is being processed 
-            pager.addTransaction(pageTransaction);
+            pagingManager.addTransaction(pageTransaction);
          }
       }
 
@@ -451,7 +459,7 @@
        
          // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
          // Explained under Transaction On Paging. (This is the item B)
-         if (pager.page(message, id))
+         if (pagingManager.page(message, id))
          {
             if (message.isDurable())
             {
@@ -473,7 +481,7 @@
          containsPersistent = true;
          if (pagedDestinationsToSync.size() > 0)
          {
-            pager.sync(pagedDestinationsToSync);
+            pagingManager.sync(pagedDestinationsToSync);
             storageManager.storePageTransaction(id, pageTransaction);
          }
       }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -62,7 +62,7 @@
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
-public class TypedProperties implements EncodingSupport
+public class TypedProperties 
 {  
 	private static final Logger log = Logger.getLogger(TypedProperties.class);
 	

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -69,7 +69,7 @@
       
       
       PagingManagerImpl managerImpl = 
-         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), queueSettings);
+         new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), null, queueSettings);
       managerImpl.start();
       
       PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -28,7 +28,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -166,16 +166,12 @@
    public boolean page(ServerMessage message, long transactionID)
          throws Exception
    {
-      // TODO Auto-generated method stub
       return false;
    }
 
-   public Pager getPager()
+   public PagingManager getPagingManager()
    {
-      // TODO Auto-generated method stub
       return null;
    }
-   
-   
-   
+
 }

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-28 19:59:28 UTC (rev 4890)
@@ -67,7 +67,7 @@
       queueSettings.setDefault(new QueueSettings());
 
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      PagingManagerImpl manager = new PagingManagerImpl(spi, queueSettings);
+      PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings);
       
       SimpleString destination = new SimpleString("some-destination");
 
@@ -120,7 +120,7 @@
    public void testMultipleThreadsGetStore() throws Exception
    {
       PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
-      final PagingManagerImpl manager = new PagingManagerImpl(spi, repoSettings);
+      final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings);
       
       final SimpleString destination = new SimpleString("some-destination");
 




More information about the jboss-cvs-commits mailing list