[jboss-cvs] JBoss Messaging SVN: r5453 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Dec 2 16:59:25 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-02 16:59:25 -0500 (Tue, 02 Dec 2008)
New Revision: 5453
Removed:
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Few tweaks on Paging...
I wanted to make this commit before my next change on Executors
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -77,12 +77,6 @@
void setPostOffice(PostOffice postOffice);
/**
- * @param pagingStoreImpl
- * @return false if the listener can't handle more pages
- */
- boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PagedMessage[] data) throws Exception;
-
- /**
* To be used by transactions only.
* If you're sure you will page if isPaging, just call the method page and look at its return.
* @param destination
@@ -108,6 +102,11 @@
* Point to inform/restoring Transactions used when the messages were added into paging
* */
void addTransaction(PageTransactionInfo pageTransaction);
+
+ /**
+ * Point to inform/restoring Transactions used when the messages were added into paging
+ * */
+ PageTransactionInfo getTransaction(long transactionID);
/**
*
@@ -130,14 +129,33 @@
void sync(Collection<SimpleString> destinationsToSync) throws Exception;
/**
- * When we stop depaging, The Last page record needs to removed.
- * Or else the record could live forever on the journal.
- * @throws Exception
- * */
- void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
+ * @return
+ */
+ long getDefaultPageSize();
/**
+ * @param transactionID
+ */
+ void removeTransaction(long transactionID);
+
+ /**
* @return
*/
- long getDefaultPageSize();
+ long getMaxGlobalSize();
+
+ /**
+ * @return
+ */
+ long getGlobalSize();
+
+ /**
+ * @param size
+ * @return
+ */
+ long addGlobalSize(long size);
+
+ /**
+ *
+ */
+ void startGlobalDepage();
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -46,18 +46,12 @@
/** Maximum number of bytes allowed in memory */
long getMaxSizeBytes();
- boolean isPrintedDropMessagesWarning();
-
- void setPrintedDropMessagesWarning(boolean droppedMessages);
-
boolean isDropWhenMaxSize();
long getPageSizeBytes();
long getAddressSize();
- long addAddressSize(long add);
-
/** @return true if paging was started, or false if paging was already started before this call */
boolean startPaging() throws Exception;
@@ -68,16 +62,7 @@
public boolean readPage() throws Exception;
boolean page(PagedMessage message) throws Exception;
-
- /**
- * Remove the first page from the Writing Queue.
- * The file will still exist until Page.delete is called,
- * So, case the system is reloaded the same Page will be loaded back if delete is not called.
- * @return
- * @throws Exception
- */
- Page depage() throws Exception;
-
+
/**
*
* @return false if a thread was already started, or if not in page mode
@@ -88,4 +73,11 @@
LastPageRecord getLastPageRecord();
void setLastPageRecord(LastPageRecord record);
+
+ /**
+ * @param memoryEstimate
+ * @return
+ * @throws Exception
+ */
+ long addSize(long memoryEstimate) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -24,6 +24,8 @@
import java.util.concurrent.Executor;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -42,4 +44,8 @@
void stop() throws InterruptedException;
void setPagingManager(PagingManager manager);
+
+ void setStorageManager(StorageManager storageManager);
+
+ void setPostOffice(PostOffice office);
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -22,10 +22,7 @@
package org.jboss.messaging.core.paging.impl;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,14 +30,12 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -80,8 +75,6 @@
private final long defaultPageSize;
- private PostOffice postOffice;
-
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
@@ -177,148 +170,9 @@
* (There is a one-to-one relationship here) */
public void setPostOffice(final PostOffice postOffice)
{
- this.postOffice = postOffice;
+ pagingSPI.setPostOffice(postOffice);
}
- public void clearLastPageRecord(final LastPageRecord lastRecord) throws Exception
- {
- trace("Clearing lastRecord information " + lastRecord.getLastId());
-
- storageManager.storeDelete(lastRecord.getRecordId());
- }
-
- /**
- * This method will remove files from the page system and and route them, doing it transactionally
- *
- * A Transaction will be opened only if persistent messages are used.
- *
- * If persistent messages are also used, it will update eventual PageTransactions
- */
-
- //TODO - this method should be moved to PagingStoreImpl
- public boolean onDepage(final int pageId,
- final SimpleString destination,
- final PagingStore pagingStore,
- final PagedMessage[] data) throws Exception
- {
- trace("Depaging....");
-
- // Depage has to be done atomically, in case of failure it should be
- // back to where it was
- final long depageTransactionID = storageManager.generateUniqueID();
-
- LastPageRecord lastPage = pagingStore.getLastPageRecord();
-
- if (lastPage == null)
- {
- lastPage = new LastPageRecordImpl(pageId, destination);
-
- pagingStore.setLastPageRecord(lastPage);
- }
- else
- {
- if (pageId <= lastPage.getLastId())
- {
- log.warn("Page " + pageId + " was already processed, ignoring the page");
- return true;
- }
- }
-
- lastPage.setLastId(pageId);
-
- storageManager.storeLastPage(depageTransactionID, lastPage);
-
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
- final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
- for (PagedMessage msg : data)
- {
- ServerMessage pagedMessage = null;
-
- pagedMessage = (ServerMessage)msg.getMessage(storageManager);
-
- final long transactionIdDuringPaging = msg.getTransactionID();
-
- if (transactionIdDuringPaging >= 0)
- {
- final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
-
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // This is the Step D described on the "Transactions on Paging"
- // section
- if (pageTransactionInfo == null)
- {
- if (isTrace)
- {
- trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + pagedMessage);
- }
- continue;
- }
-
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
- if (!pageTransactionInfo.waitCompletion())
- {
- trace("Rollback was called after prepare, ignoring message " + pagedMessage);
- continue;
- }
-
- // Update information about transactions
- if (pagedMessage.isDurable())
- {
- pageTransactionInfo.decrement();
- pageTransactionsToUpdate.add(pageTransactionInfo);
- }
- }
-
- refsToAdd.addAll(postOffice.route(pagedMessage));
-
- if (pagedMessage.getDurableRefCount() != 0)
- {
- storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
- }
- }
-
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
- {
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // http://wiki.jboss.org/wiki/JBossMessaging2Paging
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.storeDeletePageTransaction(depageTransactionID, pageWithTransaction.getRecordID());
- transactions.remove(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
- }
- }
-
- storageManager.commit(depageTransactionID);
-
- trace("Depage committed");
-
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
-
- if (globalMode.get())
- {
- // We use the Default Page Size when in global mode for the calculation of the Watermark
- return globalSize.get() < maxGlobalSize - defaultPageSize && pagingStore.getMaxSizeBytes() <= 0 ||
- pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
- }
- else
- {
- // If Max-size is not configured (-1) it will aways return true, as
- // this method was probably called by global-depage
- return pagingStore.getMaxSizeBytes() <= 0 || pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
- }
-
- }
-
public long getDefaultPageSize()
{
return defaultPageSize;
@@ -338,12 +192,12 @@
public void messageDone(final ServerMessage message) throws Exception
{
- addSize(message.getDestination(), message.getMemoryEstimate() * -1);
+ getPageStore(message.getDestination()).addSize(message.getMemoryEstimate() * -1);
}
public long addSize(final ServerMessage message) throws Exception
{
- return addSize(message.getDestination(), message.getMemoryEstimate());
+ return getPageStore(message.getDestination()).addSize(message.getMemoryEstimate());
}
public boolean page(final ServerMessage message, final long transactionId) throws Exception
@@ -360,6 +214,17 @@
{
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
}
+
+ public void removeTransaction(final long id)
+ {
+ transactions.remove(id);
+ }
+
+ public PageTransactionInfo getTransaction(final long id)
+ {
+ return transactions.get(id);
+ }
+
public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
{
@@ -386,6 +251,8 @@
pagingSPI.setPagingManager(this);
+ pagingSPI.setStorageManager(storageManager);
+
started = true;
}
@@ -405,120 +272,52 @@
store.stop();
}
}
+
+ public void startGlobalDepage()
+ {
+ if (globalDepageRunning.compareAndSet(false, true))
+ {
+ Runnable globalDepageRunnable = new GlobalDepager();
+ pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
+ }
+ }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private PagingStore newStore(final SimpleString destinationName)
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.paging.PagingManager#getGlobalSize()
+ */
+ public long getGlobalSize()
{
- return pagingSPI.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
+ return this.globalSize.get();
}
+
- //TODO - this method should be moved to PagingStoreImpl
- private long addSize(final SimpleString destination, final long size) throws Exception
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.paging.PagingManager#addGlobalSize(long)
+ */
+ public long addGlobalSize(long size)
{
- final PagingStore store = getPageStore(destination);
+ return globalSize.addAndGet(size);
+ }
- final long maxSize = store.getMaxSizeBytes();
- final long pageSize = store.getPageSizeBytes();
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.paging.PagingManager#getMaxGlobalSize()
+ */
+ public long getMaxGlobalSize()
+ {
+ return maxGlobalSize;
+ }
+
- if (store.isDropWhenMaxSize() && size > 0)
- {
- // if destination configured to drop messages && size is over the
- // limit, we return -1 which means drop the message
- if (store.getAddressSize() + size > maxSize || maxGlobalSize > 0 && globalSize.get() + size > maxGlobalSize)
- {
- if (!store.isPrintedDropMessagesWarning())
- {
- store.setPrintedDropMessagesWarning(true);
-
- log.warn("Messages are being dropped on adress " + store.getStoreName());
- }
+ // Package protected ---------------------------------------------
- return -1l;
- }
- else
- {
- return store.addAddressSize(size);
- }
- }
- else
- {
- long currentGlobalSize = globalSize.addAndGet(size);
+ // Protected -----------------------------------------------------
- final long addressSize = store.addAddressSize(size);
+ // Private -------------------------------------------------------
- if (size > 0)
- {
- if (maxGlobalSize > 0 && currentGlobalSize > maxGlobalSize)
- {
- globalMode.set(true);
-
- if (store.startPaging())
- {
- if (isTrace)
- {
- trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
- }
- }
- }
- else if (maxSize > 0 && addressSize > maxSize)
- {
- if (store.startPaging())
- {
- if (isTrace)
- {
- trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
- }
- }
- }
- }
- else
- {
- // When in Global mode, we use the default page size as the minimal
- // watermark to start depage
-
- if (isTrace)
- {
- log.trace("globalMode.get = " + globalMode.get() +
- " currentGlobalSize = " +
- currentGlobalSize +
- " defaultPageSize = " +
- defaultPageSize +
- " maxGlobalSize = " +
- maxGlobalSize +
- "maxGlobalSize - defaultPageSize = " +
- (maxGlobalSize - defaultPageSize));
- }
-
- if (globalMode.get() && currentGlobalSize < maxGlobalSize - defaultPageSize)
- {
- startGlobalDepage();
- }
- else if (maxSize > 0 && addressSize < maxSize - pageSize)
- {
- if (store.startDepaging())
- {
- log.info("Starting depaging Thread, size = " + addressSize);
- }
- }
- }
-
- return addressSize;
- }
- }
-
- private void startGlobalDepage()
+ private PagingStore newStore(final SimpleString destinationName)
{
- if (globalDepageRunning.compareAndSet(false, true))
- {
- Runnable globalDepageRunnable = new GlobalDepager();
- pagingSPI.getPagingExecutor().execute(globalDepageRunnable);
- }
+ return pagingSPI.newStore(destinationName, queueSettingsRepository.getMatch(destinationName.toString()));
}
// Inner classes -------------------------------------------------
@@ -574,4 +373,5 @@
}
}
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreFactoryNIO.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -33,6 +33,8 @@
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.messaging.util.SimpleString;
@@ -54,6 +56,10 @@
private final ExecutorService executor;
private PagingManager pagingManager;
+
+ private StorageManager storageManager;
+
+ private PostOffice postOffice;
// Static --------------------------------------------------------
@@ -91,16 +97,28 @@
destinationFile.mkdirs();
return new PagingStoreImpl(pagingManager,
+ storageManager,
+ postOffice,
newFileFactory(destinationDirectory),
destinationName,
settings,
executor);
}
- public void setPagingManager(final PagingManager manager)
+ public void setPagingManager(final PagingManager pagingManager)
{
- pagingManager = manager;
+ this.pagingManager = pagingManager;
}
+
+ public void setStorageManager(final StorageManager storageManager)
+ {
+ this.storageManager = storageManager;
+ }
+
+ public void setPostOffice(final PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -23,6 +23,8 @@
package org.jboss.messaging.core.paging.impl;
import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -37,9 +39,14 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Page;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -57,6 +64,10 @@
// Attributes ----------------------------------------------------
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
private final DecimalFormat format = new DecimalFormat("000000000");
private final AtomicInteger pageUsedSize = new AtomicInteger(0);
@@ -103,9 +114,24 @@
// Static --------------------------------------------------------
+ // private static final boolean isTrace = log.isTraceEnabled();
+ private static final boolean isTrace = true;
+
+ // This is just a debug tool method.
+ // During debugs you could make log.trace as log.info, and change the
+ // variable isTrace above
+ private static void trace(final String message)
+ {
+ // log.trace(message);
+ log.info(message);
+ }
+
+
// Constructors --------------------------------------------------
public PagingStoreImpl(final PagingManager pagingManager,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
final SequentialFileFactory fileFactory,
final SimpleString storeName,
final QueueSettings queueSettings,
@@ -116,6 +142,10 @@
throw new IllegalStateException("Paging Manager can't be null");
}
+ this.storageManager = storageManager;
+
+ this.postOffice = postOffice;
+
this.fileFactory = fileFactory;
this.storeName = storeName;
@@ -142,28 +172,11 @@
// PagingStore implementation ------------------------------------
- //TODO - this methods shouldn't be necessary if move functionality from
- //PagingManagerImpl to PagingStoreImpl
- public boolean isPrintedDropMessagesWarning()
- {
- return printedDropMessagesWarning;
- }
-
- public void setPrintedDropMessagesWarning(final boolean droppedMessages)
- {
- this.printedDropMessagesWarning = droppedMessages;
- }
-
public long getAddressSize()
{
return sizeInBytes.get();
}
- public long addAddressSize(final long delta)
- {
- return sizeInBytes.addAndGet(delta);
- }
-
/** Maximum number of bytes allowed in memory */
public long getMaxSizeBytes()
{
@@ -217,7 +230,7 @@
{
if (lastPageRecord != null)
{
- pagingManager.clearLastPageRecord(lastPageRecord);
+ clearLastPageRecord(lastPageRecord);
}
lastPageRecord = null;
@@ -229,7 +242,7 @@
PagedMessage messages[] = page.read();
- boolean addressNotFull = pagingManager.onDepage(page.getPageId(), storeName, PagingStoreImpl.this, messages);
+ boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
page.delete();
@@ -241,7 +254,6 @@
* The method calling this method will remove the page and will start reading it outside of any locks.
*
* */
- //FIXME - why is this public?
public Page depage() throws Exception
{
writeLock.lock();
@@ -303,6 +315,102 @@
}
+
+ public long addSize(final long size) throws Exception
+ {
+ final long maxSize = getMaxSizeBytes();
+
+ final long pageSize = getPageSizeBytes();
+
+ if (isDropWhenMaxSize() && size > 0)
+ {
+ // if destination configured to drop messages && size is over the
+ // limit, we return -1 which means drop the message
+ if (getAddressSize() + size > maxSize || pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() + size > pagingManager.getMaxGlobalSize())
+ {
+ if (!printedDropMessagesWarning)
+ {
+ printedDropMessagesWarning = true;
+
+ log.warn("Messages are being dropped on adress " + getStoreName());
+ }
+
+ return -1l;
+ }
+ else
+ {
+ return addAddressSize(size);
+ }
+ }
+ else
+ {
+ final long currentGlobalSize = pagingManager.addGlobalSize(size);
+
+ final long maxGlobalSize = pagingManager.getMaxGlobalSize();
+
+ final long addressSize = addAddressSize(size);
+
+ if (size > 0)
+ {
+ if (maxGlobalSize > 0 && currentGlobalSize > maxGlobalSize)
+ {
+ pagingManager.setGlobalPageMode(true);
+
+ if (startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
+ else if (maxSize > 0 && addressSize > maxSize)
+ {
+ if (startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + getStoreName() + ", size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
+ }
+ else
+ {
+ // When in Global mode, we use the default page size as the minimal
+ // watermark to start depage
+
+ if (isTrace)
+ {
+ log.trace("globalMode.get = " + pagingManager.isGlobalPageMode() +
+ " currentGlobalSize = " +
+ currentGlobalSize +
+ " defaultPageSize = " +
+ pagingManager.getDefaultPageSize() +
+ " maxGlobalSize = " +
+ maxGlobalSize +
+ "maxGlobalSize - defaultPageSize = " +
+ (maxGlobalSize - pagingManager.getDefaultPageSize()));
+ }
+
+ if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+ {
+ pagingManager.startGlobalDepage();
+ }
+ else if (maxSize > 0 && addressSize < maxSize - pageSize)
+ {
+ if (startDepaging())
+ {
+ log.info("Starting depaging Thread, size = " + addressSize);
+ }
+ }
+ }
+
+ return addressSize;
+ }
+ }
+
+
public boolean page(final PagedMessage message) throws Exception
{
// Max-size is set, but reject is activated, what means.. never page on
@@ -564,6 +672,8 @@
writeLock.unlock();
}
}
+
+
// TestSupportPageStore ------------------------------------------
@@ -577,7 +687,144 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+
+
+ /**
+ * This method will remove files from the page system and and route them, doing it transactionally
+ *
+ * A Transaction will be opened only if persistent messages are used.
+ *
+ * If persistent messages are also used, it will update eventual PageTransactions
+ */
+
+ private boolean onDepage(final int pageId,
+ final SimpleString destination,
+ final PagedMessage[] data) throws Exception
+ {
+ trace("Depaging....");
+ // Depage has to be done atomically, in case of failure it should be
+ // back to where it was
+ final long depageTransactionID = storageManager.generateUniqueID();
+
+ LastPageRecord lastPage = getLastPageRecord();
+
+ if (lastPage == null)
+ {
+ lastPage = new LastPageRecordImpl(pageId, destination);
+
+ setLastPageRecord(lastPage);
+ }
+ else
+ {
+ if (pageId <= lastPage.getLastId())
+ {
+ log.warn("Page " + pageId + " was already processed, ignoring the page");
+ return true;
+ }
+ }
+
+ lastPage.setLastId(pageId);
+
+ storageManager.storeLastPage(depageTransactionID, lastPage);
+
+ HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
+ final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+ for (PagedMessage msg : data)
+ {
+ ServerMessage pagedMessage = null;
+
+ pagedMessage = (ServerMessage)msg.getMessage(storageManager);
+
+ final long transactionIdDuringPaging = msg.getTransactionID();
+
+ if (transactionIdDuringPaging >= 0)
+ {
+ final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
+
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // This is the Step D described on the "Transactions on Paging"
+ // section
+ if (pageTransactionInfo == null)
+ {
+ if (isTrace)
+ {
+ trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + pagedMessage);
+ }
+ continue;
+ }
+
+ // This is to avoid a race condition where messages are depaged
+ // before the commit arrived
+ if (!pageTransactionInfo.waitCompletion())
+ {
+ trace("Rollback was called after prepare, ignoring message " + pagedMessage);
+ continue;
+ }
+
+ // Update information about transactions
+ if (pagedMessage.isDurable())
+ {
+ pageTransactionInfo.decrement();
+ pageTransactionsToUpdate.add(pageTransactionInfo);
+ }
+ }
+
+ refsToAdd.addAll(postOffice.route(pagedMessage));
+
+ if (pagedMessage.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
+ }
+ }
+
+ for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ {
+ if (pageWithTransaction.getNumberOfMessages() == 0)
+ {
+ // http://wiki.jboss.org/wiki/JBossMessaging2Paging
+ // numberOfReads==numberOfWrites -> We delete the record
+ storageManager.storeDeletePageTransaction(depageTransactionID, pageWithTransaction.getRecordID());
+ pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
+ }
+ else
+ {
+ storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+ }
+ }
+
+ storageManager.commit(depageTransactionID);
+
+ trace("Depage committed");
+
+ for (MessageReference ref : refsToAdd)
+ {
+ ref.getQueue().addLast(ref);
+ }
+
+ if (pagingManager.isGlobalPageMode())
+ {
+ // We use the Default Page Size when in global mode for the calculation of the Watermark
+ return pagingManager.getGlobalSize() < pagingManager.getMaxGlobalSize() - pagingManager.getDefaultPageSize() && getMaxSizeBytes() <= 0 ||
+ getAddressSize() < getMaxSizeBytes();
+ }
+ else
+ {
+ // If Max-size is not configured (-1) it will aways return true, as
+ // this method was probably called by global-depage
+ return getMaxSizeBytes() <= 0 || getAddressSize() < getMaxSizeBytes();
+ }
+
+ }
+
+
+ private long addAddressSize(final long delta)
+ {
+ return sizeInBytes.addAndGet(delta);
+ }
+
private synchronized void clearDequeueThread()
{
dequeueThread = null;
@@ -615,6 +862,13 @@
}
}
+ public void clearLastPageRecord(final LastPageRecord lastRecord) throws Exception
+ {
+ trace("Clearing lastRecord information " + lastRecord.getLastId());
+
+ storageManager.storeDelete(lastRecord.getRecordId());
+ }
+
private Page createPage(final int page) throws Exception
{
String fileName = createFileName(page);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/TestSupportPageStore.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.paging.impl;
+import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagingStore;
/**
@@ -31,5 +32,16 @@
*/
public interface TestSupportPageStore extends PagingStore
{
+ /**
+ * Remove the first page from the Writing Queue.
+ * The file will still exist until Page.delete is called,
+ * So, case the system is reloaded the same Page will be loaded back if delete is not called.
+ * @return
+ * @throws Exception
+ *
+ * Note: This should still be part of the interface, even though JBossMessaging only uses through the
+ */
+ Page depage() throws Exception;
+
void forceAnotherPage() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -25,7 +25,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -48,11 +47,11 @@
import org.jboss.messaging.core.server.SendLock;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.SendLockImpl;
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.SimpleString;
/**
* A PostOfficeImpl
@@ -495,36 +494,21 @@
queues.put(binding.getQueue().getPersistenceID(), binding.getQueue());
}
- // TODO: This is related to http://www.jboss.com/index.html?module=bb&op=viewtopic&t=145597
-
- //FIXME This is incorrect - you cannot assume there is an allowable address in existence
- //for every address in the post office.
- //This code is unnecessary if paging stores are loaded lazily
- HashSet<SimpleString> addresses = new HashSet<SimpleString>();
-
- for (Binding binding : bindings)
+
+ for (SimpleString destination : addressManager.getMappings().keySet())
{
- addresses.add(binding.getAddress());
- }
-
- for (SimpleString destination : dests)
- {
- addresses.add(destination);
- }
-
- for (SimpleString destination : addresses)
- {
pagingManager.createPageStore(destination);
}
- // End TODO -------------------------------------
-
-
storageManager.loadMessages(this, queues, resourceManager);
- for (SimpleString destination : addresses)
+
+ // If Paging was interrupted due a server stop, during restart we need to resume depaging those addresses
+ for (SimpleString destination : addressManager.getMappings().keySet())
{
PagingStore store = pagingManager.getPageStore(destination);
+
+ // FIXME this should be changed as soon as we change the threading model
if (!pagingManager.isGlobalPageMode())
{
if (store.isPaging() && store.getMaxSizeBytes() < 0)
@@ -537,6 +521,7 @@
}
}
}
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
+import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -65,7 +66,7 @@
{
HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
queueSettings.setDefault(new QueueSettings());
-
+
PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(journalDir),
null,
queueSettings,
@@ -74,7 +75,7 @@
managerImpl.start();
- PagingStore store = managerImpl.createPageStore(new SimpleString("simple-test"));
+ TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
@@ -120,7 +121,7 @@
-1,
1024 * 1024);
managerImpl.start();
-
+
managerImpl.createPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.paging.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.easymock.EasyMock;
-import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PagedMessage;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.PagingStoreFactory;
-import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
-import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
-import org.jboss.messaging.core.persistence.StorageManager;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
-
-public class PageManagerImplTest extends UnitTestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private static HierarchicalRepository<QueueSettings> repoSettings = new HierarchicalObjectRepository<QueueSettings>();
- static
- {
- repoSettings.setDefault(new QueueSettings());
- }
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- public void testOnDepage() throws Exception
- {
- long time = System.currentTimeMillis() + 10000;
- List<MessageReference> refs = new ArrayList<MessageReference>();
- MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
- refs.add(ref);
- Queue queue = EasyMock.createStrictMock(Queue.class);
- HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
- queueSettings.setDefault(new QueueSettings());
- PostOffice po = EasyMock.createStrictMock(PostOffice.class);
- PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingStore store = EasyMock.createNiceMock(PagingStore.class);
- StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, storageManager, queueSettings, -1, 1024 * 1024);
- manager.setPostOffice(po);
- ServerMessage message = EasyMock.createStrictMock(ServerMessage.class);
-
- EasyMock.expect(storageManager.generateUniqueID()).andReturn(1l);
- EasyMock.expect(po.route(message)).andReturn(refs);
- EasyMock.expect(message.getDurableRefCount()).andReturn(1);
- storageManager.storeLastPage(EasyMock.anyLong(), (LastPageRecord) EasyMock.anyObject());
- storageManager.storeMessageTransactional(EasyMock.anyLong(), (ServerMessage) EasyMock.anyObject());
- storageManager.commit(EasyMock.anyLong());
- EasyMock.expect(ref.getQueue()).andReturn(queue);
- EasyMock.expect(queue.addLast(ref)).andReturn(null);
- EasyMock.replay(spi, store, message, storageManager, po, ref, queue);
- SimpleString queueName = new SimpleString("aq");
- PagedMessageImpl pageMessage = new PagedMessageImpl(message);
-
- manager.onDepage(0, queueName, store, new PagedMessage[] {pageMessage} );
- EasyMock.verify(spi, store, message, storageManager, po, ref, queue);
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -61,7 +61,13 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+ PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ destinationTestName,
+ new QueueSettings(),
+ executor);
storeImpl.start();
@@ -78,7 +84,13 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+ PagingStore storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ destinationTestName,
+ new QueueSettings(),
+ executor);
storeImpl.start();
@@ -105,7 +117,13 @@
storeImpl.sync();
- storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+ storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ destinationTestName,
+ new QueueSettings(),
+ executor);
storeImpl.start();
@@ -113,12 +131,17 @@
}
-
public void testDepageOnCurrentPage() throws Exception
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(createMockManager(), factory, destinationTestName, new QueueSettings(), executor);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
+ factory,
+ destinationTestName,
+ new QueueSettings(),
+ executor);
storeImpl.start();
@@ -174,6 +197,8 @@
SequentialFileFactory factory = new FakeSequentialFileFactory();
TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
factory,
destinationTestName,
new QueueSettings(),
@@ -292,9 +317,9 @@
testConcurrentPaging(factory, 10);
}
-
+
public void testFoo()
- {
+ {
}
// Protected ----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-02 15:06:47 UTC (rev 5452)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-02 21:59:25 UTC (rev 5453)
@@ -42,6 +42,8 @@
import org.jboss.messaging.core.paging.impl.PagedMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
@@ -106,6 +108,8 @@
settings.setPageSizeBytes(MAX_SIZE);
final TestSupportPageStore storeImpl = new PagingStoreImpl(createMockManager(),
+ createStorageManagerMock(),
+ createPostOfficeMock(),
factory,
new SimpleString("test"),
settings,
@@ -259,7 +263,7 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(), factory, new SimpleString("test"), settings, executor);
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(createMockManager(), createStorageManagerMock(), createPostOfficeMock(), factory, new SimpleString("test"), settings, executor);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
@@ -357,6 +361,21 @@
EasyMock.replay(mockManager);
return mockManager;
}
+
+ protected StorageManager createStorageManagerMock()
+ {
+ StorageManager storageManager = EasyMock.createNiceMock(StorageManager.class);
+ EasyMock.replay(storageManager);
+ return storageManager;
+ }
+
+
+ protected PostOffice createPostOfficeMock()
+ {
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+ EasyMock.replay(postOffice);
+ return postOffice;
+ }
// Private -------------------------------------------------------
More information about the jboss-cvs-commits
mailing list