[jboss-cvs] JBoss Messaging SVN: r5579 - in trunk: src/main/org/jboss/messaging/core/paging and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jan 5 23:57:50 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-05 23:57:50 -0500 (Mon, 05 Jan 2009)
New Revision: 5579
Modified:
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Using storageManager to store page-dir ids, and re-enabling the directory-creation check for https://jira.jboss.org/jira/browse/JBMESSAGING-1477
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -26,10 +26,12 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
/**
*
@@ -42,28 +44,26 @@
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
-
+
protected final String journalDir;
public AbstractSequentialFactory(final String journalDir)
{
this.journalDir = journalDir;
}
-
+
/**
* Create the directory if it doesn't exist yet
*/
public void createDirs() throws Exception
- {
+ {
File file = new File(journalDir);
boolean ok = file.mkdirs();
-//FIXME - uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1477 is complete
-// if (!ok)
-// {
-// throw new IOException("Failed to create directory " + journalDir);
-// }
+ if (!ok)
+ {
+ throw new IOException("Failed to create directory " + journalDir);
+ }
}
-
public List<String> listFiles(final String extension) throws Exception
{
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.paging;
import java.util.Collection;
+import java.util.Map;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -171,6 +172,6 @@
* Reload previously created PagingStores into memory
* @throws Exception
*/
- void reloadStores() throws Exception;
+ void reloadStores(Map<SimpleString, Long> pageDestinations) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,10 +23,13 @@
package org.jboss.messaging.core.paging;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -38,7 +41,7 @@
*/
public interface PagingStoreFactory
{
- PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings, boolean createDir);
+ PagingStore newStore(SimpleString destinationName, QueueSettings queueSettings) throws Exception;
Executor getGlobalDepagerExecutor();
@@ -50,5 +53,17 @@
void setPostOffice(PostOffice office);
- List<SimpleString> getStoredDestinations() throws Exception;
+ List<PagingStore> reloadStores(Map<SimpleString, Long> pageDestinations,
+ HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception;
+
+ /**
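+ * @param destinationName name of the destination that needs a file factory for its pages
+ * @return the file factory to be used for that destination's page files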
+ */
+ SequentialFileFactory newFileFactory(SimpleString destinationName) throws Exception;
+
+ /**
+ * @param storageName name of the store whose file factory (page directory) is deleted
+ */
+ void deleteFileFactory(SimpleString storageName) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,15 +84,6 @@
private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
- // This is just a debug tool method.
- // During debugs you could make log.trace as log.info, and change the
- // variable isTrace above
- private static void trace(final String message)
- {
- // log.trace(message);
- log.info(message);
- }
-
// Constructors
// --------------------------------------------------------------------------------------------------------------------
@@ -129,14 +121,16 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#reloadStores()
*/
- public void reloadStores() throws Exception
+ public void reloadStores(Map<SimpleString, Long> pageDestinations) throws Exception
{
- List<SimpleString> destinations = pagingStoreFactory.getStoredDestinations();
+ List<PagingStore> destinations = pagingStoreFactory.reloadStores(pageDestinations, queueSettingsRepository);
- for (SimpleString dest : destinations)
+ for (PagingStore store: destinations)
{
- createPageStore(dest, false);
+ stores.put(store.getStoreName(), store);
+ store.start();
}
+
}
/**
@@ -149,7 +143,7 @@
if (store == null)
{
- store = newStore(storeName, createDir);
+ store = newStore(storeName);
PagingStore oldStore = stores.putIfAbsent(storeName, store);
@@ -334,9 +328,9 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString destinationName, final boolean createDir)
+ private PagingStore newStore(final SimpleString destinationName) throws Exception
{
- return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()), createDir);
+ return pagingStoreFactory.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
}
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -23,15 +23,20 @@
package org.jboss.messaging.core.paging.impl;
import java.io.File;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -40,9 +45,8 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.util.Base64;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.OrderedExecutorFactory;
import org.jboss.messaging.util.SimpleString;
@@ -60,18 +64,22 @@
// Attributes ----------------------------------------------------
+ private final DecimalFormat format = new DecimalFormat("000000000");
+
private final String directory;
+
+ private final HashMap<SimpleString, Long> pageDirectories = new HashMap<SimpleString, Long>();
private final ExecutorService parentExecutor;
-
+
private final OrderedExecutorFactory executorFactory;
-
+
private final Executor globalDepagerExecutor;
private PagingManager pagingManager;
-
+
private StorageManager storageManager;
-
+
private PostOffice postOffice;
// Static --------------------------------------------------------
@@ -82,13 +90,15 @@
{
this.directory = directory;
- parentExecutor = new ThreadPoolExecutor(0, maxThreads,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(),
- new JBMThreadFactory("JBM-depaging-threads"));
-
+ parentExecutor = new ThreadPoolExecutor(0,
+ maxThreads,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new JBMThreadFactory("JBM-depaging-threads"));
+
executorFactory = new OrderedExecutorFactory(parentExecutor);
-
+
globalDepagerExecutor = executorFactory.getExecutor();
}
@@ -106,66 +116,112 @@
parentExecutor.awaitTermination(30, TimeUnit.SECONDS);
}
- public PagingStore newStore(final SimpleString destinationName, final QueueSettings settings, final boolean createDir)
- {
- final String destinationDirectory = directory + "/" + Base64.encodeBytes(destinationName.getData(), Base64.URL_SAFE);
-
+ public synchronized PagingStore newStore(final SimpleString destinationName, final QueueSettings settings) throws Exception
+ {
+
return new PagingStoreImpl(pagingManager,
storageManager,
postOffice,
- newFileFactory(destinationDirectory),
+ null,
+ this,
destinationName,
settings,
- executorFactory.getExecutor(),
- createDir);
+ executorFactory.getExecutor());
}
+
+ /**
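+ * @param destinationName name of the destination; a page-dir id is registered for it on the storageManager
+ * @return a file factory for the directory named after that id, with the directory already created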
+ */
+ public synchronized SequentialFileFactory newFileFactory(SimpleString destinationName) throws Exception
+ {
+ long id = storageManager.addPageDirDestination(destinationName);
+
+ pageDirectories.put(destinationName, id);
+
+ SequentialFileFactory factory = newFileFactory(id);
+
+ factory.createDirs();
+
+ return factory;
+ }
+ /**
+ * @param storageName name of the store whose page directory and page-dir record are deleted
+ */
+ public synchronized void deleteFileFactory(final SimpleString storageName) throws Exception
+ {
+ Long id = pageDirectories.get(storageName);
+
+ if (id == null)
+ {
+ throw new IllegalStateException("Storage " + storageName + " didn't have a record on Bindings Journal");
+ }
+
+
+ File destinationDirectory = new File(directory + File.separatorChar + format.format(id));
+
+ destinationDirectory.delete();
+
+
+ storageManager.deletePageDirDestination(id);
+ }
+
public void setPagingManager(final PagingManager pagingManager)
{
this.pagingManager = pagingManager;
}
-
+
public void setStorageManager(final StorageManager storageManager)
{
- this.storageManager = storageManager;
+ this.storageManager = storageManager;
}
-
+
public void setPostOffice(final PostOffice postOffice)
{
this.postOffice = postOffice;
}
-
- public List<SimpleString> getStoredDestinations() throws Exception
+
+ public List<PagingStore> reloadStores(final Map<SimpleString, Long> pageDestinations,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository) throws Exception
{
File pageDirectory = new File(directory);
-
+
File[] files = pageDirectory.listFiles();
-
+
if (files == null)
{
- return Collections.<SimpleString>emptyList();
+ return Collections.<PagingStore> emptyList();
}
else
- {
- ArrayList<SimpleString> filesReturn = new ArrayList<SimpleString>(files.length);
-
- for (File file: files)
+ {
+ ArrayList<PagingStore> storesReturn = new ArrayList<PagingStore>(files.length);
+
+ for (Map.Entry<SimpleString, Long> entryPage : pageDestinations.entrySet())
{
- if (file.isDirectory())
- {
- try
- {
- filesReturn.add(new SimpleString(Base64.decode(file.getName(), Base64.URL_SAFE)));
- }
- catch (Exception e)
- {
- log.warn("Invalid encoding on directory " + file.getCanonicalPath(), e);
- }
- }
+ SimpleString destinationName = entryPage.getKey();
+ long id = entryPage.getValue();
+
+ pageDirectories.put(destinationName, id);
+
+ SequentialFileFactory factory = newFileFactory(id);
+
+ QueueSettings settings = queueSettingsRepository.getMatch(destinationName.toString());
+
+ PagingStore store = new PagingStoreImpl(pagingManager,
+ storageManager,
+ postOffice,
+ factory,
+ this,
+ destinationName,
+ settings,
+ executorFactory.getExecutor());
+
+ storesReturn.add(store);
}
-
- return filesReturn;
+
+ return storesReturn;
}
}
@@ -180,5 +236,16 @@
// Private -------------------------------------------------------
+ private synchronized SequentialFileFactory newFileFactory(long id) throws Exception
+ {
+
+ String destinationDirectory = format.format(id);
+
+ SequentialFileFactory seqFactory = new NIOSequentialFileFactory(directory + File.separatorChar +
+ destinationDirectory);
+
+ return seqFactory;
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -43,6 +43,7 @@
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.ServerMessage;
@@ -76,8 +77,11 @@
private final SimpleString storeName;
- private final SequentialFileFactory fileFactory;
+ // The FileFactory is created lazily as soon as the first write is attempted
+ private volatile SequentialFileFactory fileFactory;
+ private final PagingStoreFactory storeFactory;
+
private final long maxSize;
private final long pageSize;
@@ -111,8 +115,6 @@
private final ReadWriteLock currentPageLock = new ReentrantReadWriteLock();
private volatile boolean running = false;
-
- private final boolean createDir;
// Static --------------------------------------------------------
@@ -135,10 +137,10 @@
final StorageManager storageManager,
final PostOffice postOffice,
final SequentialFileFactory fileFactory,
+ final PagingStoreFactory storeFactory,
final SimpleString storeName,
final QueueSettings queueSettings,
- final Executor executor,
- final boolean createDir)
+ final Executor executor)
{
if (pagingManager == null)
{
@@ -149,8 +151,6 @@
this.postOffice = postOffice;
- this.fileFactory = fileFactory;
-
this.storeName = storeName;
maxSize = queueSettings.getMaxSizeBytes();
@@ -169,8 +169,10 @@
this.executor = executor;
this.pagingManager = pagingManager;
-
- this.createDir = createDir;
+
+ this.fileFactory = fileFactory;
+
+ this.storeFactory = storeFactory;
}
// Public --------------------------------------------------------
@@ -319,7 +321,7 @@
}
}
- //TODO all of this can be simplified
+ // TODO all of this can be simplified
public boolean page(final PagedMessage message, final boolean sync, final boolean duplicateDetection) throws Exception
{
if (!running)
@@ -357,25 +359,26 @@
{
return false;
}
-
+
if (duplicateDetection)
{
- //We set the duplicate detection header to prevent the message being depaged more than once in case of failure during depage
-
+ // We set the duplicate detection header to prevent the message being depaged more than once in case of
+ // failure during depage
+
byte[] bytes = new byte[8];
-
+
ByteBuffer buff = ByteBuffer.wrap(bytes);
-
+
ServerMessage msg = message.getMessage(storageManager);
-
+
buff.putLong(msg.getMessageID());
-
+
SimpleString duplID = new SimpleString(bytes);
-
+
message.getMessage(storageManager).putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, duplID);
}
- int bytesToWrite = fileFactory.calculateBlockSize(message.getEncodeSize() + PageImpl.SIZE_RECORD);
+ int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
{
@@ -399,10 +402,10 @@
try
{
if (currentPage != null)
- {
-
+ {
+
currentPage.write(message);
-
+
if (sync)
{
currentPage.sync();
@@ -450,7 +453,7 @@
}
public boolean startDepaging(final Executor executor)
- {
+ {
currentPageLock.readLock().lock();
try
{
@@ -517,7 +520,7 @@
}
public void start() throws Exception
- {
+ {
writeLock.lock();
try
@@ -536,48 +539,49 @@
{
currentPageLock.writeLock().lock();
- if (createDir)
- {
- fileFactory.createDirs();
- }
-
- firstPageId = Integer.MAX_VALUE;
- currentPageId = 0;
- currentPage = null;
-
try
{
- List<String> files = fileFactory.listFiles("page");
+ running = true;
+ firstPageId = Integer.MAX_VALUE;
- numberOfPages = files.size();
-
- for (String fileName : files)
+ // A null fileFactory means there are no files yet on this Storage, so it is left empty; otherwise scan the existing pages
+ if (fileFactory != null)
{
- final int fileId = getPageIdFromFileName(fileName);
- if (fileId > currentPageId)
+ currentPageId = 0;
+ currentPage = null;
+
+ List<String> files = fileFactory.listFiles("page");
+
+ numberOfPages = files.size();
+
+ for (String fileName : files)
{
- currentPageId = fileId;
+ final int fileId = getPageIdFromFileName(fileName);
+
+ if (fileId > currentPageId)
+ {
+ currentPageId = fileId;
+ }
+
+ if (fileId < firstPageId)
+ {
+ firstPageId = fileId;
+ }
}
- if (fileId < firstPageId)
+ if (numberOfPages != 0)
{
- firstPageId = fileId;
+ startPaging();
}
}
-
- running = true;
-
- if (numberOfPages != 0)
- {
- startPaging();
- }
}
finally
{
currentPageLock.writeLock().unlock();
}
}
+
}
finally
{
@@ -651,12 +655,14 @@
try
{
+
if (numberOfPages == 0)
{
return null;
}
else
{
+
numberOfPages--;
final Page returnPage;
@@ -685,6 +691,8 @@
returnPage.open();
returnPage.delete();
+ cleanupDirectory();
+
// This will trigger this Destination to exit the page mode,
// and this will make JBM start using the journal again
return null;
@@ -701,7 +709,6 @@
{
returnPage = createPage(firstPageId++);
}
-
return returnPage;
}
}
@@ -738,24 +745,23 @@
return;
}
-
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
Transaction depageTransaction = new TransactionImpl(storageManager);
-
+
depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
+
for (PagedMessage pagedMessage : pagedMessages)
{
ServerMessage message = null;
message = pagedMessage.getMessage(storageManager);
-
+
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
if (transactionIdDuringPaging >= 0)
@@ -767,7 +773,9 @@
// section
if (pageTransactionInfo == null)
{
- log.warn("Transaction " + pagedMessage.getTransactionID() + " used during paging not found, ignoring message " + message);
+ log.warn("Transaction " + pagedMessage.getTransactionID() +
+ " used during paging not found, ignoring message " +
+ message);
continue;
}
@@ -789,7 +797,7 @@
pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
-
+
postOffice.route(message, depageTransaction);
}
@@ -809,7 +817,7 @@
}
depageTransaction.commit();
-
+
trace("Depage committed");
}
@@ -844,7 +852,7 @@
{
final boolean pageFull = isFull(getPageSizeBytes());
final boolean globalFull = isGlobalFull(getPageSizeBytes());
- if (pageFull || globalFull)
+ if (pageFull || globalFull || !isPaging())
{
depaging.set(false);
if (!globalFull)
@@ -895,6 +903,11 @@
{
String fileName = createFileName(page);
+ if (fileFactory == null)
+ {
+ fileFactory = storeFactory.newFileFactory(this.getStoreName());
+ }
+
SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
file.open();
@@ -938,9 +951,9 @@
private void readPage() throws Exception
{
Page page = depage();
-
+
if (page == null)
- {
+ {
return;
}
@@ -951,8 +964,22 @@
onDepage(page.getPageId(), storeName, messages);
page.delete();
+
}
+ /**
+ * @throws Exception
+ */
+ private void cleanupDirectory() throws Exception
+ {
+ if (fileFactory != null && numberOfPages == 0)
+ {
+ // No more files.. delete page directory
+ storeFactory.deleteFileFactory(storeName);
+ fileFactory = null;
+ }
+ }
+
// Inner classes -------------------------------------------------
private class DepageRunnable implements Runnable
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -27,6 +27,7 @@
import javax.transaction.xa.Xid;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -115,5 +116,12 @@
boolean deleteDestination(SimpleString destination) throws Exception;
- void loadBindings(BindableFactory queueFactory, List<Binding> bindings, List<SimpleString> destinations) throws Exception;
+ long addPageDirDestination(SimpleString pageAddress) throws Exception;
+
+ void deletePageDirDestination(long pageAddressID) throws Exception;
+
+ void loadBindings(BindableFactory queueFactory,
+ List<Binding> bindings,
+ List<SimpleString> destinations,
+ Map<SimpleString, Long> pageDestinationDirectory) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -104,6 +104,8 @@
public static final byte BINDING_RECORD = 21;
public static final byte DESTINATION_RECORD = 22;
+
+ public static final byte PAGE_DESTINATION_DIR_RECORD = 23;
// type + expiration + timestamp + priority
public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
@@ -630,6 +632,24 @@
bindingsJournal.appendDeleteRecord(id);
}
+
+
+ public long addPageDirDestination(final SimpleString pageAddress) throws Exception
+ {
+ long destinationID = idGenerator.generateID();
+
+ DestinationEncoding destinationEnc = new DestinationEncoding(pageAddress);
+
+ bindingsJournal.appendAddRecord(destinationID, PAGE_DESTINATION_DIR_RECORD, destinationEnc);
+
+ return destinationID;
+ }
+
+ public void deletePageDirDestination(final long pageAddressID) throws Exception
+ {
+ bindingsJournal.appendDeleteRecord(pageAddressID);
+ }
+
public boolean addDestination(final SimpleString destination) throws Exception
{
long destinationID = idGenerator.generateID();
@@ -667,7 +687,8 @@
public void loadBindings(final BindableFactory bindableFactory,
final List<Binding> bindings,
- final List<SimpleString> destinations) throws Exception
+ final List<SimpleString> destinations,
+ final Map<SimpleString, Long> pageDestinationDirectory) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -731,6 +752,14 @@
destinations.add(destinationEncoding.destination);
}
+ else if (rec == PAGE_DESTINATION_DIR_RECORD)
+ {
+ DestinationEncoding destinationEncoding = new DestinationEncoding();
+
+ destinationEncoding.decode(buffer);
+
+ pageDestinationDirectory.put(destinationEncoding.destination, id);
+ }
else
{
throw new IllegalStateException("Invalid record type " + rec);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -82,7 +82,8 @@
public void loadBindings(final BindableFactory queueFactory,
final List<Binding> bindings,
- final List<SimpleString> destinations) throws Exception
+ final List<SimpleString> destinations,
+ Map<SimpleString, Long> pageDestinationDirectory) throws Exception
{
}
@@ -223,4 +224,13 @@
{
}
+ public long addPageDirDestination(SimpleString pageAddress) throws Exception
+ {
+ return idGenerator.generateID();
+ }
+
+ public void deletePageDirDestination(long pageAddressID) throws Exception
+ {
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -526,8 +526,10 @@
List<Binding> bindings = new ArrayList<Binding>();
List<SimpleString> dests = new ArrayList<SimpleString>();
+
+ Map<SimpleString, Long> pageDestinations = new HashMap<SimpleString, Long>();
- storageManager.loadBindings(bindableFactory, bindings, dests);
+ storageManager.loadBindings(bindableFactory, bindings, dests, pageDestinations);
// Destinations must be added first to ensure flow controllers exist
// before queues are created
@@ -548,7 +550,7 @@
}
}
- pagingManager.reloadStores();
+ pagingManager.reloadStores(pageDestinations);
Map<SimpleString, List<Pair<SimpleString, Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<SimpleString, Long>>>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.persistence.impl.nullpm.NullStorageManager;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -66,7 +67,7 @@
queueSettings.setDefault(new QueueSettings());
PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getPageDir(), 10),
- null,
+ new NullStorageManager(),
queueSettings,
-1,
1024 * 1024,
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -26,10 +26,12 @@
import java.util.ArrayList;
import java.util.List;
+import org.easymock.classextension.EasyMock;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
@@ -65,9 +67,10 @@
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ null,
destinationTestName,
new QueueSettings(),
- executor, true);
+ executor);
storeImpl.start();
@@ -83,14 +86,19 @@
public void testStore() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+
+ EasyMock.replay(storeFactory);
PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ storeFactory,
destinationTestName,
new QueueSettings(),
- executor, true);
+ executor);
storeImpl.start();
@@ -121,9 +129,10 @@
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ null,
destinationTestName,
new QueueSettings(),
- executor, true);
+ executor);
storeImpl.start();
@@ -135,13 +144,22 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
+ SimpleString destination = new SimpleString("test");
+
+ PagingStoreFactory storeFactory = EasyMock.createMock(PagingStoreFactory.class);
+
+ storeFactory.deleteFileFactory(destination);
+
+ EasyMock.replay(storeFactory);
+
TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ storeFactory,
destinationTestName,
new QueueSettings(),
- executor, true);
+ executor);
storeImpl.start();
@@ -151,8 +169,6 @@
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
- SimpleString destination = new SimpleString("test");
-
for (int i = 0; i < 10; i++)
{
@@ -189,20 +205,32 @@
assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
assertEqualsByteArrays(buffers.get(i).array(), (msg.get(i).getMessage(null)).getBody().array());
}
+
+ EasyMock.verify(storeFactory);
}
public void testDepageMultiplePages() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
+ SimpleString destination = new SimpleString("test");
+
+ PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+
+ EasyMock.expect(storeFactory.newFileFactory(destination)).andReturn(factory);
+
+ storeFactory.deleteFileFactory(destination);
+
+ EasyMock.replay(storeFactory);
TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ storeFactory,
destinationTestName,
new QueueSettings(),
- executor, true);
+ executor);
storeImpl.start();
@@ -214,8 +242,6 @@
List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
- SimpleString destination = new SimpleString("test");
-
for (int i = 0; i < 10; i++)
{
@@ -307,6 +333,8 @@
assertEquals(0, storeImpl.getNumberOfPages());
page.open();
+
+ EasyMock.verify(storeFactory);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2009-01-05 13:08:07 UTC (rev 5578)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2009-01-06 04:57:50 UTC (rev 5579)
@@ -39,6 +39,7 @@
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
@@ -91,6 +92,10 @@
protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception,
InterruptedException
{
+
+ PagingStoreFactory storeFactory = EasyMock.createNiceMock(PagingStoreFactory.class);
+
+ EasyMock.replay(storeFactory);
final int MAX_SIZE = 1024 * 10;
@@ -111,9 +116,10 @@
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ storeFactory,
new SimpleString("test"),
settings,
- executor, true);
+ executor);
storeImpl.start();
@@ -266,9 +272,10 @@
createStorageManagerMock(),
createPostOfficeMock(),
factory,
+ storeFactory,
new SimpleString("test"),
settings,
- executor, true);
+ executor);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
More information about the jboss-cvs-commits
mailing list