[jboss-cvs] JBoss Messaging SVN: r4833 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/config/impl and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 18 19:26:08 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-08-18 19:26:08 -0400 (Mon, 18 Aug 2008)
New Revision: 4833

Modified:
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
   branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
Log:
Integrating Paging into PostOffice (2nd commit)

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -125,6 +125,14 @@
    String getJournalDirectory();
    
    void setJournalDirectory(String dir);
+   
+   String getPagingDirectory();
+   
+   void setPagingDirectory(String dir);
+   
+   void setPageFileSize(long size);
+   
+   long getPageFileSize();
 
    JournalType getJournalType();
    

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -77,6 +77,10 @@
    
    public static final String DEFAULT_JOURNAL_DIR = "data/journal";
    
+   public static final String DEFAULT_PAGING_DIR = "data/paging";
+   
+   public static final long DEFAULT_PAGE_FILE_SIZE = 10 * 1024 * 1024;
+   
    public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
    
    public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
@@ -115,6 +119,10 @@
    
    protected String journalDirectory = DEFAULT_JOURNAL_DIR;
    
+   protected String pagingDirectory = DEFAULT_PAGING_DIR;
+   
+   protected long pageFileSize = DEFAULT_PAGE_FILE_SIZE;
+   
    protected boolean createJournalDir = DEFAULT_CREATE_JOURNAL_DIR;
    
    public JournalType journalType = DEFAULT_JOURNAL_TYPE;
@@ -357,6 +365,27 @@
 		return journalType;
 	}
 	
+	
+	public void setPagingDirectory(String dir)
+	{
+	   this.pagingDirectory = dir;
+	}
+	
+	public String getPagingDirectory()
+	{
+	   return this.pagingDirectory;
+	}
+	
+	public long getPageFileSize()
+	{
+	   return this.pageFileSize;
+	}
+	
+	public void setPageFileSize(long size)
+	{
+	   this.pageFileSize = size;
+	}
+	
 	public void setJournalType(JournalType type)
    {
       this.journalType = type;

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -26,9 +26,7 @@
 import java.io.Reader;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
 
 import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
@@ -189,6 +187,10 @@
       createBindingsDir = getBoolean(e, "create-bindings-dir", createBindingsDir);
 
       journalDirectory = getString(e, "journal-directory", journalDirectory);
+      
+      pagingDirectory = getString(e, "paging-directory", pagingDirectory);
+      
+      pageFileSize = getLong(e, "page-file-size", pageFileSize);
 
       createJournalDir = getBoolean(e, "create-journal-dir", createJournalDir);
 

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -23,8 +23,10 @@
 
 package org.jboss.messaging.core.paging;
 
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -40,9 +42,10 @@
    
    int getNumberOfPages();
    
-   String getStoreName();
+   SimpleString getStoreName();
    
-   void startPaging() throws Exception;
+   /** @return true if paging was started, or false if paging was already started before this call */
+   boolean startPaging() throws Exception;
    
    boolean isPaging();
    
@@ -58,5 +61,13 @@
     * @throws Exception 
     */
    Page dequeuePage() throws Exception;
+   
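+   /**
+    * Starts the thread that dequeues pages and routes their messages through the post office while the destination size is below maxSize.
+    * @param postOffice post office used to route and deliver the dequeued messages
+    * @return false if a thread was already started, or if not in page mode
+    * @throws Exception 
+    */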
+   boolean startDequeueThread(PostOffice postOffice, long maxSize) throws Exception;
 
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -33,6 +33,6 @@
 public interface PagingStoreFactory
 {
 
-   PagingStore newStore(String destinationName);   
+   PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);   
    
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -28,6 +28,7 @@
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -43,13 +44,13 @@
    // Attributes ----------------------------------------------------
    
    private final String directory;
-   private final int pageSize;
+   private final long pageSize;
    
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
    
-   public PagingManagerFactoryNIO(final String directory, final int pageSize)
+   public PagingManagerFactoryNIO(final String directory, final long pageSize)
    {
       this.directory = directory;
       this.pageSize = pageSize;
@@ -57,9 +58,9 @@
    
    // Public --------------------------------------------------------
 
-   public PagingStore newStore(String destinationName)
+   public PagingStore newStore(SimpleString destinationName)
    {
-      final String destinationDirectory = directory + "/" + destinationName;
+      final String destinationDirectory = directory + "/" + destinationName.toString();
       File destinationFile = new File(destinationDirectory);
       destinationFile.mkdirs();
       

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -59,7 +59,7 @@
    
    // Public --------------------------------------------------------
    
-   public PagingStore getPageStore(SimpleString storeName) throws Exception
+   public PagingStore getPageStore(final SimpleString storeName) throws Exception
    {
       validateStarted();
       
@@ -67,7 +67,7 @@
       if (store == null)
       {
          
-         store = newStore(storeName.toString());
+         store = newStore(storeName);
          
          PagingStore oldStore = stores.putIfAbsent(storeName, store);
          
@@ -111,7 +111,7 @@
    
    // Private -------------------------------------------------------
    
-   private PagingStore newStore(String destinationName)
+   private PagingStore newStore(final SimpleString destinationName)
    {
       return pagingSPI.newStore(destinationName);
    }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -32,9 +32,12 @@
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -45,6 +48,7 @@
 {
 
    // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(PagingStoreImpl.class);
    
    // Attributes ----------------------------------------------------
    
@@ -52,12 +56,14 @@
    
    private final AtomicInteger pageUsedSize = new AtomicInteger(0);
    
-   private final String storeName;
+   private final SimpleString storeName;
    
    private final SequentialFileFactory factory;
    
    private final long maxPageSize;
    
+   private volatile Thread dequeueThread;
+   
    private volatile int numberOfPages;
    private volatile int firstPageId = Integer.MAX_VALUE;
    private volatile int currentPageId;
@@ -75,7 +81,7 @@
    // Constructors --------------------------------------------------
    
    
-   public PagingStoreImpl(final SequentialFileFactory factory, final String storeName, final long maxPageSize) 
+   public PagingStoreImpl(final SequentialFileFactory factory, final SimpleString storeName, final long maxPageSize) 
    {
       this.factory = factory;
       this.storeName = storeName;
@@ -106,7 +112,7 @@
       return numberOfPages;
    }
    
-   public String getStoreName()
+   public SimpleString getStoreName()
    {
       return storeName;
    }
@@ -242,15 +248,39 @@
       }
    }
    
+   public boolean startDequeueThread(final PostOffice postOffice, final long maxSize) throws Exception
+   {
+      if (!isPaging())
+      {
+         return false;
+      }
+      else
+      {
+         synchronized (this)
+         {
+            if (this.dequeueThread == null)
+            {
+               this.dequeueThread = new DequeueThread(postOffice, maxSize);
+               this.dequeueThread.start();
+               return true;
+            }
+            else
+            {
+               return false;
+            }
+         }
+      }
+   }
    
+   
    // MessagingComponent implementation
    
-   public boolean isStarted()
+   public synchronized boolean isStarted()
    {
       return initialized;
    }
    
-   public void stop() throws Exception
+   public synchronized void stop() throws Exception
    {
       if (initialized)
       {
@@ -272,7 +302,7 @@
    }
 
    
-   public void start() throws Exception
+   public synchronized void start() throws Exception
    {
 
       if (initialized)
@@ -324,7 +354,7 @@
       }
    }
    
-   public void startPaging() throws Exception
+   public boolean startPaging() throws Exception
    {
       validateInit();
 
@@ -334,7 +364,12 @@
          if (currentPage == null)
          {
             openNewPage();
+            return true;
          }
+         else
+         {
+            return false;
+         }
       }
       finally
       {
@@ -359,7 +394,13 @@
    
    
    // Private -------------------------------------------------------
+
    
+   private synchronized void clearDequeueThread()
+   {
+      this.dequeueThread = null;
+   }
+   
 
    private void openNewPage() throws Exception
    {
@@ -442,4 +483,47 @@
    
    // Inner classes -------------------------------------------------
    
+   class DequeueThread extends Thread
+   {
+      final PostOffice postOffice;
+      final long maxSize;
+      
+      public DequeueThread(final PostOffice postOffice, final long maxSize)
+      {
+         this.postOffice = postOffice;
+         this.maxSize = maxSize;
+      }
+      
+      
+      public void run()
+      {
+         try
+         {
+            while (postOffice.getSize(storeName) < maxSize)
+            {
+               Page page = dequeuePage();
+               if (page == null)
+               {
+                  break;
+               }
+               page.open();
+               ServerMessage messages[] = page.read();
+               for (ServerMessage message: messages)
+               {
+                  postOffice.routeAndDeliver(message);
+               }
+               page.delete();
+            }
+         }
+         catch (Exception e)
+         {
+            log.error(e, e);
+         }
+         finally
+         {
+            PagingStoreImpl.this.clearDequeueThread();
+         }
+      }
+   }
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -81,6 +81,9 @@
 
    Set<SimpleString> listAllDestinations();
 
+   void routeAndDeliver(final ServerMessage msg) throws Exception;
+   
+   
    /**
     * To be used by transactions only.
     * If you're sure you will page if isPaging, just call the method page and look at its return. 
@@ -90,17 +93,22 @@
    boolean isPaging(SimpleString destination) throws Exception;
    
    /**
-    * If the destination is not being paged, this method will return false
+    * Page, only if destination is in page mode.
     * @param message
-    * @return
+    * @return false if destination is not in page mode
     */
    boolean page(ServerMessage message) throws Exception;
    
    /** 
     * 
-    * To be called whenever message is being deleted. (i.e. no references)
+    * To be called when there are no more references to the message
     * @param message
     */
-   void messageRemoved(ServerMessage message) throws Exception;
+   void messageDone(ServerMessage message) throws Exception;
    
+   /** To be called when a rollback happens after messageDone was called, to put the message size back */
+   long addSize(ServerMessage message) throws Exception;
+   
+   long getSize(SimpleString destination) throws Exception;
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -38,6 +38,7 @@
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.FlowController;
@@ -50,6 +51,8 @@
 import org.jboss.messaging.util.ConcurrentSet;
 import org.jboss.messaging.util.SimpleString;
 
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
 /**
  * 
  * A PostOfficeImpl
@@ -59,6 +62,9 @@
  */
 public class PostOfficeImpl implements PostOffice
 {  
+   
+   private static final long MAX_SIZE = 100 * 1024 * 1024;
+   
    private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
    
    //private final int nodeID;
@@ -102,6 +108,8 @@
    
    public void start() throws Exception
    {
+      pagingManager.start();
+      
       loadBindings();
       
       started = true;
@@ -109,6 +117,8 @@
 
    public void stop() throws Exception
    {
+      pagingManager.stop();
+      
       mappings.clear();
       
       destinations.clear();
@@ -135,7 +145,7 @@
       	{
       		storageManager.addDestination(address);     
       	}
-      	 
+      	
          flowControllers.put(address, new FlowControllerImpl(address, this));
    	}
    	
@@ -214,10 +224,33 @@
    {
       return nameMap.get(queueName);
    }
+   
+   public void routeAndDeliver(final ServerMessage message) throws Exception
+   {
+      List<MessageReference> refs = this.route(message);
+      
+      if (message.getDurableRefCount() != 0)
+      {
+         storageManager.storeMessage(message);
+      }
+      
+      for (MessageReference ref : refs)
+      {
+         ref.getQueue().addLast(ref);
+      }
+   }
          
    public List<MessageReference> route(final ServerMessage message) throws Exception
    {
-      getQueueSize(message.getDestination()).addAndGet(message.getEncodeSize());
+      if (addSize(message.getDestination(), message.getEncodeSize()) > MAX_SIZE)
+      {
+         PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+         if (store.startPaging())
+         {
+            log.info("Starting paging on " + message.getDestination());
+         }
+      }
       
       SimpleString address = message.getDestination();
       
@@ -279,19 +312,34 @@
    {
       return pagingManager.getPageStore(destination).isPaging();
    }
-
-   public void messageRemoved(ServerMessage message)
+   
+   public void messageDone(ServerMessage message) throws Exception
    {
-      addSize(message.getDestination(), message.getEncodeSize() * -1);      
+      final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
+      
+      if (size < MAX_SIZE)
+      {
+         PagingStore manager = pagingManager.getPageStore(message.getDestination());
+         if (manager.startDequeueThread(this, MAX_SIZE))
+         {
+            log.info("Starting dequeing page for " + message.getDestination());
+         }
+         
+      }
    }
+   
+   /** To be called when a rollback happens after messageDone was called, to put the message size back */
+   public long addSize(ServerMessage message) throws Exception
+   {
+      return addSize(message.getDestination(), message.getEncodeSize());      
+   }
+   
 
    public boolean page(ServerMessage message) throws Exception
    {
       return pagingManager.getPageStore(message.getDestination()).writeOnCurrentPage(message);
    }
 
-   
-
    public Map<SimpleString, List<Binding>> getMappings()
    {
       return mappings;
@@ -301,14 +349,19 @@
    {   	
    	return flowControllers.get(address);
    }
+   
+   public long getSize(SimpleString destination)
+   {
+      return getQueueSize(destination).get();
+   }
 
    // Private -----------------------------------------------------------------
    
  
-   private void addSize(SimpleString destination, long size)
+   private long addSize(SimpleString destination, long size)
    {
-      getQueueSize(destination).addAndGet(size);
       totalSize.addAndGet(size);
+      return getQueueSize(destination).addAndGet(size);
    }
    
    private AtomicLong getQueueSize(SimpleString destination)
@@ -424,5 +477,11 @@
       }
                  
       storageManager.loadMessages(this, queues);
+      
+      for (SimpleString destination: dests)
+      {
+         PagingStore store = pagingManager.getPageStore(destination);
+         store.startDequeueThread(this, MAX_SIZE);
+      }
    }
 }

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -29,6 +29,7 @@
  * A ServerMessage
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public interface ServerMessage extends Message
@@ -49,6 +50,12 @@
    
    int getDurableRefCount();
    
+   int decrementRefCount();
+   
+   int incrementRefCount();
+   
+   int getRefCount();
+   
    ServerMessage copy();
 }
 

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -33,6 +33,7 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.management.MessagingServerManagement;
 import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
@@ -168,8 +169,11 @@
       securityStore = new SecurityStoreImpl(configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled());  
       queueSettingsRepository.setDefault(new QueueSettings());
       scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));                  
-      queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);      
-      postOffice = new PostOfficeImpl(storageManager, new PagingManagerImpl(new PagingManagerFactoryNIO("/tmp/factory", 10*1024*1024)), 
+      queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
+      
+      PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+      
+      postOffice = new PostOfficeImpl(storageManager, pagingManager, 
             queueFactory, configuration.isRequireDestinations());
                        
       securityRepository = new HierarchicalObjectRepository<Set<Role>>();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -36,6 +36,7 @@
  * 
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
  */
 public class ServerMessageImpl extends MessageImpl implements ServerMessage
@@ -45,6 +46,9 @@
    private long connectionID;
    
    private final AtomicInteger durableRefCount = new AtomicInteger(0);
+   
+   /** Global reference counts for paging control */
+   private final AtomicInteger refCount = new AtomicInteger(0);
               
    /*
     * Constructor for when reading from network
@@ -113,6 +117,8 @@
          durableRefCount.incrementAndGet();
       }
       
+      refCount.incrementAndGet();
+      
       return ref;
    }
    
@@ -131,6 +137,21 @@
       return durableRefCount.incrementAndGet();
    }
      
+   public int decrementRefCount()
+   {
+      return refCount.decrementAndGet();
+   }
+
+   public int getRefCount()
+   {
+      return refCount.get();
+   }
+
+   public int incrementRefCount()
+   {
+      return refCount.incrementAndGet();
+   }
+
    public ServerMessage copy()
    {
       return new ServerMessageImpl(this);

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -306,17 +306,20 @@
       
       if (autoCommitSends)
       {
-         List<MessageReference> refs = postOffice.route(msg);
-
-         if (msg.getDurableRefCount() != 0)
+         if (!postOffice.page(msg))
          {
-            storageManager.storeMessage(msg);
+            List<MessageReference> refs = postOffice.route(msg);
+   
+            if (msg.getDurableRefCount() != 0)
+            {
+               storageManager.storeMessage(msg);
+            }
+            
+            for (MessageReference ref : refs)
+            {
+               ref.getQueue().addLast(ref);
+            }
          }
-         
-         for (MessageReference ref : refs)
-         {
-            ref.getQueue().addLast(ref);
-         }
       }
       else
       {
@@ -1138,6 +1141,11 @@
 
       Queue queue = ref.getQueue();
       
+      if (message.decrementRefCount() == 0)
+      {
+         postOffice.messageDone(message);
+      }
+      
       if (message.isDurable() && queue.isDurable())
       {
          int count = message.decrementDurableRefCount();

Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -126,6 +126,14 @@
       acknowledgements.add(acknowledgement);
 
       ServerMessage message = acknowledgement.getMessage();
+      
+      if (message.decrementRefCount() == 0)
+      {
+         if (postOffice != null)
+         {
+            postOffice.messageDone(message);
+         }
+      }
 
       if (message.isDurable())
       {
@@ -269,6 +277,14 @@
             // Reverse the decrements we did in the tx
             message.incrementDurableRefCount();
          }
+         
+         // Putting back the size to control paging
+         if (message.incrementRefCount() == 1)
+         {
+            postOffice.addSize(message);
+         }
+         
+         message.incrementRefCount();
 
          LinkedList<MessageReference> list = queueMap.get(queue);
 

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -109,7 +109,6 @@
    public boolean removeDestination(SimpleString address, boolean temporary)
          throws Exception
    {
-      // TODO Auto-generated method stub
       return false;
    }
 
@@ -139,7 +138,7 @@
       return false;
    }
 
-   public void messageRemoved(ServerMessage message) throws Exception
+   public void messageDone(ServerMessage message) throws Exception
    {
    }
 
@@ -147,5 +146,22 @@
    {
       return false;
    }
+
+   public long addSize(ServerMessage message) throws Exception
+   {
+      return 0;
+   }
+
+   public long getSize(SimpleString destination) throws Exception
+   {
+      return 0;
+   }
+
+   public void routeAndDeliver(ServerMessage msg) throws Exception
+   {
+      throw new IllegalStateException("Not implemented!");
+   }
    
+   
+   
 }

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -69,7 +69,7 @@
       
       PagingStore store = EasyMock.createNiceMock(PagingStore.class);
       
-      EasyMock.expect(spi.newStore(destination.toString())).andReturn(store);
+      EasyMock.expect(spi.newStore(destination)).andReturn(store);
       
       store.start();
       
@@ -113,9 +113,9 @@
       
       EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
       
-      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination.toString(), 1);
+      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1);
       
-      EasyMock.expect(spi.newStore(destination.toString())).andStubReturn(storeImpl);
+      EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
       
       EasyMock.replay(spi, factory);
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -46,6 +46,8 @@
    
    // Constants -----------------------------------------------------
    
+   private final static SimpleString destinationTestName = new SimpleString("test");
+   
    // Attributes ----------------------------------------------------
    
    // Static --------------------------------------------------------
@@ -58,7 +60,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -74,7 +76,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -101,7 +103,7 @@
       
       storeImpl.sync();
       
-      storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+      storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
       
       storeImpl.start();
       
@@ -113,7 +115,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
       
       storeImpl.start();
       
@@ -165,7 +167,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
       
       storeImpl.start();
       

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -80,7 +80,7 @@
       
       final ArrayList<Page> readPages = new ArrayList<Page>();
       
-      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", MAX_SIZE);
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
       
       storeImpl.start();
       
@@ -226,7 +226,7 @@
          fileTmp.close();         
       }
       
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, "test", MAX_SIZE);
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
       storeImpl2.start();
       
       int numberOfPages = storeImpl2.getNumberOfPages();

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -145,6 +145,7 @@
       serverMessage.setExpiration(0);
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
+      EasyMock.expect(serverMessage.decrementRefCount()).andReturn(1);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
       EasyMock.replay(sm, po, repos, serverMessage, queue);
@@ -155,37 +156,61 @@
    public void testCancelToDLQDoesntExist() throws Exception
    {
       QueueSettings queueSettings = new QueueSettings();
+      
       queueSettings.setMaxDeliveryAttempts(1);
+      
       SimpleString dlqName = new SimpleString("testDLQ");
+      
       queueSettings.setDLQ(dlqName);
+      
       Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+      
       StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
+      
       PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+      
       HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
+      
       ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+      
       Queue queue = EasyMock.createStrictMock(Queue.class);
+
       MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
+      
       messageReference.setDeliveryCount(1);
+      
       SimpleString queueName = new SimpleString("queueName");
+      
       queue.referenceCancelled();
+      
       EasyMock.expect(queue.getName()).andStubReturn(queueName);
       EasyMock.expect(repos.getMatch(queueName.toString())).andStubReturn(queueSettings);
+//      EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(1);
       EasyMock.expect(serverMessage.isDurable()).andStubReturn(true);
       EasyMock.expect(serverMessage.getMessageID()).andStubReturn(999l);
+      EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
+      
       sm.updateDeliveryCount(messageReference);
+      
       EasyMock.expect(po.getBinding(dlqName)).andReturn(null);
       EasyMock.expect(po.addBinding(dlqName, dlqName, null, true, false)).andReturn(dlqBinding);
       EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
       EasyMock.expect(sm.generateMessageID()).andReturn(2l);
+      
       serverMessage.setMessageID(2);
+      
       serverMessage.setExpiration(0);
+      
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
+      
       EasyMock.replay(sm, po, repos, serverMessage, queue);
+      
       assertFalse(messageReference.cancel(sm, po, repos));
+      
       EasyMock.verify(sm, po, repos, serverMessage, queue);
    }
 
@@ -208,6 +233,7 @@
       EasyMock.expect(repos.getMatch(queueName.toString())).andStubReturn(queueSettings);
       EasyMock.expect(serverMessage.isDurable()).andStubReturn(true);
       EasyMock.expect(serverMessage.getMessageID()).andStubReturn(999l);
+      EasyMock.expect(serverMessage.decrementRefCount()).andReturn(1);
       EasyMock.expect(queue.isDurable()).andStubReturn(true);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
       EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
@@ -250,6 +276,9 @@
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+      EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
+
+      
       EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
       messageReference.expire(sm, po, repos);
       EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
@@ -289,6 +318,8 @@
       EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
       EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
       EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+      EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
+      
       EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
       messageReference.expire(sm, po, repos);
       EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -34,6 +34,7 @@
 import org.easymock.EasyMock;
 import org.jboss.messaging.core.filter.Filter;
 import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.Consumer;
 import org.jboss.messaging.core.server.DistributionPolicy;
 import org.jboss.messaging.core.server.HandleStatus;
@@ -738,6 +739,9 @@
 
    public void testDeleteAllReferences() throws Exception
    {
+      
+      PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+      
       Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
 
       StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);

Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java	2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java	2008-08-18 23:26:08 UTC (rev 4833)
@@ -195,6 +195,7 @@
    {
       MessageReference messageReference = createStrictMock(MessageReference.class);
       ServerMessage message = createStrictMock(ServerMessage.class);
+      expect(message.decrementRefCount()).andStubReturn(1);
       expect(messageReference.getMessage()).andStubReturn(message);
       ServerConsumerImpl consumer = create(1, 999l, false, true, true);
       expect(messageReference.getQueue()).andStubReturn(queue);




More information about the jboss-cvs-commits mailing list