[jboss-cvs] JBoss Messaging SVN: r4922 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Sep 8 19:43:58 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-09-08 19:43:58 -0400 (Mon, 08 Sep 2008)
New Revision: 4922

Modified:
   trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1314 - refactoring... Moving methods around.
(Tests on depaging on the delivery thread)

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -66,6 +66,8 @@
    
    void sync() throws Exception;
    
+   public boolean readPage() throws Exception;
+   
    boolean page(PageMessage message) throws Exception;
    
    /** 
@@ -79,11 +81,10 @@
    
    /**
     * 
-    * @param postOffice
     * @return false if a thread was already started, or if not in page mode
     * @throws Exception 
     */
-   boolean startDepaging(PagingManager listener) throws Exception;
+   boolean startDepaging() throws Exception;
 
    LastPageRecord getLastRecord();
 

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -38,4 +38,6 @@
 
    PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
    
+   void setPagingManager(PagingManager manager);
+   
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -29,6 +29,7 @@
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -52,6 +53,8 @@
    
    private final Executor executor;
    
+   private PagingManager pagingManager;
+   
    // Static --------------------------------------------------------
    
    // Constructors --------------------------------------------------
@@ -76,8 +79,13 @@
       File destinationFile = new File(destinationDirectory);
       destinationFile.mkdirs();
       
-      return new PagingStoreImpl(newFileFactory(destinationDirectory), destinationName, settings, executor);
+      return new PagingStoreImpl(pagingManager, newFileFactory(destinationDirectory), destinationName, settings, executor);
    }
+   
+   public void setPagingManager(PagingManager manager)
+   {
+      this.pagingManager = manager;
+   }
 
    // Package protected ---------------------------------------------
    

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -393,7 +393,7 @@
          {
             if ( maxSize > 0 && addressSize < (maxSize - pageSize))
             {
-               if (store.startDepaging(this))
+               if (store.startDepaging())
                {
                   log.info("Starting depaging Thread, size = " + addressSize);
                }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -71,6 +71,8 @@
    
    private boolean droppedMessages;
    
+   private final PagingManager pagingManager;
+   
    private final Executor executor;
    
    // Bytes consumed by the queue on the memory
@@ -96,7 +98,7 @@
    // Constructors --------------------------------------------------
    
    
-   public PagingStoreImpl(final SequentialFileFactory fileFactory, final SimpleString storeName, QueueSettings queueSettings, Executor executor) 
+   public PagingStoreImpl(final PagingManager pagingManager, final SequentialFileFactory fileFactory, final SimpleString storeName, QueueSettings queueSettings, Executor executor) 
    {
       this.fileFactory = fileFactory;
       this.storeName = storeName;
@@ -104,6 +106,7 @@
       this.pageSize = queueSettings.getPageSizeBytes();
       this.dropMessagesOnSize = queueSettings.isDropMessagesWhenFull();
       this.executor = executor;
+      this.pagingManager = pagingManager;
    }
    
    
@@ -173,6 +176,35 @@
       return storeName;
    }
    
+   
+
+   /**
+    * Depage one page-file, read it and send it to the pagingManager
+    * @return
+    * @throws Exception
+    */
+   public boolean readPage() throws Exception
+   {
+      Page page = depage();
+      if (page == null)
+      {
+         if (lastPageRecord != null)
+         {
+            pagingManager.clearLastPageRecord(lastPageRecord);
+         }
+         lastPageRecord = null;
+         return false;
+      }
+      page.open();
+      PageMessage messages[] = page.read();
+      boolean needMorePages = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+      page.delete();
+      
+      return needMorePages;
+
+   }
+   
+   
    /** 
     *  It returns a Page out of the Page System without reading it. 
     *  The method calling this method will remove the page and will start reading it outside of any locks. 
@@ -331,7 +363,7 @@
       }
    }
    
-   public boolean startDepaging(final PagingManager manager) throws Exception
+   public boolean startDepaging() throws Exception
    {
       lock.readLock().lock();
       try
@@ -346,7 +378,7 @@
             {
                if (this.dequeueThread == null)
                {
-                  this.dequeueThread = new DepageRunnable(manager);
+                  this.dequeueThread = new DepageRunnable();
                   executor.execute(this.dequeueThread);
                   return true;
                }
@@ -607,11 +639,8 @@
    
    class DepageRunnable implements Runnable
    {
-      final PagingManager manager;
-      
-      public DepageRunnable(final PagingManager manager)
+      public DepageRunnable()
       {
-         this.manager = manager;
       }
       
       
@@ -622,20 +651,7 @@
             boolean needMorePages = false;
             do
             {
-               Page page = depage();
-               if (page == null)
-               {
-                  if (lastPageRecord != null)
-                  {
-                     manager.clearLastPageRecord(lastPageRecord);
-                  }
-                  lastPageRecord = null;
-                  break;
-               }
-               page.open();
-               PageMessage messages[] = page.read();
-               needMorePages = manager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
-               page.delete();
+               needMorePages = readPage();
             }
             while (needMorePages);
          }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -437,7 +437,7 @@
       for (SimpleString destination: dests)
       {
          PagingStore store = pagingManager.getPageStore(destination);
-         store.startDepaging(pagingManager);
+         store.startDepaging();
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -189,6 +189,8 @@
       PagingStoreFactory storeFactory = new PagingManagerFactoryNIO(configuration.getPagingDirectory());
 
       pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository);
+      
+      storeFactory.setPagingManager(pagingManager);
 
       resourceManager = new ResourceManagerImpl(0);
       postOffice =

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -129,7 +129,7 @@
       
       EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
       
-      PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, new QueueSettings(), Executors.newSingleThreadExecutor());
+      PagingStoreImpl storeImpl = new PagingStoreImpl(manager, factory, destination, new QueueSettings(), Executors.newSingleThreadExecutor());
       
       EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andStubReturn(storeImpl);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -62,7 +62,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+      PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
       
       storeImpl.start();
       
@@ -78,7 +78,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+      PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
       
       storeImpl.start();
       
@@ -105,7 +105,7 @@
       
       storeImpl.sync();
       
-      storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+      storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
       
       storeImpl.start();
       
@@ -117,7 +117,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+      PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
       
       storeImpl.start();
       
@@ -173,7 +173,7 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
       
-      TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+      TestSupportPageStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
       
       storeImpl.start();
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2008-09-08 23:43:58 UTC (rev 4922)
@@ -101,7 +101,7 @@
       QueueSettings settings = new QueueSettings();
       settings.setPageSizeBytes(MAX_SIZE);
       
-      final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), settings, executor);
+      final TestSupportPageStore storeImpl = new PagingStoreImpl(null, factory, new SimpleString("test"), settings, executor);
       
       storeImpl.start();
       
@@ -251,7 +251,7 @@
          fileTmp.close();         
       }
       
-      TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), settings, executor);
+      TestSupportPageStore storeImpl2 = new PagingStoreImpl(null, factory, new SimpleString("test"), settings, executor);
       storeImpl2.start();
       
       int numberOfPages = storeImpl2.getNumberOfPages();




More information about the jboss-cvs-commits mailing list