[jboss-cvs] JBoss Messaging SVN: r5579 - in trunk: src/main/org/jboss/messaging/core/paging and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jan 5 23:57:50 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-05 23:57:50 -0500 (Mon, 05 Jan 2009)
New Revision: 5579

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
   trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Using storageManager to store page-dir and https://jira.jboss.org/jira/browse/JBMESSAGING-1477

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -26,10 +26,12 @@
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
 
 /**
  * 
@@ -42,28 +44,26 @@
 public abstract class AbstractSequentialFactory implements SequentialFileFactory
 {
    private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
-   
+
    protected final String journalDir;
 
    public AbstractSequentialFactory(final String journalDir)
    {
       this.journalDir = journalDir;
    }
-   
+
    /** 
     * Create the directory if it doesn't exist yet
     */
    public void createDirs() throws Exception
-   {      
+   {
       File file = new File(journalDir);
       boolean ok = file.mkdirs();
-//FIXME - uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1477 is complete      
-//      if (!ok)
-//      {
-//         throw new IOException("Failed to create directory " + journalDir);
-//      }
+      if (!ok)
+      {
+         throw new IOException("Failed to create directory " + journalDir);
+      }
    }
-   
 
    public List<String> listFiles(final String extension) throws Exception
    {

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,6 +23,7 @@
 package org.jboss.messaging.core.paging;
 
 import java.util.Collection;
+import java.util.Map;
 
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -171,6 +172,6 @@
     * Reload previously created PagingStores into memory
     * @throws Exception 
     */
-   void reloadStores() throws Exception;
+   void reloadStores(Map<SimpleString, Long> pageDestinations) throws Exception;
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,10 +23,13 @@
 package org.jboss.messaging.core.paging;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 
+import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.util.SimpleString;
 
@@ -38,7 +41,7 @@
  */
 public interface PagingStoreFactory
 {
-   PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings, boolean createDir);
+   PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings) throws Exception;
 
    Executor getGlobalDepagerExecutor();
 
@@ -50,5 +53,17 @@
 
    void setPostOffice(PostOffice office);
 
-   List<SimpleString> getStoredDestinations() throws Exception;
+   List<PagingStore> reloadStores(Map<SimpleString, Long> pageDestinations,
+                                  HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
+   /**
+    * @param storeName
+    * @return
+    */
+   SequentialFileFactory newFileFactory(SimpleString destinationName) throws Exception;
+
+   /**
+    * @param storageManager
+    */
+   void deleteFileFactory(SimpleString storageName) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -24,6 +24,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,15 +84,6 @@
 
    private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
 
-   // This is just a debug tool method.
-   // During debugs you could make log.trace as log.info, and change the
-   // variable isTrace above
-   private static void trace(final String message)
-   {
-      // log.trace(message);
-      log.info(message);
-   }
-
    // Constructors
    // --------------------------------------------------------------------------------------------------------------------
 
@@ -129,14 +121,16 @@
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.paging.PagingManager#reloadStores()
     */
-   public void reloadStores() throws Exception
+   public void reloadStores(Map<SimpleString, Long> pageDestinations) throws Exception
    {
-      List<SimpleString> destinations = pagingStoreFactory.getStoredDestinations();
+      List<PagingStore> destinations = pagingStoreFactory.reloadStores(pageDestinations, queueSettingsRepository);
 
-      for (SimpleString dest : destinations)
+      for (PagingStore store: destinations)
       {
-         createPageStore(dest, false);
+         stores.put(store.getStoreName(), store);
+         store.start();
       }
+
    }
 
    /**
@@ -149,7 +143,7 @@
 
       if (store == null)
       {
-         store = newStore(storeName, createDir);
+         store = newStore(storeName);
 
          PagingStore oldStore = stores.putIfAbsent(storeName, store);
 
@@ -334,9 +328,9 @@
 
    // Private -------------------------------------------------------
 
-   private PagingStore newStore(final SimpleString destinationName, final boolean createDir)
+   private PagingStore newStore(final SimpleString destinationName) throws Exception
    {
-      return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()), createDir);
+      return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
    }
 
    // Inner classes -------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,15 +23,20 @@
 package org.jboss.messaging.core.paging.impl;
 
 import java.io.File;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
@@ -40,9 +45,8 @@
 import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
 import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.util.Base64;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.SimpleString;
@@ -60,18 +64,22 @@
 
    // Attributes ----------------------------------------------------
 
+   private final DecimalFormat format = new DecimalFormat("000000000");
+
    private final String directory;
+   
+   private final HashMap<SimpleString, Long> pageDirectories = new HashMap<SimpleString, Long>();
 
    private final ExecutorService parentExecutor;
-   
+
    private final OrderedExecutorFactory executorFactory;
-   
+
    private final Executor globalDepagerExecutor;
 
    private PagingManager pagingManager;
-   
+
    private StorageManager storageManager;
-   
+
    private PostOffice postOffice;
 
    // Static --------------------------------------------------------
@@ -82,13 +90,15 @@
    {
       this.directory = directory;
 
-      parentExecutor = new ThreadPoolExecutor(0, maxThreads,
-                             60L, TimeUnit.SECONDS,
-                             new SynchronousQueue<Runnable>(),
-                             new JBMThreadFactory("JBM-depaging-threads"));
-      
+      parentExecutor = new ThreadPoolExecutor(0,
+                                              maxThreads,
+                                              60L,
+                                              TimeUnit.SECONDS,
+                                              new SynchronousQueue<Runnable>(),
+                                              new JBMThreadFactory("JBM-depaging-threads"));
+
       executorFactory = new OrderedExecutorFactory(parentExecutor);
-      
+
       globalDepagerExecutor = executorFactory.getExecutor();
    }
 
@@ -106,66 +116,112 @@
       parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
    }
 
-   public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings, final boolean createDir)
-   {      
-      final String destinationDirectory = directory + "/" + Base64.encodeBytes(destinationName.getData(), Base64.URL_SAFE);
-      
+   public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+   {
+
       return new PagingStoreImpl(pagingManager,
                                  storageManager,
                                  postOffice,
-                                 newFileFactory(destinationDirectory),
+                                 null,
+                                 this,
                                  destinationName,
                                  settings,
-                                 executorFactory.getExecutor(),
-                                 createDir);
+                                 executorFactory.getExecutor());
    }
+   
+   /**
+    * @param storeName
+    * @return
+    */
+   public synchronized SequentialFileFactory newFileFactory(SimpleString destinationName) throws Exception
+   {
+      long id = storageManager.addPageDirDestination(destinationName);
+      
+      pageDirectories.put(destinationName, id);
+      
+      SequentialFileFactory factory = newFileFactory(id);
+      
+      factory.createDirs();
+      
+      return factory;
+   }
 
+   /**
+    * @param storageManager
+    */
+   public synchronized void deleteFileFactory(final SimpleString storageName) throws Exception
+   {
+      Long id = pageDirectories.get(storageName);
+      
+      if (id == null)
+      {
+         throw new IllegalStateException("Storage " + storageName + " didn't have a record on Bindings Journal");
+      }
+      
+      
+      File destinationDirectory = new File(directory + File.separatorChar + format.format(id));
+      
+      destinationDirectory.delete();
+      
+      
+      storageManager.deletePageDirDestination(id);
+   }
+
    public void setPagingManager(final PagingManager pagingManager)
    {
       this.pagingManager = pagingManager;
    }
-   
+
    public void setStorageManager(final StorageManager storageManager)
    {
-      this.storageManager = storageManager; 
+      this.storageManager = storageManager;
    }
-   
+
    public void setPostOffice(final PostOffice postOffice)
    {
       this.postOffice = postOffice;
    }
-   
-   public List<SimpleString> getStoredDestinations() throws Exception
+
+   public List<PagingStore> reloadStores(final Map<SimpleString, Long> pageDestinations,
+                                         final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
    {
       File pageDirectory = new File(directory);
-      
+
       File[] files = pageDirectory.listFiles();
-      
+
       if (files == null)
       {
-         return Collections.<SimpleString>emptyList();
+         return Collections.<PagingStore> emptyList();
 
       }
       else
-      {         
-         ArrayList<SimpleString> filesReturn = new ArrayList<SimpleString>(files.length);
-         
-         for (File file: files)
+      {
+         ArrayList<PagingStore> storesReturn = new ArrayList<PagingStore>(files.length);
+
+         for (Map.Entry<SimpleString, Long> entryPage : pageDestinations.entrySet())
          {
-            if (file.isDirectory())
-            {
-               try
-               {
-                  filesReturn.add(new SimpleString(Base64.decode(file.getName(), Base64.URL_SAFE)));
-               }
-               catch (Exception e)
-               {
-                  log.warn("Invalid encoding on directory " + file.getCanonicalPath(), e);
-               }
-            }
+            SimpleString destinationName = entryPage.getKey();
+            long id = entryPage.getValue();
+            
+            pageDirectories.put(destinationName, id);
+            
+            SequentialFileFactory factory = newFileFactory(id);
+
+            QueueSettings settings = queueSettingsRepository.getMatch(destinationName.toString());
+
+            PagingStore store = new PagingStoreImpl(pagingManager,
+                                                    storageManager,
+                                                    postOffice,
+                                                    factory,
+                                                    this,
+                                                    destinationName,
+                                                    settings,
+                                                    executorFactory.getExecutor());
+
+            storesReturn.add(store);
          }
-         
-         return filesReturn;
+
+         return storesReturn;
       }
    }
 
@@ -180,5 +236,16 @@
 
    // Private -------------------------------------------------------
 
+   private synchronized SequentialFileFactory newFileFactory(long id) throws Exception
+   {
+
+      String destinationDirectory = format.format(id);
+
+      SequentialFileFactory seqFactory = new NIOSequentialFileFactory(directory + File.separatorChar +
+                                                                      destinationDirectory);
+
+      return seqFactory;
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -43,6 +43,7 @@
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PagingManager;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
 import org.jboss.messaging.core.server.ServerMessage;
@@ -76,8 +77,11 @@
 
    private final SimpleString storeName;
 
-   private final SequentialFileFactory fileFactory;
+   // The FileFactory is created lazily as soon as the first write is attempted
+   private volatile SequentialFileFactory fileFactory;
 
+   private final PagingStoreFactory storeFactory;
+
    private final long maxSize;
 
    private final long pageSize;
@@ -111,8 +115,6 @@
    private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
 
    private volatile boolean running = false;
-   
-   private final boolean createDir;
 
    // Static --------------------------------------------------------
 
@@ -135,10 +137,10 @@
                           final StorageManager storageManager,
                           final PostOffice postOffice,
                           final SequentialFileFactory fileFactory,
+                          final PagingStoreFactory storeFactory,
                           final SimpleString storeName,
                           final QueueSettings queueSettings,
-                          final Executor executor,
-                          final boolean createDir)
+                          final Executor executor)
    {
       if (pagingManager == null)
       {
@@ -149,8 +151,6 @@
 
       this.postOffice = postOffice;
 
-      this.fileFactory = fileFactory;
-
       this.storeName = storeName;
 
       maxSize = queueSettings.getMaxSizeBytes();
@@ -169,8 +169,10 @@
       this.executor = executor;
 
       this.pagingManager = pagingManager;
-      
-      this.createDir = createDir;
+
+      this.fileFactory = fileFactory;
+
+      this.storeFactory = storeFactory;
    }
 
    // Public --------------------------------------------------------
@@ -319,7 +321,7 @@
       }
    }
 
-   //TODO all of this can be simplified
+   // TODO all of this can be simplified
    public boolean page(final PagedMessage message, final boolean sync, final boolean duplicateDetection) throws Exception
    {
       if (!running)
@@ -357,25 +359,26 @@
          {
             return false;
          }
-         
+
          if (duplicateDetection)
          {
-            //We set the duplicate detection header to prevent the message being depaged more than once in case of failure during depage
-            
+            // We set the duplicate detection header to prevent the message being depaged more than once in case of
+            // failure during depage
+
             byte[] bytes = new byte[8];
-            
+
             ByteBuffer buff = ByteBuffer.wrap(bytes);
-            
+
             ServerMessage msg = message.getMessage(storageManager);
-            
+
             buff.putLong(msg.getMessageID());
-            
+
             SimpleString duplID = new SimpleString(bytes);
-               
+
             message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
          }
 
-         int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+         int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
 
          if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
          {
@@ -399,10 +402,10 @@
          try
          {
             if (currentPage != null)
-            {               
-               
+            {
+
                currentPage.write(message);
-               
+
                if (sync)
                {
                   currentPage.sync();
@@ -450,7 +453,7 @@
    }
 
    public boolean startDepaging(final Executor executor)
-   {     
+   {
       currentPageLock.readLock().lock();
       try
       {
@@ -517,7 +520,7 @@
    }
 
    public void start() throws Exception
-   {      
+   {
       writeLock.lock();
 
       try
@@ -536,48 +539,49 @@
          {
             currentPageLock.writeLock().lock();
 
-            if (createDir)
-            {               
-               fileFactory.createDirs();
-            }
-
-            firstPageId = Integer.MAX_VALUE;
-            currentPageId = 0;
-            currentPage = null;
-
             try
             {
-               List<String> files = fileFactory.listFiles("page");
+               running = true;
+               firstPageId = Integer.MAX_VALUE;
 
-               numberOfPages = files.size();
-
-               for (String fileName : files)
+               // There are no files yet on this Storage. We will just return it empty
+               if (fileFactory != null)
                {
-                  final int fileId = getPageIdFromFileName(fileName);
 
-                  if (fileId > currentPageId)
+                  currentPageId = 0;
+                  currentPage = null;
+
+                  List<String> files = fileFactory.listFiles("page");
+
+                  numberOfPages = files.size();
+
+                  for (String fileName : files)
                   {
-                     currentPageId = fileId;
+                     final int fileId = getPageIdFromFileName(fileName);
+
+                     if (fileId > currentPageId)
+                     {
+                        currentPageId = fileId;
+                     }
+
+                     if (fileId < firstPageId)
+                     {
+                        firstPageId = fileId;
+                     }
                   }
 
-                  if (fileId < firstPageId)
+                  if (numberOfPages != 0)
                   {
-                     firstPageId = fileId;
+                     startPaging();
                   }
                }
-
-               running = true;
-
-               if (numberOfPages != 0)
-               {
-                  startPaging();
-               }
             }
             finally
             {
                currentPageLock.writeLock().unlock();
             }
          }
+
       }
       finally
       {
@@ -651,12 +655,14 @@
 
       try
       {
+
          if (numberOfPages == 0)
          {
             return null;
          }
          else
          {
+
             numberOfPages--;
 
             final Page returnPage;
@@ -685,6 +691,8 @@
                   returnPage.open();
                   returnPage.delete();
 
+                  cleanupDirectory();
+
                   // This will trigger this Destination to exit the page mode,
                   // and this will make JBM start using the journal again
                   return null;
@@ -701,7 +709,6 @@
             {
                returnPage = createPage(firstPageId++);
             }
-
             return returnPage;
          }
       }
@@ -738,24 +745,23 @@
          return;
       }
 
-
       // Depage has to be done atomically, in case of failure it should be
       // back to where it was
-      
+
       Transaction depageTransaction = new TransactionImpl(storageManager);
-      
+
       depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-      
+
       depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
 
       HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-            
+
       for (PagedMessage pagedMessage : pagedMessages)
       {
          ServerMessage message = null;
 
          message = pagedMessage.getMessage(storageManager);
-         
+
          final long transactionIdDuringPaging = pagedMessage.getTransactionID();
 
          if (transactionIdDuringPaging >= 0)
@@ -767,7 +773,9 @@
             // section
             if (pageTransactionInfo == null)
             {
-               log.warn("Transaction " + pagedMessage.getTransactionID() + " used during paging not found, ignoring message " + message);
+               log.warn("Transaction " + pagedMessage.getTransactionID() +
+                        " used during paging not found, ignoring message " +
+                        message);
                continue;
             }
 
@@ -789,7 +797,7 @@
                pageTransactionsToUpdate.add(pageTransactionInfo);
             }
          }
-         
+
          postOffice.route(message, depageTransaction);
       }
 
@@ -809,7 +817,7 @@
       }
 
       depageTransaction.commit();
-      
+
       trace("Depage committed");
    }
 
@@ -844,7 +852,7 @@
    {
       final boolean pageFull = isFull(getPageSizeBytes());
       final boolean globalFull = isGlobalFull(getPageSizeBytes());
-      if (pageFull || globalFull)
+      if (pageFull || globalFull || !isPaging())
       {
          depaging.set(false);
          if (!globalFull)
@@ -895,6 +903,11 @@
    {
       String fileName = createFileName(page);
 
+      if (fileFactory == null)
+      {
+         fileFactory = storeFactory.newFileFactory(this.getStoreName());
+      }
+
       SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
 
       file.open();
@@ -938,9 +951,9 @@
    private void readPage() throws Exception
    {
       Page page = depage();
-      
+
       if (page == null)
-      {        
+      {
          return;
       }
 
@@ -951,8 +964,22 @@
       onDepage(page.getPageId(), storeName, messages);
 
       page.delete();
+
    }
 
+   /**
+    * @throws Exception
+    */
+   private void cleanupDirectory() throws Exception
+   {
+      if (fileFactory != null && numberOfPages == 0)
+      {
+         // No more files.. delete page directory
+         storeFactory.deleteFileFactory(storeName);
+         fileFactory = null;
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
    private class DepageRunnable implements Runnable

Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -27,6 +27,7 @@
 
 import javax.transaction.xa.Xid;
 
+import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.PageTransactionInfo;
 import org.jboss.messaging.core.postoffice.Binding;
 import org.jboss.messaging.core.postoffice.PostOffice;
@@ -115,5 +116,12 @@
 
    boolean deleteDestination(SimpleString destination) throws Exception;
 
-   void loadBindings(BindableFactory queueFactory, List<Binding> bindings, List<SimpleString> destinations) throws Exception;
+   long addPageDirDestination(SimpleString pageAddress) throws Exception;
+
+   void deletePageDirDestination(long pageAddressID) throws Exception;
+
+   void loadBindings(BindableFactory queueFactory,
+                     List<Binding> bindings,
+                     List<SimpleString> destinations,
+                     Map<SimpleString, Long> pageDestinationDirectory) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -104,6 +104,8 @@
    public static final byte BINDING_RECORD = 21;
 
    public static final byte DESTINATION_RECORD = 22;
+   
+   public static final byte PAGE_DESTINATION_DIR_RECORD = 23;
 
    // type + expiration + timestamp + priority
    public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
@@ -630,6 +632,24 @@
       bindingsJournal.appendDeleteRecord(id);
    }
 
+
+   
+   public long addPageDirDestination(final SimpleString pageAddress) throws Exception
+   {
+      long destinationID = idGenerator.generateID();
+
+      DestinationEncoding destinationEnc = new DestinationEncoding(pageAddress);
+
+      bindingsJournal.appendAddRecord(destinationID, PAGE_DESTINATION_DIR_RECORD, destinationEnc);
+
+      return destinationID;
+   }
+
+   public void deletePageDirDestination(final long pageAddressID) throws Exception
+   {
+      bindingsJournal.appendDeleteRecord(pageAddressID);
+   }
+   
    public boolean addDestination(final SimpleString destination) throws Exception
    {
       long destinationID = idGenerator.generateID();
@@ -667,7 +687,8 @@
 
    public void loadBindings(final BindableFactory bindableFactory,
                             final List<Binding> bindings,
-                            final List<SimpleString> destinations) throws Exception
+                            final List<SimpleString> destinations,
+                            final Map<SimpleString, Long> pageDestinationDirectory) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -731,6 +752,14 @@
 
             destinations.add(destinationEncoding.destination);
          }
+         else if (rec == PAGE_DESTINATION_DIR_RECORD)
+         {
+            DestinationEncoding destinationEncoding = new DestinationEncoding();
+
+            destinationEncoding.decode(buffer);
+            
+            pageDestinationDirectory.put(destinationEncoding.destination, id);
+         }
          else
          {
             throw new IllegalStateException("Invalid record type " + rec);

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -82,7 +82,8 @@
 
    public void loadBindings(final BindableFactory queueFactory,
                             final List<Binding> bindings,
-                            final List<SimpleString> destinations) throws Exception
+                            final List<SimpleString> destinations,
+                            Map<SimpleString, Long> pageDestinationDirectory) throws Exception
    {
    }
 
@@ -223,4 +224,13 @@
    {
    }
 
+   public long addPageDirDestination(SimpleString pageAddress) throws Exception
+   {
+      return idGenerator.generateID();
+   }
+
+   public void deletePageDirDestination(long pageAddressID) throws Exception
+   {
+   }
+
 }

Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -526,8 +526,10 @@
       List<Binding> bindings = new ArrayList<Binding>();
 
       List<SimpleString> dests = new ArrayList<SimpleString>();
+      
+      Map<SimpleString, Long> pageDestinations = new HashMap<SimpleString, Long>();
 
-      storageManager.loadBindings(bindableFactory, bindings, dests);
+      storageManager.loadBindings(bindableFactory, bindings, dests, pageDestinations);
 
       // Destinations must be added first to ensure flow controllers exist
       // before queues are created
@@ -548,7 +550,7 @@
          }
       }
 
-      pagingManager.reloadStores();
+      pagingManager.reloadStores(pageDestinations);
       
       Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -32,6 +32,7 @@
 import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
 import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
 import org.jboss.messaging.core.server.ServerMessage;
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -66,7 +67,7 @@
       queueSettings.setDefault(new QueueSettings());
 
       PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
-                                                            null,
+                                                            new NullStorageManager(),
                                                             queueSettings,
                                                             -1,
                                                             1024 * 1024,

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -26,10 +26,12 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.easymock.classextension.EasyMock;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
@@ -65,9 +67,10 @@
                                                   createStorageManagerMock(),
                                                   createPostOfficeMock(),
                                                   factory,
+                                                  null,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor, true);
+                                                  executor);
 
       storeImpl.start();
 
@@ -83,14 +86,19 @@
    public void testStore() throws Exception
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
+      
+      PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+      
+      EasyMock.replay(storeFactory);
 
       PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
                                                   createStorageManagerMock(),
                                                   createPostOfficeMock(),
                                                   factory,
+                                                  storeFactory,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor, true);
+                                                  executor);
 
       storeImpl.start();
 
@@ -121,9 +129,10 @@
                                       createStorageManagerMock(),
                                       createPostOfficeMock(),
                                       factory,
+                                      null,
                                       destinationTestName,
                                       new QueueSettings(),
-                                      executor, true);
+                                      executor);
 
       storeImpl.start();
 
@@ -135,13 +144,22 @@
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
 
+      SimpleString destination = new SimpleString("test");
+      
+      PagingStoreFactory storeFactory = EasyMock.createMock(PagingStoreFactory.class);
+      
+      storeFactory.deleteFileFactory(destination);
+      
+      EasyMock.replay(storeFactory);
+
       TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
                                                   createStorageManagerMock(),
                                                   createPostOfficeMock(),
                                                   factory,
+                                                  storeFactory,
                                                   destinationTestName,
                                                   new QueueSettings(),
-                                                  executor, true);
+                                                  executor);
 
       storeImpl.start();
 
@@ -151,8 +169,6 @@
 
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
 
-      SimpleString destination = new SimpleString("test");
-
       for (int i = 0; i < 10; i++)
       {
 
@@ -189,20 +205,32 @@
          assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
          assertEqualsByteArrays(buffers.get(i).array(), (msg.get(i).getMessage(null)).getBody().array());
       }
+      
+      EasyMock.verify(storeFactory);
 
    }
 
    public void testDepageMultiplePages() throws Exception
    {
       SequentialFileFactory factory = new FakeSequentialFileFactory();
+      SimpleString destination = new SimpleString("test");
+      
+      PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+      
+      EasyMock.expect(storeFactory.newFileFactory(destination)).andReturn(factory);
+      
+      storeFactory.deleteFileFactory(destination);
+      
+      EasyMock.replay(storeFactory);
 
       TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
                                                            createStorageManagerMock(),
                                                            createPostOfficeMock(),
                                                            factory,
+                                                           storeFactory,
                                                            destinationTestName,
                                                            new QueueSettings(),
-                                                           executor, true);
+                                                           executor);
 
       storeImpl.start();
 
@@ -214,8 +242,6 @@
 
       List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
 
-      SimpleString destination = new SimpleString("test");
-
       for (int i = 0; i < 10; i++)
       {
 
@@ -307,6 +333,8 @@
       assertEquals(0, storeImpl.getNumberOfPages());
 
       page.open();
+      
+      EasyMock.verify(storeFactory);
 
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java	2009-01-06 04:57:50 UTC (rev 5579)
@@ -39,6 +39,7 @@
 import org.jboss.messaging.core.paging.Page;
 import org.jboss.messaging.core.paging.PagedMessage;
 import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
 import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
 import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
 import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
@@ -91,6 +92,10 @@
    protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
                                                                                                       InterruptedException
    {
+      
+      PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+      
+      EasyMock.replay(storeFactory);
 
       final int MAX_SIZE = 1024 * 10;
 
@@ -111,9 +116,10 @@
                                                                  createStorageManagerMock(),
                                                                  createPostOfficeMock(),
                                                                  factory,
+                                                                 storeFactory,
                                                                  new SimpleString("test"),
                                                                  settings,
-                                                                 executor, true);
+                                                                 executor);
 
       storeImpl.start();
 
@@ -266,9 +272,10 @@
                                                             createStorageManagerMock(),
                                                             createPostOfficeMock(),
                                                             factory,
+                                                            storeFactory,
                                                             new SimpleString("test"),
                                                             settings,
-                                                            executor, true);
+                                                            executor);
       storeImpl2.start();
 
       int numberOfPages = storeImpl2.getNumberOfPages();




More information about the jboss-cvs-commits mailing list