[jboss-cvs] JBoss Messaging SVN: r4922 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Sep 8 19:43:58 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-09-08 19:43:58 -0400 (Mon, 08 Sep 2008)
New Revision: 4922
Modified:
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1314 - refactoring... Moving methods around.
(Tests on depaging on the delivery thread)
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -66,6 +66,8 @@
void sync() throws Exception;
+ public boolean readPage() throws Exception;
+
boolean page(PageMessage message) throws Exception;
/**
@@ -79,11 +81,10 @@
/**
*
- * @param postOffice
* @return false if a thread was already started, or if not in page mode
* @throws Exception
*/
- boolean startDepaging(PagingManager listener) throws Exception;
+ boolean startDepaging() throws Exception;
LastPageRecord getLastRecord();
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStoreFactory.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -38,4 +38,6 @@
PagingStore newStore(org.jboss.messaging.util.SimpleString destinationName, QueueSettings queueSettings);
+ void setPagingManager(PagingManager manager);
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerFactoryNIO.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.settings.impl.QueueSettings;
@@ -52,6 +53,8 @@
private final Executor executor;
+ private PagingManager pagingManager;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -76,8 +79,13 @@
File destinationFile = new File(destinationDirectory);
destinationFile.mkdirs();
- return new PagingStoreImpl(newFileFactory(destinationDirectory), destinationName, settings, executor);
+ return new PagingStoreImpl(pagingManager, newFileFactory(destinationDirectory), destinationName, settings, executor);
}
+
+ public void setPagingManager(PagingManager manager)
+ {
+ this.pagingManager = manager;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -393,7 +393,7 @@
{
if ( maxSize > 0 && addressSize < (maxSize - pageSize))
{
- if (store.startDepaging(this))
+ if (store.startDepaging())
{
log.info("Starting depaging Thread, size = " + addressSize);
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -71,6 +71,8 @@
private boolean droppedMessages;
+ private final PagingManager pagingManager;
+
private final Executor executor;
// Bytes consumed by the queue on the memory
@@ -96,7 +98,7 @@
// Constructors --------------------------------------------------
- public PagingStoreImpl(final SequentialFileFactory fileFactory, final SimpleString storeName, QueueSettings queueSettings, Executor executor)
+ public PagingStoreImpl(final PagingManager pagingManager, final SequentialFileFactory fileFactory, final SimpleString storeName, QueueSettings queueSettings, Executor executor)
{
this.fileFactory = fileFactory;
this.storeName = storeName;
@@ -104,6 +106,7 @@
this.pageSize = queueSettings.getPageSizeBytes();
this.dropMessagesOnSize = queueSettings.isDropMessagesWhenFull();
this.executor = executor;
+ this.pagingManager = pagingManager;
}
@@ -173,6 +176,35 @@
return storeName;
}
+
+
+ /**
+ * Depage one page-file, read it and send it to the pagingManager
+ * @return
+ * @throws Exception
+ */
+ public boolean readPage() throws Exception
+ {
+ Page page = depage();
+ if (page == null)
+ {
+ if (lastPageRecord != null)
+ {
+ pagingManager.clearLastPageRecord(lastPageRecord);
+ }
+ lastPageRecord = null;
+ return false;
+ }
+ page.open();
+ PageMessage messages[] = page.read();
+ boolean needMorePages = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+ page.delete();
+
+ return needMorePages;
+
+ }
+
+
/**
* It returns a Page out of the Page System without reading it.
* The method calling this method will remove the page and will start reading it outside of any locks.
@@ -331,7 +363,7 @@
}
}
- public boolean startDepaging(final PagingManager manager) throws Exception
+ public boolean startDepaging() throws Exception
{
lock.readLock().lock();
try
@@ -346,7 +378,7 @@
{
if (this.dequeueThread == null)
{
- this.dequeueThread = new DepageRunnable(manager);
+ this.dequeueThread = new DepageRunnable();
executor.execute(this.dequeueThread);
return true;
}
@@ -607,11 +639,8 @@
class DepageRunnable implements Runnable
{
- final PagingManager manager;
-
- public DepageRunnable(final PagingManager manager)
+ public DepageRunnable()
{
- this.manager = manager;
}
@@ -622,20 +651,7 @@
boolean needMorePages = false;
do
{
- Page page = depage();
- if (page == null)
- {
- if (lastPageRecord != null)
- {
- manager.clearLastPageRecord(lastPageRecord);
- }
- lastPageRecord = null;
- break;
- }
- page.open();
- PageMessage messages[] = page.read();
- needMorePages = manager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
- page.delete();
+ needMorePages = readPage();
}
while (needMorePages);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -437,7 +437,7 @@
for (SimpleString destination: dests)
{
PagingStore store = pagingManager.getPageStore(destination);
- store.startDepaging(pagingManager);
+ store.startDepaging();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -189,6 +189,8 @@
PagingStoreFactory storeFactory = new PagingManagerFactoryNIO(configuration.getPagingDirectory());
pagingManager = new PagingManagerImpl(storeFactory, storageManager, queueSettingsRepository);
+
+ storeFactory.setPagingManager(pagingManager);
resourceManager = new ResourceManagerImpl(0);
postOffice =
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -129,7 +129,7 @@
EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
- PagingStoreImpl storeImpl = new PagingStoreImpl(factory, destination, new QueueSettings(), Executors.newSingleThreadExecutor());
+ PagingStoreImpl storeImpl = new PagingStoreImpl(manager, factory, destination, new QueueSettings(), Executors.newSingleThreadExecutor());
EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andStubReturn(storeImpl);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -62,7 +62,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+ PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
storeImpl.start();
@@ -78,7 +78,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+ PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
storeImpl.start();
@@ -105,7 +105,7 @@
storeImpl.sync();
- storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+ storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
storeImpl.start();
@@ -117,7 +117,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- PagingStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+ PagingStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
storeImpl.start();
@@ -173,7 +173,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory();
- TestSupportPageStore storeImpl = new PagingStoreImpl(factory, destinationTestName, new QueueSettings(), executor);
+ TestSupportPageStore storeImpl = new PagingStoreImpl(null, factory, destinationTestName, new QueueSettings(), executor);
storeImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-09-08 22:21:13 UTC (rev 4921)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-09-08 23:43:58 UTC (rev 4922)
@@ -101,7 +101,7 @@
QueueSettings settings = new QueueSettings();
settings.setPageSizeBytes(MAX_SIZE);
- final TestSupportPageStore storeImpl = new PagingStoreImpl(factory, new SimpleString("test"), settings, executor);
+ final TestSupportPageStore storeImpl = new PagingStoreImpl(null, factory, new SimpleString("test"), settings, executor);
storeImpl.start();
@@ -251,7 +251,7 @@
fileTmp.close();
}
- TestSupportPageStore storeImpl2 = new PagingStoreImpl(factory, new SimpleString("test"), settings, executor);
+ TestSupportPageStore storeImpl2 = new PagingStoreImpl(null, factory, new SimpleString("test"), settings, executor);
storeImpl2.start();
int numberOfPages = storeImpl2.getNumberOfPages();
More information about the jboss-cvs-commits
mailing list