[jboss-cvs] JBoss Messaging SVN: r4890 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/paging/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Aug 28 15:59:29 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-28 15:59:28 -0400 (Thu, 28 Aug 2008)
New Revision: 4890
Removed:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Log:
Merging Pager and PagingManager into a single class
Deleted: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/Pager.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -1,103 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.paging;
-
-import java.util.Collection;
-
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- *
- * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
- *
- * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- */
-public interface Pager
-{
-
- /**
- * @param pagingStoreImpl
- * @return false if the listener can't handle more pages
- */
- boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
-
- /**
- * To be used by transactions only.
- * If you're sure you will page if isPaging, just call the method page and look at its return.
- * @param destination
- * @return
- */
- boolean isPaging(SimpleString destination) throws Exception;
-
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
- boolean page(ServerMessage message) throws Exception;
-
- /**
- * Page, only if destination is in page mode.
- * @param message
- * @return false if destination is not on page mode
- */
- boolean page(ServerMessage message, long transactionId) throws Exception;
-
- /**
- * Point to inform/restoring Transactions used when the messages were added into paging
- * */
- void addTransaction(PageTransactionInfo pageTransaction);
-
-
- void completeTransaction(long transactionId);
-
-
- /**
- *
- * Duplication detection for paging processing
- * */
- void loadLastPage(LastPageRecord lastPage) throws Exception;
-
- /**
- *
- * To be called when there are no more references to the message
- * @param message
- */
- void messageDone(ServerMessage message) throws Exception;
-
- /** To be called when a rollback is called after messageDone was called */
- long addSize(ServerMessage message) throws Exception;
-
- void sync(Collection<SimpleString> destinationsToSync) throws Exception;
-
- /**
- * When we stop depaging, The Last page record needs to removed.
- * Or else the record could live forever on the journal.
- * @throws Exception
- * */
- void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
-
-
-}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -22,7 +22,11 @@
package org.jboss.messaging.core.paging;
+import java.util.Collection;
+
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessagingComponent;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
/**
@@ -30,9 +34,85 @@
* <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public interface PagingManager extends MessagingComponent
{
- public PagingStore getPageStore(SimpleString storeName) throws Exception;
+
+ /** To return the PageStore associated with the address */
+ public PagingStore getPageStore(SimpleString address) throws Exception;
+
+ /** An injection point for the PostOffice to inject itself */
+ void setPostOffice(PostOffice postOffice);
+
+ /**
+ * @param pagingStoreImpl
+ * @return false if the listener can't handle more pages
+ */
+ boolean onDepage(int pageId, SimpleString destination, PagingStore pagingStoreImpl, PageMessage[] data) throws Exception;
+
+ /**
+ * To be used by transactions only.
+ * If you're sure you will page if isPaging, just call the method page and look at its return.
+ * @param destination
+ * @return
+ */
+ boolean isPaging(SimpleString destination) throws Exception;
+
+ /**
+ * Page, only if destination is in page mode.
+ * @param message
+ * @return false if destination is not on page mode
+ */
+ boolean page(ServerMessage message) throws Exception;
+
+ /**
+ * Page, only if destination is in page mode.
+ * @param message
+ * @return false if destination is not on page mode
+ */
+ boolean page(ServerMessage message, long transactionId) throws Exception;
+
+ /**
+ * Point to inform/restoring Transactions used when the messages were added into paging
+ * */
+ void addTransaction(PageTransactionInfo pageTransaction);
+
+
+ /**
+ * Use this method to inform when a transaction was completed.
+ * @param transactionId
+ */
+ void completeTransaction(long transactionId);
+
+
+ /**
+ *
+ * Duplication detection for paging processing
+ * */
+ void loadLastPage(LastPageRecord lastPage) throws Exception;
+
+ /**
+ *
+ * To be called when there are no more references to the message
+ * @param message
+ */
+ void messageDone(ServerMessage message) throws Exception;
+
+ /** To be called when a rollback is called after messageDone was called */
+ long addSize(ServerMessage message) throws Exception;
+
+ /** Sync current-pages on disk for these destinations */
+ void sync(Collection<SimpleString> destinationsToSync) throws Exception;
+
+ /**
+ * When we stop depaging, The Last page record needs to removed.
+ * Or else the record could live forever on the journal.
+ * @throws Exception
+ * */
+ void clearLastPageRecord(LastPageRecord lastRecord) throws Exception;
+
+
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -75,7 +75,7 @@
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
- boolean startDequeueThread(Pager listener) throws Exception;
+ boolean startDequeueThread(PagingManager listener) throws Exception;
LastPageRecord getLastRecord();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -23,23 +23,40 @@
package org.jboss.messaging.core.paging.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.LastPageRecord;
+import org.jboss.messaging.core.paging.PageMessage;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
/**
+ * <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
public class PagingManagerImpl implements PagingManager
{
+
+
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -52,18 +69,43 @@
private final PagingStoreFactory pagingSPI;
- // Static --------------------------------------------------------
+ private final StorageManager storageManager;
+
+ private PostOffice postOffice;
- // Constructors --------------------------------------------------
+ private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
- public PagingManagerImpl(final PagingStoreFactory pagingSPI, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+
+
+ // Static --------------------------------------------------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+
+ //private static final boolean isTrace = log.isTraceEnabled();
+ private static final boolean isTrace = true;
+
+ // This is just a debug tool method.
+ // During debugs you could make log.trace as log.info, and change the variable isTrace above
+ private static void trace(String message)
{
+ //log.trace(message);
+ log.info(message);
+ }
+
+
+ // Constructors --------------------------------------------------------------------------------------------------------------------
+
+ public PagingManagerImpl(final PagingStoreFactory pagingSPI, StorageManager storageManager, final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ {
this.pagingSPI = pagingSPI;
this.queueSettingsRepository = queueSettingsRepository;
+ this.storageManager = storageManager;
}
- // Public --------------------------------------------------------
+ // Public ---------------------------------------------------------------------------------------------------------------------------
+ // PagingManager implementation -----------------------------------------------------------------------------------------------------
+
public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
validateStarted();
@@ -88,6 +130,181 @@
}
+ /** this will be set by the postOffice itself.
+ * There is no way to set this on the constructor as the PagingManager is constructed before the postOffice.
+ * (There is a one-to-one relationship here) */
+ public void setPostOffice(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
+ public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
+ {
+ trace("Clearing lastRecord information " + lastRecord.getLastId());
+ storageManager.storeDelete(lastRecord.getRecordId());
+ }
+
+ /**
+ * This method will remove files from the page system and add them into the journal, doing it transactionally
+ *
+ * A Transaction will be opened only if persistent messages are used.
+ * If persistent messages are also used, it will update eventual PageTransactions
+ */
+ public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
+ {
+ log.info("Depaging....");
+
+ /// Depage has to be done atomically, in case of failure it should be back to where it was
+ final long depageTransactionID = storageManager.generateTransactionID();
+
+ LastPageRecord lastPage = pagingStore.getLastRecord();
+
+ if (lastPage == null)
+ {
+ lastPage = new LastPageRecordImpl(pageId, destination);
+ pagingStore.setLastRecord(lastPage);
+ }
+ else
+ {
+ if (pageId <= lastPage.getLastId())
+ {
+ log.warn("Page " + pageId + " was already processed, ignoring the page");
+ return true;
+ }
+ }
+
+ lastPage.setLastId(pageId);
+ storageManager.storeLastPage(depageTransactionID, lastPage);
+
+ HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+
+ final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
+
+ for (PageMessage msg: data)
+ {
+ final long transactionIdDuringPaging = msg.getTransactionID();
+ if (transactionIdDuringPaging > 0)
+ {
+ final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
+
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // This is the Step D described on the "Transactions on Paging" section
+ if (pageTransactionInfo == null)
+ {
+ if (isTrace)
+ {
+ trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+ }
+ continue;
+ }
+
+ // This is to avoid a race condition where messages are depaged before the commit arrived
+ pageTransactionInfo.waitCompletion();
+
+ /// Update information about transactions
+ if (msg.getMessage().isDurable())
+ {
+ pageTransactionInfo.decrement();
+ pageTransactionsToUpdate.add(pageTransactionInfo);
+ }
+ }
+
+ msg.getMessage().setMessageID(storageManager.generateMessageID());
+
+ refsToAdd.addAll(postOffice.route(msg.getMessage()));
+
+ if (msg.getMessage().getDurableRefCount() != 0)
+ {
+ storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+ }
+ }
+
+
+ for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
+ {
+ if (pageWithTransaction.getNumberOfMessages() == 0)
+ {
+ // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
+ // numberOfReads==numberOfWrites -> We delete the record
+ storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
+ this.transactions.remove(pageWithTransaction.getTransactionID());
+ }
+ else
+ {
+ storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
+ }
+ }
+
+ storageManager.commit(depageTransactionID);
+
+ for (MessageReference ref : refsToAdd)
+ {
+ ref.getQueue().addLast(ref);
+ }
+
+ return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes();
+ }
+
+ public void loadLastPage(LastPageRecord lastPage) throws Exception
+ {
+ System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
+ this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
+ }
+
+ public boolean isPaging(SimpleString destination) throws Exception
+ {
+ return this.getPageStore(destination).isPaging();
+ }
+
+ public void messageDone(ServerMessage message) throws Exception
+ {
+ addSize(message.getDestination(), message.getEncodeSize() * -1);
+ }
+
+ public long addSize(final ServerMessage message) throws Exception
+ {
+ return addSize(message.getDestination(), message.getEncodeSize());
+ }
+
+ public boolean page(ServerMessage message, long transactionId)
+ throws Exception
+ {
+ return this.getPageStore(message.getDestination()).page(new PageMessageImpl(message, transactionId));
+ }
+
+
+ public boolean page(ServerMessage message) throws Exception
+ {
+ return this.getPageStore(message.getDestination()).page(new PageMessageImpl(message));
+ }
+
+
+ public void addTransaction(PageTransactionInfo pageTransaction)
+ {
+ this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
+ }
+
+ public void completeTransaction(long transactionId)
+ {
+ PageTransactionInfo pageTrans = this.transactions.get(transactionId);
+
+ // If nothing was paged.. we just remove the information to avoid memory leaks
+ if (pageTrans.getNumberOfMessages() == 0)
+ {
+ this.transactions.remove(pageTrans);
+ }
+ }
+
+ public void sync(Collection<SimpleString> destinationsToSync) throws Exception
+ {
+ for (SimpleString destination: destinationsToSync)
+ {
+ this.getPageStore(destination).sync();
+ }
+ }
+
+ // MessagingComponent implementation ------------------------------------------------------------------------------------------------
+
public boolean isStarted()
{
return started;
@@ -128,7 +345,45 @@
throw new IllegalStateException("PagingManager is not started");
}
}
+
+ private long addSize(final SimpleString destination, final long size) throws Exception
+ {
+ final PagingStore store = this.getPageStore(destination);
+
+ final long addressSize = store.addQueueSize(size);
+
+ final long maxSize = store.getMaxSizeBytes();
+
+ if (size > 0)
+ {
+
+ if (maxSize > 0 && addressSize > maxSize)
+ {
+ if (store.startPaging())
+ {
+ if (isTrace)
+ {
+ trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
+ }
+ }
+ }
+ }
+ else
+ {
+ if ( maxSize > 0 && addressSize < maxSize)
+ {
+ if (store.startDequeueThread(this))
+ {
+ log.info("Starting depaging Thread, size = " + addressSize);
+ }
+ }
+ }
+
+ return addressSize;
+ }
+
+
// Inner classes -------------------------------------------------
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -37,7 +37,7 @@
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
@@ -294,7 +294,7 @@
}
}
- public boolean startDequeueThread(final Pager listener) throws Exception
+ public boolean startDequeueThread(final PagingManager listener) throws Exception
{
lock.readLock().lock();
try
@@ -424,16 +424,26 @@
{
validateInit();
+
+ // First check without any global locks.
+ // (Faster)
+ lock.readLock().lock();
+ try
+ {
+ if (currentPage != null)
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+
+
+ // if the first check failed, we do it again under a global lock (positioningGlobalLock) this time
positioningGlobalLock.acquire();
- // StartPaging would change positioning (by changing currentPage), because of that it needs to be in a synchronized block.
- // Case this lock becomes a contention, we will need to implement the dual-lock antipattern (which I tried to avoid):
- // if (currentPage == null)
- // {
- // synchronizedBlockLock.acquire();
- // if (currentPage == null) // this dual-verification should be fine as currentPage is volatile
- // etc, etc...
-
try
{
if (currentPage == null)
@@ -560,9 +570,9 @@
class DequeueThread extends Thread
{
- final Pager listener;
+ final PagingManager listener;
- public DequeueThread(final Pager listener)
+ public DequeueThread(final PagingManager listener)
{
this.listener = listener;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -49,7 +49,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -326,9 +326,9 @@
pageTransactionInfo.setRecordID(record.id);
- Pager pager = postOffice.getPager();
+ PagingManager pagingManager = postOffice.getPagingManager();
- pager.addTransaction(pageTransactionInfo);
+ pagingManager.addTransaction(pageTransactionInfo);
break;
}
@@ -341,10 +341,10 @@
recordImpl.setRecordId(record.id);
recordImpl.decode(buff);
-
- Pager pager = postOffice.getPager();
- pager.loadLastPage(recordImpl);
+ PagingManager pagingManager = postOffice.getPagingManager();
+
+ pagingManager.loadLastPage(recordImpl);
break;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -27,7 +27,7 @@
import java.util.Set;
import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingComponent;
@@ -83,7 +83,7 @@
Set<SimpleString> listAllDestinations();
- Pager getPager();
+ PagingManager getPagingManager();
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -23,10 +23,8 @@
package org.jboss.messaging.core.postoffice.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -38,14 +36,8 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.LastPageRecord;
-import org.jboss.messaging.core.paging.PageMessage;
-import org.jboss.messaging.core.paging.PageTransactionInfo;
-import org.jboss.messaging.core.paging.Pager;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.paging.impl.LastPageRecordImpl;
-import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -100,13 +92,12 @@
private final PagingManager pagingManager;
- private final Pager pager = new PagerImpl();
-
private volatile boolean started;
public PostOfficeImpl(final StorageManager storageManager,
final PagingManager pagingManager,
- final QueueFactory queueFactory, final boolean checkAllowable)
+ final QueueFactory queueFactory,
+ final boolean checkAllowable)
{
this.storageManager = storageManager;
@@ -121,6 +112,8 @@
public void start() throws Exception
{
+ this.pagingManager.setPostOffice(this);
+
pagingManager.start();
loadBindings();
@@ -241,7 +234,7 @@
public List<MessageReference> route(final ServerMessage message) throws Exception
{
- pager.addSize(message);
+ pagingManager.addSize(message);
SimpleString address = message.getDestination();
@@ -298,9 +291,9 @@
// }
// }
- public Pager getPager()
+ public PagingManager getPagingManager()
{
- return this.pager;
+ return this.pagingManager;
}
@@ -417,221 +410,8 @@
for (SimpleString destination: dests)
{
PagingStore store = pagingManager.getPageStore(destination);
- store.startDequeueThread(pager);
+ store.startDequeueThread(pagingManager);
}
}
- // TODO this probably will become a separate class?
- private class PagerImpl implements Pager
- {
-
- private final ConcurrentMap</*TransactionID*/ Long , PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
-
- public void clearLastPageRecord(LastPageRecord lastRecord) throws Exception
- {
- trace("Clearing lastRecord information " + lastRecord.getLastId());
- storageManager.storeDelete(lastRecord.getRecordId());
- }
-
- /**
- * This method will remove files from the page system and add them into the journal, doing it transactionally
- *
- * A Transaction will be opened only if persistent messages are used.
- * If persistent messages are also used, it will update eventual PageTransactions
- */
- public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
- {
- log.info("Depaging....");
-
- /// Depage has to be done atomically, in case of failure it should be back to where it was
- final long depageTransactionID = storageManager.generateTransactionID();
-
- LastPageRecord lastPage = pagingStore.getLastRecord();
-
- if (lastPage == null)
- {
- lastPage = new LastPageRecordImpl(pageId, destination);
- pagingStore.setLastRecord(lastPage);
- }
- else
- {
- if (pageId <= lastPage.getLastId())
- {
- log.warn("Page " + pageId + " was already processed, ignoring the page");
- return true;
- }
- }
-
- lastPage.setLastId(pageId);
- storageManager.storeLastPage(depageTransactionID, lastPage);
-
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
- final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
- for (PageMessage msg: data)
- {
- final long transactionIdDuringPaging = msg.getTransactionID();
- if (transactionIdDuringPaging > 0)
- {
- final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
-
- // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
- // This is the Step D described on the "Transactions on Paging" section
- if (pageTransactionInfo == null)
- {
- if (isTrace)
- {
- trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
- }
- continue;
- }
-
- // This is to avoid a race condition where messages are depaged before the commit arrived
- pageTransactionInfo.waitCompletion();
-
- /// Update information about transactions
- if (msg.getMessage().isDurable())
- {
- pageTransactionInfo.decrement();
- pageTransactionsToUpdate.add(pageTransactionInfo);
- }
- }
-
- msg.getMessage().setMessageID(storageManager.generateMessageID());
-
- refsToAdd.addAll(PostOfficeImpl.this.route(msg.getMessage()));
-
- if (msg.getMessage().getDurableRefCount() != 0)
- {
- storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
- }
- }
-
-
- for (PageTransactionInfo pageWithTransaction: pageTransactionsToUpdate)
- {
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.storeDeleteTransactional(depageTransactionID, pageWithTransaction.getRecordID());
- this.transactions.remove(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransactionID, pageWithTransaction);
- }
- }
-
- storageManager.commit(depageTransactionID);
-
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
-
- return pagingStore.getQueueSize() < pagingStore.getMaxSizeBytes();
- }
-
- public void loadLastPage(LastPageRecord lastPage) throws Exception
- {
- System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
- pagingManager.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
- }
-
- public boolean isPaging(SimpleString destination) throws Exception
- {
- return pagingManager.getPageStore(destination).isPaging();
- }
-
- public void messageDone(ServerMessage message) throws Exception
- {
- addSize(message.getDestination(), message.getEncodeSize() * -1);
- }
-
- /** To be called when a rollback is called after messageDone was called */
- public long addSize(final ServerMessage message) throws Exception
- {
- return addSize(message.getDestination(), message.getEncodeSize());
- }
-
- public boolean page(ServerMessage message, long transactionId)
- throws Exception
- {
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessageImpl(message, transactionId));
- }
-
-
- public boolean page(ServerMessage message) throws Exception
- {
- return pagingManager.getPageStore(message.getDestination()).page(new PageMessageImpl(message));
- }
-
-
- public void addTransaction(PageTransactionInfo pageTransaction)
- {
- this.transactions.put(pageTransaction.getTransactionID(), pageTransaction);
- }
-
- public void completeTransaction(long transactionId)
- {
- PageTransactionInfo pageTrans = this.transactions.get(transactionId);
-
- // If nothing was paged.. we just remove the information to avoid memory leaks
- if (pageTrans.getNumberOfMessages() == 0)
- {
- this.transactions.remove(pageTrans);
- }
- }
-
- public void sync(Collection<SimpleString> destinationsToSync) throws Exception
- {
- for (SimpleString destination: destinationsToSync)
- {
- pagingManager.getPageStore(destination).sync();
- }
- }
-
-
-
- private long addSize(final SimpleString destination, final long size) throws Exception
- {
- final PagingStore store = pagingManager.getPageStore(destination);
-
- final long addressSize = store.addQueueSize(size);
-
- final long maxSize = store.getMaxSizeBytes();
-
- if (size > 0)
- {
-
- if (maxSize > 0 && addressSize > maxSize)
- {
- if (store.startPaging())
- {
- if (isTrace)
- {
- trace("Starting paging on " + destination + ", size = " + addressSize + ", maxSize=" + maxSize);
- }
- }
- }
- }
- else
- {
- if ( maxSize > 0 && addressSize < maxSize)
- {
- if (store.startDequeueThread(this))
- {
- log.info("Starting depaging Thread, size = " + addressSize);
- }
- }
- }
-
- return addressSize;
- }
-
-
-
- }
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -169,7 +169,7 @@
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), queueSettingsRepository);
+ PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()), storageManager, queueSettingsRepository);
postOffice = new PostOfficeImpl(storageManager, pagingManager,
queueFactory, configuration.isRequireDestinations());
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -39,7 +39,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.filter.impl.FilterImpl;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -130,7 +130,7 @@
private final PostOffice postOffice;
- private final Pager pager;
+ private final PagingManager pager;
private final SecurityStore securityStore;
@@ -164,7 +164,7 @@
this.postOffice = postOffice;
- this.pager = postOffice.getPager();
+ this.pager = postOffice.getPagingManager();
this.queueSettingsRepository = queueSettingsRepository;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -33,7 +33,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -59,7 +59,7 @@
private final PostOffice postOffice;
- private final Pager pager;
+ private final PagingManager pagingManager;
private final List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
@@ -83,16 +83,16 @@
final PostOffice postOffice)
{
this.storageManager = storageManager;
-
+
this.postOffice = postOffice;
if (postOffice == null)
{
- pager = null;
+ pagingManager = null;
}
else
{
- this.pager = postOffice.getPager();
+ this.pagingManager = postOffice.getPagingManager();
}
this.xid = null;
@@ -106,9 +106,17 @@
this.storageManager = storageManager;
this.postOffice = postOffice;
-
- this.pager = postOffice.getPager();
+ if (postOffice == null)
+ {
+ pagingManager = null;
+ }
+ else
+ {
+ this.pagingManager = postOffice.getPagingManager();
+ }
+
+
this.xid = xid;
this.id = storageManager.generateTransactionID();
@@ -129,7 +137,7 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
- if (pager.isPaging(message.getDestination()))
+ if (pagingManager.isPaging(message.getDestination()))
{
pagedMessages.add(message);
}
@@ -153,9 +161,9 @@
if (message.decrementRefCount() == 0)
{
- if (pager != null)
+ if (pagingManager != null)
{
- pager.messageDone(message);
+ pagingManager.messageDone(message);
}
}
@@ -320,7 +328,7 @@
// Putting back the size to control paging
if (message.incrementRefCount() == 1)
{
- pager.addSize(message);
+ pagingManager.addSize(message);
}
message.incrementRefCount();
@@ -441,7 +449,7 @@
{
pageTransaction = new PageTransactionInfoImpl(this.id);
// To avoid a race condition where depage happens before the transaction is completed, we need to inform the pager about this transaction is being processed
- pager.addTransaction(pageTransaction);
+ pagingManager.addTransaction(pageTransaction);
}
}
@@ -451,7 +459,7 @@
// http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging
// Explained under Transaction On Paging. (This is the item B)
- if (pager.page(message, id))
+ if (pagingManager.page(message, id))
{
if (message.isDurable())
{
@@ -473,7 +481,7 @@
containsPersistent = true;
if (pagedDestinationsToSync.size() > 0)
{
- pager.sync(pagedDestinationsToSync);
+ pagingManager.sync(pagedDestinationsToSync);
storageManager.storePageTransaction(id, pageTransaction);
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/util/TypedProperties.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -62,7 +62,7 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class TypedProperties implements EncodingSupport
+public class TypedProperties
{
private static final Logger log = Logger.getLogger(TypedProperties.class);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -69,7 +69,7 @@
PagingManagerImpl managerImpl =
- new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), queueSettings);
+ new PagingManagerImpl(new PagingManagerFactoryNIO(journalDir, 1024*1024), null, queueSettings);
managerImpl.start();
PagingStore store = managerImpl.getPageStore(new SimpleString("simple-test"));
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -28,7 +28,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.paging.Pager;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
import org.jboss.messaging.core.postoffice.PostOffice;
@@ -166,16 +166,12 @@
public boolean page(ServerMessage message, long transactionID)
throws Exception
{
- // TODO Auto-generated method stub
return false;
}
- public Pager getPager()
+ public PagingManager getPagingManager()
{
- // TODO Auto-generated method stub
return null;
}
-
-
-
+
}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-28 19:40:40 UTC (rev 4889)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-28 19:59:28 UTC (rev 4890)
@@ -67,7 +67,7 @@
queueSettings.setDefault(new QueueSettings());
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, queueSettings);
+ PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings);
SimpleString destination = new SimpleString("some-destination");
@@ -120,7 +120,7 @@
public void testMultipleThreadsGetStore() throws Exception
{
PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- final PagingManagerImpl manager = new PagingManagerImpl(spi, repoSettings);
+ final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings);
final SimpleString destination = new SimpleString("some-destination");
More information about the jboss-cvs-commits
mailing list