Author: clebert.suconic(a)jboss.com
Date: 2009-11-05 21:01:22 -0500 (Thu, 05 Nov 2009)
New Revision: 8230
Modified:
trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Fixing PageStoreTest and a few other minor tweaks
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -39,9 +39,9 @@
/** The factory may need to do some initialization before the file is activated.
* this was added as a hook for AIO to initialize the Observer on TimedBuffer.
* It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
- void activate(SequentialFile file);
+ void activateBuffer(SequentialFile file);
- void deactivate(SequentialFile file);
+ void deactivateBuffer();
// To be used in tests only
ByteBuffer wrapBuffer(byte[] bytes);
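
The rename makes the buffer semantics of the interface explicit: activateBuffer(file) attaches the given file to the factory's write buffer, and deactivateBuffer() drops whatever file is currently attached, so it no longer needs a file argument. A rough caller-side sketch, using only APIs visible elsewhere in this diff (hypothetical class and file names, not part of the commit):

import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;

public class BufferActivationSketch
{
   public static void writeWithBuffer(final String journalDir) throws Exception
   {
      SequentialFileFactory factory = new NIOSequentialFileFactory(journalDir);
      SequentialFile file = factory.createSequentialFile("example.hq", 1);
      file.open();
      factory.activateBuffer(file);    // attach: the file becomes the write buffer's observer
      try
      {
         // appends against 'file' go through the (possibly timed) buffer here
      }
      finally
      {
         factory.deactivateBuffer();   // flush pending writes and detach the current file
         file.close();
      }
   }
}

The real call sites are in JournalImpl further down, which activates the buffer on the current journal file and deactivates it before a file is handed off for close.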
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -36,10 +36,10 @@
*/
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
-
+
// Timeout used to wait executors to shutdown
private static final int EXECUTOR_TIMEOUT = 60;
-
+
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
private static final boolean trace = log.isTraceEnabled();
@@ -77,21 +77,22 @@
}
public AIOSequentialFileFactory(final String journalDir,
- int bufferSize,
- long bufferTimeout,
- boolean flushOnSync,
- boolean logRates)
+ final int bufferSize,
+ final long bufferTimeout,
+ final boolean flushOnSync,
+ final boolean logRates)
{
super(journalDir);
this.bufferSize = bufferSize;
this.bufferTimeout = bufferTimeout;
-      this.timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+ timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
}
/* (non-Javadoc)
    * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
*/
- public void activate(SequentialFile file)
+ @Override
+ public void activateBuffer(final SequentialFile file)
{
final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
timedBuffer.disableAutoFlush();
@@ -105,12 +106,14 @@
}
}
+ @Override
public void flush()
{
timedBuffer.flush();
}
- public void deactivate(SequentialFile file)
+ @Override
+ public void deactivateBuffer()
{
timedBuffer.flush();
timedBuffer.setObserver(null);
@@ -179,33 +182,37 @@
/* (non-Javadoc)
    * @see org.hornetq.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
*/
- public void releaseBuffer(ByteBuffer buffer)
+ @Override
+ public void releaseBuffer(final ByteBuffer buffer)
{
AsynchronousFileImpl.destroyBuffer(buffer);
}
+ @Override
public void start()
{
timedBuffer.start();
-
+
      writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
-                                                                                      true));
+                                                                                   true));
      pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                                      true));
+                                                                                  true));
-
}
+ @Override
public void stop()
{
buffersControl.stop();
+
timedBuffer.stop();
-
- this.writeExecutor.shutdown();
+
+ writeExecutor.shutdown();
+
try
{
- if (!this.writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
{
log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
}
@@ -213,12 +220,12 @@
catch (InterruptedException e)
{
}
-
- this.pollerExecutor.shutdown();
+ pollerExecutor.shutdown();
+
try
{
- if (!this.pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
{
log.warn("Timed out on AIO poller shutdown", new
Exception("Timed out on AIO writer shutdown"));
}
@@ -228,9 +235,10 @@
}
}
+ @Override
protected void finalize()
{
- this.stop();
+ stop();
}
/** Class that will control buffer-reuse */
@@ -255,7 +263,9 @@
               if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
{
if (trace)
+ {
trace("Clearing reuse buffers queue with " +
reuseBuffersQueue.size() + " elements");
+ }
bufferReuseLastTime = System.currentTimeMillis();
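
The reworked stop() above follows the usual bounded-shutdown idiom for dedicated executors: request shutdown, wait up to EXECUTOR_TIMEOUT seconds, and warn rather than hang if the pool does not drain. A generic, self-contained sketch of that idiom (illustrative only; it prints a warning instead of going through the HornetQ logger):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch
{
   private static final int EXECUTOR_TIMEOUT = 60; // seconds, mirrors the constant above

   public static void shutdownQuietly(final ExecutorService executor, final String name)
   {
      executor.shutdown();   // stop accepting new tasks, let queued work finish
      try
      {
         if (!executor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
         {
            // same spirit as the factory: report and move on instead of blocking forever
            new Exception("Timed out waiting for " + name + " to shut down").printStackTrace();
         }
      }
      catch (InterruptedException e)
      {
         Thread.currentThread().interrupt();   // preserve the interrupt flag for the caller
      }
   }

   public static void main(final String[] args)
   {
      shutdownQuietly(Executors.newSingleThreadExecutor(), "writer pool");
      shutdownQuietly(Executors.newCachedThreadPool(), "poller pool");
   }
}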
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -52,7 +52,7 @@
{
}
- public void activate(SequentialFile file)
+ public void activateBuffer(SequentialFile file)
{
}
@@ -60,7 +60,7 @@
{
}
- public void deactivate(SequentialFile file)
+ public void deactivateBuffer()
{
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -213,15 +213,15 @@
* Commits and rollbacks are also counted as negatives. We need to fix those also.
* @param dependencies
*/
-   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
+   public void fixDependencies(final JournalFile originalFile, final ArrayList<JournalFile> dependencies) throws Exception
{
for (JournalFile dependency : dependencies)
{
fixDependency(originalFile, dependency);
}
-
+
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -256,15 +256,17 @@
{
JournalReaderCallback txfix = new JournalReaderCallbackAbstract()
{
-         public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+         @Override
+         public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
if (transactionCounter.containsKey(transactionID))
{
dependency.incNegCount(originalFile);
}
}
-
- public void onReadRollbackRecord(long transactionID) throws Exception
+
+ @Override
+ public void onReadRollbackRecord(final long transactionID) throws Exception
{
if (transactionCounter.containsKey(transactionID))
{
@@ -272,11 +274,10 @@
}
}
};
-
+
JournalImpl.readJournalFile(fileFactory, dependency, txfix);
}
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -2058,7 +2058,7 @@
openFile(currentFile, true);
}
- fileFactory.activate(currentFile.getFile());
+ fileFactory.activateBuffer(currentFile.getFile());
pushOpenedFile();
@@ -2559,7 +2559,7 @@
log.warn("Couldn't stop journal executor after 60 seconds");
}
- fileFactory.flush();
+ fileFactory.deactivateBuffer();
if (currentFile != null && currentFile.getFile().isOpen())
{
@@ -3037,7 +3037,7 @@
trace("moveNextFile: " + currentFile.getFile().getFileName() + "
sync: " + synchronous);
}
- fileFactory.activate(currentFile.getFile());
+ fileFactory.activateBuffer(currentFile.getFile());
}
/**
@@ -3171,7 +3171,7 @@
private void closeFile(final JournalFile file, final boolean synchronous)
{
- fileFactory.deactivate(file.getFile());
+ fileFactory.deactivateBuffer();
pendingCloseFiles.add(file);
Runnable run = new Runnable()
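
Taken together, the JournalImpl changes keep at most one file attached to the write buffer at a time: the buffer is deactivated (flushed and detached) before the outgoing file is queued for close, and activated on the new current file when it is opened or rotated in. A simplified, hypothetical sketch of that ordering (assumed import paths; not the actual JournalImpl code):

import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;

public class JournalRotationSketch
{
   public static void rotate(final SequentialFileFactory fileFactory,
                             final JournalFile oldFile,
                             final JournalFile newFile)
   {
      fileFactory.deactivateBuffer();                  // flush pending data, drop the observer
      // ... oldFile would be queued for asynchronous close here, as closeFile() does ...
      fileFactory.activateBuffer(newFile.getFile());   // the new current file observes the buffer
   }
}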
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -30,14 +30,14 @@
public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
{
private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
-
+
public NIOSequentialFileFactory(final String journalDir)
{
super(journalDir);
-
+
if (journalDir == null)
{
- new Exception ("journalDir is null").printStackTrace();
+ new Exception("journalDir is null").printStackTrace();
}
}
@@ -46,7 +46,7 @@
{
return new NIOSequentialFile(journalDir, fileName);
}
-
+
public boolean isSupportsCallbacks()
{
return false;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -60,7 +60,7 @@
public class PagingStoreImpl implements TestSupportPageStore
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(PagingStoreImpl.class);
// Attributes ----------------------------------------------------
@@ -210,7 +210,7 @@
public boolean isPaging()
{
currentPageLock.readLock().lock();
-
+
try
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -271,7 +271,6 @@
checkReleaseProducerFlowControlCredits(-credits);
}
-
public void addSize(final ServerMessage message, final boolean add) throws Exception
{
long size = message.getMemoryEstimate();
@@ -307,7 +306,7 @@
addSize(-size);
}
}
-
+
   public boolean page(final ServerMessage message, final long transactionID, final boolean duplicateDetection) throws Exception
{
// The sync on transactions is done on commit only
@@ -320,7 +319,7 @@
// of crash
      return page(message, -1, syncNonTransactional && message.isDurable(), duplicateDetection);
}
-
+
public void sync() throws Exception
{
currentPageLock.readLock().lock();
@@ -482,6 +481,7 @@
currentPageLock.readLock().lock();
try
{
+ // Already paging, nothing to be done
if (currentPage != null)
{
return false;
@@ -515,7 +515,6 @@
}
}
-
public Page getCurrentPage()
{
return currentPage;
@@ -597,18 +596,16 @@
{
firstPageId = Integer.MAX_VALUE;
- if (currentPage != null)
+ if (currentPage == null)
{
- returnPage = currentPage;
- returnPage.close();
- currentPage = null;
- }
- else
- {
// sanity check... it shouldn't happen!
throw new IllegalStateException("CurrentPage is null");
}
+ returnPage = currentPage;
+ returnPage.close();
+ currentPage = null;
+
         // The current page is empty... which means we reached the end of the pages
if (returnPage.getNumberOfMessages() == 0)
{
@@ -679,7 +676,6 @@
}
-
private synchronized void checkReleaseProducerFlowControlCredits(final long size)
{
      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
@@ -702,7 +698,6 @@
}
}
-
private void addSize(final long size) throws Exception
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
@@ -734,7 +729,6 @@
}
else
{
-         // When in Global mode, we use the default page size as the mark to start depage
          if (maxSize > 0 && currentPage != null && addressSize <= maxSize - pageSize && !depaging.get())
{
if (startDepaging())
@@ -750,8 +744,11 @@
return;
}
}
-
-   private boolean page(final ServerMessage message, final long transactionID, final boolean sync, final boolean duplicateDetection) throws Exception
+
+ private boolean page(final ServerMessage message,
+ final long transactionID,
+ final boolean sync,
+ final boolean duplicateDetection) throws Exception
{
if (!running)
{
@@ -813,7 +810,7 @@
{
         // We set the duplicate detection header to prevent the message being depaged more than once in case of
// failure during depage
-
+
byte[] bytes = new byte[8];
ByteBuffer buff = ByteBuffer.wrap(bytes);
@@ -823,8 +820,19 @@
message.putBytesProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, bytes);
}
- int bytesToWrite = message.getEncodeSize() + PageImpl.SIZE_RECORD;
+ PagedMessage pagedMessage;
+ if (transactionID != -1)
+ {
+ pagedMessage = new PagedMessageImpl(message, transactionID);
+ }
+ else
+ {
+ pagedMessage = new PagedMessageImpl(message);
+ }
+
+ int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
+
      if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
{
// Make sure nothing is currently validating or using currentPage
@@ -846,32 +854,14 @@
try
{
- if (currentPage != null)
- {
- PagedMessage pagedMessage;
-
- if (transactionID != -1)
- {
- pagedMessage = new PagedMessageImpl(message, transactionID);
- }
- else
- {
- pagedMessage = new PagedMessageImpl(message);
- }
-
- currentPage.write(pagedMessage);
+ currentPage.write(pagedMessage);
- if (sync)
- {
- currentPage.sync();
- }
-
- return true;
- }
- else
+ if (sync)
{
- return false;
+ currentPage.sync();
}
+
+ return true;
}
finally
{
@@ -884,7 +874,7 @@
}
}
-
+
/**
    * This method will remove files from the page system and and route them, doing it transactionally
*
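
The substantive change in this file is that the PagedMessageImpl wrapper is now built before the page-full check, so bytesToWrite reflects the encoded size of what will actually be written (pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD) rather than the raw message size, and the currentPage null check inside the write section is removed. The rotation test itself keeps the same shape; a tiny self-contained illustration of it (hypothetical class, not part of the commit):

import java.util.concurrent.atomic.AtomicLong;

public class PageRotationCheckSketch
{
   private final AtomicLong currentPageSize = new AtomicLong();

   // Rotate to a new page only when adding this record would push the page past pageSize
   // AND the current page already holds at least one message; a single oversized record
   // is therefore still written to an (otherwise empty) page instead of rotating forever.
   public boolean shouldRotate(final long bytesToWrite, final long pageSize, final int messagesOnPage)
   {
      return currentPageSize.addAndGet(bytesToWrite) > pageSize && messagesOnPage > 0;
   }
}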
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -83,7 +83,7 @@
clearData();
Configuration config = createDefaultConfig();
-
+
      HornetQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, new HashMap<String, AddressSettings>());
server.start();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -677,7 +677,7 @@
/* (non-Javadoc)
    * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
*/
- public void activate(SequentialFile file)
+ public void activateBuffer(SequentialFile file)
{
}
@@ -691,7 +691,7 @@
/* (non-Javadoc)
    * @see org.hornetq.core.journal.SequentialFileFactory#deactivate(org.hornetq.core.journal.SequentialFile)
*/
- public void deactivate(SequentialFile file)
+ public void deactivateBuffer()
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-05 19:01:30 UTC (rev 8229)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-06 02:01:22 UTC (rev 8230)
@@ -33,7 +33,6 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -77,8 +76,6 @@
{
// Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(PagingStoreImplTest.class);
   private final static SimpleString destinationTestName = new SimpleString("test");
@@ -168,15 +165,15 @@
}
-// public void testPageWithNIO() throws Exception
-// {
-// // This integration test could fail 1 in 100 due to race conditions.
-// for (int i = 0; i < 100; i++)
-// {
-// recreateDirectory(getTestDir());
-// testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
-// }
-// }
+ public void testPageWithNIO() throws Exception
+ {
+ // This integration test could fail 1 in 100 due to race conditions.
+ for (int i = 0; i < 100; i++)
+ {
+ recreateDirectory(getTestDir());
+ testConcurrentPaging(new NIOSequentialFileFactory(getTestDir()), 1);
+ }
+ }
public void testStore() throws Exception
{
@@ -431,13 +428,13 @@
}
-// public void testConcurrentDepage() throws Exception
-// {
-// SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
-//
-// testConcurrentPaging(factory, 10);
-// }
+ public void testConcurrentDepage() throws Exception
+ {
+ SequentialFileFactory factory = new FakeSequentialFileFactory(1, false);
+ testConcurrentPaging(factory, 10);
+ }
+
   protected void testConcurrentPaging(final SequentialFileFactory factory, final int numberOfThreads) throws Exception, InterruptedException
{
@@ -474,28 +471,35 @@
assertEquals(0, storeImpl.getNumberOfPages());
+ // Marked the store to be paged
storeImpl.startPaging();
assertEquals(1, storeImpl.getNumberOfPages());
final SimpleString destination = new SimpleString("test");
- class ProducerThread extends Thread
+ class WriterThread extends Thread
{
+
Exception e;
@Override
public void run()
{
+
try
{
boolean firstTime = true;
while (true)
{
long id = messageIdGenerator.incrementAndGet();
-               ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));
-               if (storeImpl.page(msg, true))
-               {
+
+               // Each thread will Keep paging until all the messages are depaged.
+               // This is possible because the depage thread is not actually reading the pages.
+               // Just using the internal API to remove it from the page file system
+               ServerMessage msg = createMessage(storeImpl, destination, createRandomBuffer(id, 5));
+               if (storeImpl.page(msg, false))
+ {
buffers.put(id, msg);
}
else
@@ -505,6 +509,7 @@
if (firstTime)
{
+ // We have at least one data paged. So, we can start depaging now
latchStart.countDown();
firstTime = false;
}
@@ -522,7 +527,7 @@
}
}
- class ConsumerThread extends Thread
+ class ReaderThread extends Thread
{
Exception e;
@@ -533,12 +538,10 @@
{
// Wait every producer to produce at least one message
latchStart.await();
+
while (aliveProducers.get() > 0)
{
Page page = storeImpl.depage();
-
- //log.info("depaged " + page);
-
if (page != null)
{
readPages.add(page);
@@ -553,15 +556,15 @@
}
}
- ProducerThread producerThread[] = new ProducerThread[numberOfThreads];
+ WriterThread producerThread[] = new WriterThread[numberOfThreads];
for (int i = 0; i < numberOfThreads; i++)
{
- producerThread[i] = new ProducerThread();
+ producerThread[i] = new WriterThread();
producerThread[i].start();
}
- ConsumerThread consumer = new ConsumerThread();
+ ReaderThread consumer = new ReaderThread();
consumer.start();
for (int i = 0; i < numberOfThreads; i++)
@@ -611,7 +614,7 @@
{
SequentialFile fileTmp = factory.createSequentialFile(file, 1);
fileTmp.open();
-            assertTrue(fileTmp.size() + " <= " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
+            assertTrue("The page file size (" + fileTmp.size() + ") shouldn't be > " + MAX_SIZE, fileTmp.size() <= MAX_SIZE);
fileTmp.close();
}