[jboss-cvs] JBoss Messaging SVN: r4833 - in branches/Branch_JBMESSAGING-1314: src/main/org/jboss/messaging/core/config/impl and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 18 19:26:08 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-08-18 19:26:08 -0400 (Mon, 18 Aug 2008)
New Revision: 4833
Modified:
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
Log:
Integrating Paging into PostOffice (2nd commit)
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/Configuration.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -125,6 +125,14 @@
String getJournalDirectory();
void setJournalDirectory(String dir);
+
+ String getPagingDirectory();
+
+ void setPagingDirectory(String dir);
+
+ void setPageFileSize(long size);
+
+ long getPageFileSize();
JournalType getJournalType();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -77,6 +77,10 @@
public static final String DEFAULT_JOURNAL_DIR = "data/journal";
+ public static final String DEFAULT_PAGING_DIR = "data/paging";
+
+ public static final long DEFAULT_PAGE_FILE_SIZE = 10 * 1024 * 1024;
+
public static final boolean DEFAULT_CREATE_JOURNAL_DIR = true;
public static final JournalType DEFAULT_JOURNAL_TYPE = JournalType.ASYNCIO;
@@ -115,6 +119,10 @@
protected String journalDirectory = DEFAULT_JOURNAL_DIR;
+ protected String pagingDirectory = DEFAULT_PAGING_DIR;
+
+ protected long pageFileSize = DEFAULT_PAGE_FILE_SIZE;
+
protected boolean createJournalDir = DEFAULT_CREATE_JOURNAL_DIR;
public JournalType journalType = DEFAULT_JOURNAL_TYPE;
@@ -357,6 +365,27 @@
return journalType;
}
+
+ public void setPagingDirectory(String dir)
+ {
+ this.pagingDirectory = dir;
+ }
+
+ public String getPagingDirectory()
+ {
+ return this.pagingDirectory;
+ }
+
+ public long getPageFileSize()
+ {
+ return this.pageFileSize;
+ }
+
+ public void setPageFileSize(long size)
+ {
+ this.pageFileSize = size;
+ }
+
public void setJournalType(JournalType type)
{
this.journalType = type;
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -26,9 +26,7 @@
import java.io.Reader;
import java.net.URL;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
@@ -189,6 +187,10 @@
createBindingsDir = getBoolean(e, "create-bindings-dir", createBindingsDir);
journalDirectory = getString(e, "journal-directory", journalDirectory);
+
+ pagingDirectory = getString(e, "paging-directory", pagingDirectory);
+
+ pageFileSize = getLong(e, "page-file-size", pageFileSize);
createJournalDir = getBoolean(e, "create-journal-dir", createJournalDir);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -23,8 +23,10 @@
package org.jboss.messaging.core.paging;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -40,9 +42,10 @@
int getNumberOfPages();
- String getStoreName();
+ SimpleString getStoreName();
- void startPaging() throws Exception;
+ /** @return true if paging was started, or false if paging was already started before this call */
+ boolean startPaging() throws Exception;
boolean isPaging();
@@ -58,5 +61,13 @@
* @throws Exception
*/
Page dequeuePage() throws Exception;
+
+ /**
+ *
+ * @param postOffice
+ * @return false if a thread was already started, or if not in page mode
+ * @throws Exception
+ */
+ boolean startDequeueThread(PostOffice postOffice, long maxSize) throws Exception;
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -33,6 +33,6 @@
public interface PagingStoreFactory
{
- PagingStore newStore(String destinationName);
+ PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -43,13 +44,13 @@
// Attributes ----------------------------------------------------
private final String directory;
- private final int pageSize;
+ private final long pageSize;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PagingManagerFactoryNIO(final String directory, final int pageSize)
+ public PagingManagerFactoryNIO(final String directory, final long pageSize)
{
this.directory = directory;
this.pageSize = pageSize;
@@ -57,9 +58,9 @@
// Public --------------------------------------------------------
- public PagingStore newStore(String destinationName)
+ public PagingStore newStore(SimpleString destinationName)
{
- final String destinationDirectory = directory + "/" + destinationName;
+ final String destinationDirectory = directory + "/" + destinationName.toString();
File destinationFile = new File(destinationDirectory);
destinationFile.mkdirs();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -59,7 +59,7 @@
// Public --------------------------------------------------------
- public PagingStore getPageStore(SimpleString storeName) throws Exception
+ public PagingStore getPageStore(final SimpleString storeName) throws Exception
{
validateStarted();
@@ -67,7 +67,7 @@
if (store == null)
{
- store = newStore(storeName.toString());
+ store = newStore(storeName);
PagingStore oldStore = stores.putIfAbsent(storeName, store);
@@ -111,7 +111,7 @@
// Private -------------------------------------------------------
- private PagingStore newStore(String destinationName)
+ private PagingStore newStore(final SimpleString destinationName)
{
return pagingSPI.newStore(destinationName);
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -32,9 +32,12 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.util.SimpleString;
/**
*
@@ -45,6 +48,7 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingStoreImpl.class);
// Attributes ----------------------------------------------------
@@ -52,12 +56,14 @@
private final AtomicInteger pageUsedSize = new AtomicInteger(0);
- private final String storeName;
+ private final SimpleString storeName;
private final SequentialFileFactory factory;
private final long maxPageSize;
+ private volatile Thread dequeueThread;
+
private volatile int numberOfPages;
private volatile int firstPageId = Integer.MAX_VALUE;
private volatile int currentPageId;
@@ -75,7 +81,7 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final SequentialFileFactory factory, final String storeName, final long maxPageSize)
+ public PagingStoreImpl(final SequentialFileFactory factory, final SimpleString storeName, final long maxPageSize)
{
this.factory = factory;
this.storeName = storeName;
@@ -106,7 +112,7 @@
return numberOfPages;
}
- public String getStoreName()
+ public SimpleString getStoreName()
{
return storeName;
}
@@ -242,15 +248,39 @@
}
}
+ public boolean startDequeueThread(final PostOffice postOffice, final long maxSize) throws Exception
+ {
+ if (!isPaging())
+ {
+ return false;
+ }
+ else
+ {
+ synchronized (this)
+ {
+ if (this.dequeueThread == null)
+ {
+ this.dequeueThread = new DequeueThread(postOffice, maxSize);
+ this.dequeueThread.start();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ }
+ }
+
// MessagingComponent implementation
- public boolean isStarted()
+ public synchronized boolean isStarted()
{
return initialized;
}
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
if (initialized)
{
@@ -272,7 +302,7 @@
}
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
if (initialized)
@@ -324,7 +354,7 @@
}
}
- public void startPaging() throws Exception
+ public boolean startPaging() throws Exception
{
validateInit();
@@ -334,7 +364,12 @@
if (currentPage == null)
{
openNewPage();
+ return true;
}
+ else
+ {
+ return false;
+ }
}
finally
{
@@ -359,7 +394,13 @@
// Private -------------------------------------------------------
+
+ private synchronized void clearDequeueThread()
+ {
+ this.dequeueThread = null;
+ }
+
private void openNewPage() throws Exception
{
@@ -442,4 +483,47 @@
// Inner classes -------------------------------------------------
+ class DequeueThread extends Thread
+ {
+ final PostOffice postOffice;
+ final long maxSize;
+
+ public DequeueThread(final PostOffice postOffice, final long maxSize)
+ {
+ this.postOffice = postOffice;
+ this.maxSize = maxSize;
+ }
+
+
+ public void run()
+ {
+ try
+ {
+ while (postOffice.getSize(storeName) < maxSize)
+ {
+ Page page = dequeuePage();
+ if (page == null)
+ {
+ break;
+ }
+ page.open();
+ ServerMessage messages[] = page.read();
+ for (ServerMessage message: messages)
+ {
+ postOffice.routeAndDeliver(message);
+ }
+ page.delete();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error(e, e);
+ }
+ finally
+ {
+ PagingStoreImpl.this.clearDequeueThread();
+ }
+ }
+ }
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -81,6 +81,9 @@
Set<SimpleString> listAllDestinations();
+ void routeAndDeliver(final ServerMessage msg) throws Exception;
+
+
/**
* To be used by transactions only.
* If you're sure you will page if isPaging, just call the method page and look at its return.
@@ -90,17 +93,22 @@
boolean isPaging(SimpleString destination) throws Exception;
/**
- * If the destination is not being paged, this method will return false
+ * Page, only if destination is in page mode.
* @param message
- * @return
+ * @return false if destination is not on page mode
*/
boolean page(ServerMessage message) throws Exception;
/**
*
- * To be called whenever message is being deleted. (i.e. no references)
+ * To be called when there are no more references to the message
* @param message
*/
- void messageRemoved(ServerMessage message) throws Exception;
+ void messageDone(ServerMessage message) throws Exception;
+ /** To be called when a rollback is called after messageDone was called */
+ long addSize(ServerMessage message) throws Exception;
+
+ long getSize(SimpleString destination) throws Exception;
+
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.postoffice.FlowController;
@@ -50,6 +51,8 @@
import org.jboss.messaging.util.ConcurrentSet;
import org.jboss.messaging.util.SimpleString;
+import com.sun.org.apache.bcel.internal.generic.StoreInstruction;
+
/**
*
* A PostOfficeImpl
@@ -59,6 +62,9 @@
*/
public class PostOfficeImpl implements PostOffice
{
+
+ private static final long MAX_SIZE = 100 * 1024 * 1024;
+
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
//private final int nodeID;
@@ -102,6 +108,8 @@
public void start() throws Exception
{
+ pagingManager.start();
+
loadBindings();
started = true;
@@ -109,6 +117,8 @@
public void stop() throws Exception
{
+ pagingManager.stop();
+
mappings.clear();
destinations.clear();
@@ -135,7 +145,7 @@
{
storageManager.addDestination(address);
}
-
+
flowControllers.put(address, new FlowControllerImpl(address, this));
}
@@ -214,10 +224,33 @@
{
return nameMap.get(queueName);
}
+
+ public void routeAndDeliver(final ServerMessage message) throws Exception
+ {
+ List<MessageReference> refs = this.route(message);
+
+ if (message.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessage(message);
+ }
+
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ }
public List<MessageReference> route(final ServerMessage message) throws Exception
{
- getQueueSize(message.getDestination()).addAndGet(message.getEncodeSize());
+ if (addSize(message.getDestination(), message.getEncodeSize()) > MAX_SIZE)
+ {
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (store.startPaging())
+ {
+ log.info("Starting paging on " + message.getDestination());
+ }
+ }
SimpleString address = message.getDestination();
@@ -279,19 +312,34 @@
{
return pagingManager.getPageStore(destination).isPaging();
}
-
- public void messageRemoved(ServerMessage message)
+
+ public void messageDone(ServerMessage message) throws Exception
{
- addSize(message.getDestination(), message.getEncodeSize() * -1);
+ final long size = addSize(message.getDestination(), message.getEncodeSize() * -1);
+
+ if (size < MAX_SIZE)
+ {
+ PagingStore manager = pagingManager.getPageStore(message.getDestination());
+ if (manager.startDequeueThread(this, MAX_SIZE))
+ {
+ log.info("Starting dequeing page for " + message.getDestination());
+ }
+
+ }
}
+
+ /** To be called when a rollback is called after messageDone was called */
+ public long addSize(ServerMessage message) throws Exception
+ {
+ return addSize(message.getDestination(), message.getEncodeSize());
+ }
+
public boolean page(ServerMessage message) throws Exception
{
return pagingManager.getPageStore(message.getDestination()).writeOnCurrentPage(message);
}
-
-
public Map<SimpleString, List<Binding>> getMappings()
{
return mappings;
@@ -301,14 +349,19 @@
{
return flowControllers.get(address);
}
+
+ public long getSize(SimpleString destination)
+ {
+ return getQueueSize(destination).get();
+ }
// Private -----------------------------------------------------------------
- private void addSize(SimpleString destination, long size)
+ private long addSize(SimpleString destination, long size)
{
- getQueueSize(destination).addAndGet(size);
totalSize.addAndGet(size);
+ return getQueueSize(destination).addAndGet(size);
}
private AtomicLong getQueueSize(SimpleString destination)
@@ -424,5 +477,11 @@
}
storageManager.loadMessages(this, queues);
+
+ for (SimpleString destination: dests)
+ {
+ PagingStore store = pagingManager.getPageStore(destination);
+ store.startDequeueThread(this, MAX_SIZE);
+ }
}
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/ServerMessage.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -29,6 +29,7 @@
* A ServerMessage
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface ServerMessage extends Message
@@ -49,6 +50,12 @@
int getDurableRefCount();
+ int decrementRefCount();
+
+ int incrementRefCount();
+
+ int getRefCount();
+
ServerMessage copy();
}
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.MessagingServerManagement;
import org.jboss.messaging.core.management.impl.MessagingServerManagementImpl;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.paging.impl.PagingManagerFactoryNIO;
import org.jboss.messaging.core.paging.impl.PagingManagerImpl;
@@ -168,8 +169,11 @@
securityStore = new SecurityStoreImpl(configuration.getSecurityInvalidationInterval(), configuration.isSecurityEnabled());
queueSettingsRepository.setDefault(new QueueSettings());
scheduledExecutor = new ScheduledThreadPoolExecutor(configuration.getScheduledThreadPoolMaxSize(), new JBMThreadFactory("JBM-scheduled-threads"));
- queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
- postOffice = new PostOfficeImpl(storageManager, new PagingManagerImpl(new PagingManagerFactoryNIO("/tmp/factory", 10*1024*1024)),
+ queueFactory = new QueueFactoryImpl(scheduledExecutor, queueSettingsRepository);
+
+ PagingManager pagingManager = new PagingManagerImpl(new PagingManagerFactoryNIO(configuration.getPagingDirectory(), configuration.getPageFileSize()));
+
+ postOffice = new PostOfficeImpl(storageManager, pagingManager,
queueFactory, configuration.isRequireDestinations());
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -36,6 +36,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class ServerMessageImpl extends MessageImpl implements ServerMessage
@@ -45,6 +46,9 @@
private long connectionID;
private final AtomicInteger durableRefCount = new AtomicInteger(0);
+
+ /** Global reference counts for paging control */
+ private final AtomicInteger refCount = new AtomicInteger(0);
/*
* Constructor for when reading from network
@@ -113,6 +117,8 @@
durableRefCount.incrementAndGet();
}
+ refCount.incrementAndGet();
+
return ref;
}
@@ -131,6 +137,21 @@
return durableRefCount.incrementAndGet();
}
+ public int decrementRefCount()
+ {
+ return refCount.decrementAndGet();
+ }
+
+ public int getRefCount()
+ {
+ return refCount.get();
+ }
+
+ public int incrementRefCount()
+ {
+ return refCount.incrementAndGet();
+ }
+
public ServerMessage copy()
{
return new ServerMessageImpl(this);
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -306,17 +306,20 @@
if (autoCommitSends)
{
- List<MessageReference> refs = postOffice.route(msg);
-
- if (msg.getDurableRefCount() != 0)
+ if (!postOffice.page(msg))
{
- storageManager.storeMessage(msg);
+ List<MessageReference> refs = postOffice.route(msg);
+
+ if (msg.getDurableRefCount() != 0)
+ {
+ storageManager.storeMessage(msg);
+ }
+
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
}
-
- for (MessageReference ref : refs)
- {
- ref.getQueue().addLast(ref);
- }
}
else
{
@@ -1138,6 +1141,11 @@
Queue queue = ref.getQueue();
+ if (message.decrementRefCount() == 0)
+ {
+ postOffice.messageDone(message);
+ }
+
if (message.isDurable() && queue.isDurable())
{
int count = message.decrementDurableRefCount();
Modified: branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -126,6 +126,14 @@
acknowledgements.add(acknowledgement);
ServerMessage message = acknowledgement.getMessage();
+
+ if (message.decrementRefCount() == 0)
+ {
+ if (postOffice != null)
+ {
+ postOffice.messageDone(message);
+ }
+ }
if (message.isDurable())
{
@@ -269,6 +277,14 @@
// Reverse the decrements we did in the tx
message.incrementDurableRefCount();
}
+
+ // Putting back the size to control paging
+ if (message.incrementRefCount() == 1)
+ {
+ postOffice.addSize(message);
+ }
+
+ message.incrementRefCount();
LinkedList<MessageReference> list = queueMap.get(queue);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/performance/persistence/FakePostOffice.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -109,7 +109,6 @@
public boolean removeDestination(SimpleString address, boolean temporary)
throws Exception
{
- // TODO Auto-generated method stub
return false;
}
@@ -139,7 +138,7 @@
return false;
}
- public void messageRemoved(ServerMessage message) throws Exception
+ public void messageDone(ServerMessage message) throws Exception
{
}
@@ -147,5 +146,22 @@
{
return false;
}
+
+ public long addSize(ServerMessage message) throws Exception
+ {
+ return 0;
+ }
+
+ public long getSize(SimpleString destination) throws Exception
+ {
+ return 0;
+ }
+
+ public void routeAndDeliver(ServerMessage msg) throws Exception
+ {
+ throw new IllegalStateException("Not implemented!");
+ }
+
+
}
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -69,7 +69,7 @@
PagingStore store = EasyMock.createNiceMock(PagingStore.class);
- EasyMock.expect(spi.newStore(destination.toString())).andReturn(store);
+ EasyMock.expect(spi.newStore(destination)).andReturn(store);
store.start();
@@ -113,9 +113,9 @@
EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
- PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination.toString(), 1);
+ PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, 1);
- EasyMock.expect(spi.newStore(destination.toString())).andStubReturn(storeImpl);
+ EasyMock.expect(spi.newStore(destination)).andStubReturn(storeImpl);
EasyMock.replay(spi, factory);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -46,6 +46,8 @@
// Constants -----------------------------------------------------
+ private final static SimpleString destinationTestName = new SimpleString("test");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -58,7 +60,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
storeImpl.start();
@@ -74,7 +76,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
storeImpl.start();
@@ -101,7 +103,7 @@
storeImpl.sync();
- storeImpl = new PagingStoreImpl(factory, "test", 1024 * 2);
+ storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 2);
storeImpl.start();
@@ -113,7 +115,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+ PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
storeImpl.start();
@@ -165,7 +167,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", 1024 * 10);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, 1024 * 10);
storeImpl.start();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -80,7 +80,7 @@
final ArrayList<Page> readPages = new ArrayList<Page>();
- final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, "test", MAX_SIZE);
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
storeImpl.start();
@@ -226,7 +226,7 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, "test", MAX_SIZE);
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), MAX_SIZE);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -145,6 +145,7 @@
serverMessage.setExpiration(0);
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
+ EasyMock.expect(serverMessage.decrementRefCount()).andReturn(1);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
EasyMock.replay(sm, po, repos, serverMessage, queue);
@@ -155,37 +156,61 @@
public void testCancelToDLQDoesntExist() throws Exception
{
QueueSettings queueSettings = new QueueSettings();
+
queueSettings.setMaxDeliveryAttempts(1);
+
SimpleString dlqName = new SimpleString("testDLQ");
+
queueSettings.setDLQ(dlqName);
+
Binding dlqBinding = EasyMock.createStrictMock(Binding.class);
+
StorageManager sm = EasyMock.createNiceMock(StorageManager.class);
+
PostOffice po = EasyMock.createStrictMock(PostOffice.class);
+
HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
+
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
+
Queue queue = EasyMock.createStrictMock(Queue.class);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
+
messageReference.setDeliveryCount(1);
+
SimpleString queueName = new SimpleString("queueName");
+
queue.referenceCancelled();
+
EasyMock.expect(queue.getName()).andStubReturn(queueName);
EasyMock.expect(repos.getMatch(queueName.toString())).andStubReturn(queueSettings);
+// EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(1);
EasyMock.expect(serverMessage.isDurable()).andStubReturn(true);
EasyMock.expect(serverMessage.getMessageID()).andStubReturn(999l);
+ EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
EasyMock.expect(queue.isDurable()).andStubReturn(true);
+
sm.updateDeliveryCount(messageReference);
+
EasyMock.expect(po.getBinding(dlqName)).andReturn(null);
EasyMock.expect(po.addBinding(dlqName, dlqName, null, true, false)).andReturn(dlqBinding);
EasyMock.expect(serverMessage.copy()).andReturn(serverMessage);
EasyMock.expect(sm.generateMessageID()).andReturn(2l);
+
serverMessage.setMessageID(2);
+
serverMessage.setExpiration(0);
+
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
+
EasyMock.replay(sm, po, repos, serverMessage, queue);
+
assertFalse(messageReference.cancel(sm, po, repos));
+
EasyMock.verify(sm, po, repos, serverMessage, queue);
}
@@ -208,6 +233,7 @@
EasyMock.expect(repos.getMatch(queueName.toString())).andStubReturn(queueSettings);
EasyMock.expect(serverMessage.isDurable()).andStubReturn(true);
EasyMock.expect(serverMessage.getMessageID()).andStubReturn(999l);
+ EasyMock.expect(serverMessage.decrementRefCount()).andReturn(1);
EasyMock.expect(queue.isDurable()).andStubReturn(true);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
EasyMock.expect(sm.generateTransactionID()).andReturn(1l);
@@ -250,6 +276,9 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+ EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
+
+
EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
messageReference.expire(sm, po, repos);
EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
@@ -289,6 +318,8 @@
EasyMock.expect(po.route(serverMessage)).andReturn(new ArrayList<MessageReference>());
EasyMock.expect(serverMessage.getDurableRefCount()).andReturn(0);
EasyMock.expect(serverMessage.decrementDurableRefCount()).andReturn(0);
+ EasyMock.expect(serverMessage.decrementRefCount()).andStubReturn(1);
+
EasyMock.replay(sm, po, repos, serverMessage, queue, expQBinding);
messageReference.expire(sm, po, repos);
EasyMock.verify(sm, po, repos, serverMessage, queue, expQBinding);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -34,6 +34,7 @@
import org.easymock.EasyMock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.DistributionPolicy;
import org.jboss.messaging.core.server.HandleStatus;
@@ -738,6 +739,9 @@
public void testDeleteAllReferences() throws Exception
{
+
+ PostOffice postOffice = EasyMock.createNiceMock(PostOffice.class);
+
Queue queue = new QueueImpl(1, queue1, null, false, true, false, -1, scheduledExecutor);
StorageManager storageManager = EasyMock.createStrictMock(StorageManager.class);
Modified: branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java
===================================================================
--- branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java 2008-08-18 16:13:23 UTC (rev 4832)
+++ branches/Branch_JBMESSAGING-1314/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerConsumerImplTest.java 2008-08-18 23:26:08 UTC (rev 4833)
@@ -195,6 +195,7 @@
{
MessageReference messageReference = createStrictMock(MessageReference.class);
ServerMessage message = createStrictMock(ServerMessage.class);
+ expect(message.decrementRefCount()).andStubReturn(1);
expect(messageReference.getMessage()).andStubReturn(message);
ServerConsumerImpl consumer = create(1, 999l, false, true, true);
expect(messageReference.getQueue()).andStubReturn(queue);
More information about the jboss-cvs-commits
mailing list