[jboss-cvs] JBoss Messaging SVN: r5459 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 3 20:40:14 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-03 20:40:14 -0500 (Wed, 03 Dec 2008)
New Revision: 5459
Modified:
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/paging/Page.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
Paging tweaks
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -78,6 +78,6 @@
long size() throws Exception;
- void renameTo(SequentialFile file) throws Exception;
+ void renameTo(String newFileName) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -182,7 +182,7 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.journal.SequentialFile#renameTo(org.jboss.messaging.core.journal.SequentialFile)
*/
- public void renameTo(SequentialFile file) throws Exception
+ public void renameTo(String fileName) throws Exception
{
throw new IllegalStateException ("method rename not supported on AIO");
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -1213,7 +1213,7 @@
}
else
{
- log.warn("Prepared transaction " + healthy + " wasn't considered completed, it will be ignored");
+ log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
tx.invalid = true;
}
@@ -1779,7 +1779,7 @@
if (counter == null)
{
- for (JournalFile lookupFile : orderedFiles)
+ for (JournalFile lookupFile : orderedFiles)
{
if (lookupFile.getOrderingID() == ref.a)
{
@@ -1796,7 +1796,7 @@
}
else
{
- // (V) Missing a record... Transaction was not completed as stated.
+ // (IV) Missing a record... Transaction was not completed as stated.
// we will ignore the whole transaction
// This is probably a hole caused by a crash during commit/prepare.
if (counter.get() != ref.b)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -43,17 +43,21 @@
public class NIOSequentialFile implements SequentialFile
{
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
+
private File file;
+ private final String directory;
+
private FileChannel channel;
private RandomAccessFile rfile;
BufferCallback bufferCallback;
- public NIOSequentialFile(final String journalDir, final String fileName)
+ public NIOSequentialFile(final String directory, final String fileName)
{
- this.file = new File(journalDir + "/" + fileName);
+ this.directory = directory;
+ file = new File(directory + "/" + fileName);
}
public int getAlignment()
@@ -70,7 +74,7 @@
{
return file.getName();
}
-
+
public synchronized boolean isOpen()
{
return channel != null;
@@ -114,7 +118,7 @@
}
public void close() throws Exception
- {
+ {
if (channel != null)
{
channel.close();
@@ -210,8 +214,7 @@
throw e;
}
}
-
-
+
public void sync() throws Exception
{
channel.force(false);
@@ -232,20 +235,18 @@
return channel.position();
}
- public void renameTo(SequentialFile newFile) throws Exception
+ public void renameTo(final String newFileName) throws Exception
{
close();
- this.file.renameTo(((NIOSequentialFile)newFile).file);
- file = ((NIOSequentialFile)newFile).file;
+ File newFile = new File(directory + "/" + newFileName);
+ file.renameTo(newFile);
+ file = newFile;
}
-
-
-
+
+ @Override
public String toString()
{
- return "NIOSequentialFile " + this.file;
+ return "NIOSequentialFile " + file;
}
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/Page.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/Page.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.paging;
+import java.util.List;
+
/**
*
* <p>Look at the <a href="http://wiki.jboss.org/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
@@ -36,7 +38,7 @@
void write(PagedMessage message) throws Exception;
- PagedMessage[] read() throws Exception;
+ List<PagedMessage> read() throws Exception;
int getSize();
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -87,6 +87,7 @@
/**
* Page, only if destination is in page mode.
* @param message
+ * @param sync - Sync should be called right after the write
* @return false if destination is not on page mode
*/
boolean page(ServerMessage message) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -61,9 +61,7 @@
void sync() throws Exception;
- public boolean readPage() throws Exception;
-
- boolean page(PagedMessage message) throws Exception;
+ boolean page(PagedMessage message, boolean sync) throws Exception;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -27,10 +27,12 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
@@ -44,6 +46,8 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageImpl.class);
+
public static final int SIZE_RECORD = SIZE_BYTE + SIZE_INT + SIZE_BYTE;
private static final byte START_BYTE = (byte)'{';
@@ -54,6 +58,8 @@
private final int pageId;
+ private boolean suspiciousRecords = false;
+
private final AtomicInteger numberOfMessages = new AtomicInteger(0);
private final SequentialFile file;
@@ -82,7 +88,7 @@
return pageId;
}
- public PagedMessage[] read() throws Exception
+ public List<PagedMessage> read() throws Exception
{
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
@@ -108,23 +114,31 @@
{
PagedMessage msg = new PagedMessageImpl();
msg.decode(messageBuffer);
+ if (buffer.get() != END_BYTE)
+ {
+ // Sanity Check: This would only happen if there is a bug on decode or any internal code, as this
+ // constraint was already checked
+ throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE");
+ }
messages.add(msg);
}
else
{
- buffer.position(position + 1);
+ markFileAsSuspect(position, messages.size());
+ break;
}
}
}
else
{
- buffer.position(position + 1);
+ markFileAsSuspect(position, messages.size());
+ break;
}
}
numberOfMessages.set(messages.size());
- return messages.toArray(new PagedMessage[messages.size()]);
+ return messages;
}
public void write(final PagedMessage message) throws Exception
@@ -136,7 +150,7 @@
buffer.put(END_BYTE);
buffer.rewind();
- file.write(buffer, false);
+ file.write(buffer, false);
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
@@ -144,7 +158,7 @@
public void sync() throws Exception
{
- file.sync();
+ file.sync();
}
public void open() throws Exception
@@ -161,7 +175,18 @@
public void delete() throws Exception
{
- file.delete();
+ if (suspiciousRecords)
+ {
+ log.warn("File " + file.getFileName() +
+ " being renamed to " +
+ file.getFileName() +
+ ".invalidPage as it was loaded partially. Please verify your data.");
+ file.renameTo(file.getFileName() + ".invalidPage");
+ }
+ else
+ {
+ file.delete();
+ }
}
public int getNumberOfMessages()
@@ -180,5 +205,15 @@
// Private -------------------------------------------------------
+ /**
+ * @param position
+ * @param msgNumber
+ */
+ private void markFileAsSuspect(final int position, final int msgNumber)
+ {
+ log.warn("Page file had incomplete records at position " + position + " at record number " + msgNumber);
+ suspiciousRecords = true;
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -73,6 +73,8 @@
private final long defaultPageSize;
+ private final boolean syncNonTransactional;
+
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<Long, PageTransactionInfo>();
// Static
@@ -80,9 +82,6 @@
private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
- // private static final boolean isTrace = log.isTraceEnabled();
- private static final boolean isTrace = true;
-
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
@@ -99,13 +98,15 @@
final StorageManager storageManager,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final long maxGlobalSize,
- final long defaultPageSize)
+ final long defaultPageSize,
+ final boolean syncNonTransactional)
{
this.pagingSPI = pagingSPI;
this.queueSettingsRepository = queueSettingsRepository;
this.storageManager = storageManager;
this.defaultPageSize = defaultPageSize;
this.maxGlobalSize = maxGlobalSize;
+ this.syncNonTransactional = syncNonTransactional;
}
// Public
@@ -119,7 +120,7 @@
return globalMode.get();
}
- public void setGlobalPageMode(boolean globalMode)
+ public void setGlobalPageMode(final boolean globalMode)
{
this.globalMode.set(globalMode);
}
@@ -179,7 +180,7 @@
public void setLastPageRecord(final LastPageRecord lastPage) throws Exception
{
trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
-
+
getPageStore(lastPage.getDestination()).setLastPageRecord(lastPage);
}
@@ -200,29 +201,32 @@
public boolean page(final ServerMessage message, final long transactionId) throws Exception
{
- return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId));
+ // The sync on transactions is done on commit only
+ return getPageStore(message.getDestination()).page(new PagedMessageImpl(message, transactionId), false);
}
public boolean page(final ServerMessage message) throws Exception
{
- return getPageStore(message.getDestination()).page(new PagedMessageImpl(message));
+ // If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
+ // of crash
+ return getPageStore(message.getDestination()).page(new PagedMessageImpl(message),
+ syncNonTransactional && message.isDurable());
}
public void addTransaction(final PageTransactionInfo pageTransaction)
{
transactions.put(pageTransaction.getTransactionID(), pageTransaction);
}
-
+
public void removeTransaction(final long id)
{
transactions.remove(id);
}
-
+
public PageTransactionInfo getTransaction(final long id)
{
return transactions.get(id);
}
-
public void sync(final Collection<SimpleString> destinationsToSync) throws Exception
{
@@ -246,11 +250,11 @@
{
return;
}
-
+
pagingSPI.setPagingManager(this);
-
+
pagingSPI.setStorageManager(storageManager);
-
+
started = true;
}
@@ -260,7 +264,7 @@
{
return;
}
-
+
started = false;
for (PagingStore store : stores.values())
@@ -270,10 +274,10 @@
pagingSPI.stop();
}
-
+
public synchronized void startGlobalDepage()
{
- for (PagingStore store: stores.values())
+ for (PagingStore store : stores.values())
{
store.startDepaging(pagingSPI.getGlobalDepagerExecutor());
}
@@ -285,19 +289,17 @@
*/
public long getGlobalSize()
{
- return this.globalSize.get();
+ return globalSize.get();
}
-
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#addGlobalSize(long)
*/
- public long addGlobalSize(long size)
+ public long addGlobalSize(final long size)
{
return globalSize.addAndGet(size);
}
-
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#getMaxGlobalSize()
*/
@@ -305,7 +307,6 @@
{
return maxGlobalSize;
}
-
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -90,8 +90,7 @@
// Bytes consumed by the queue on the memory
private final AtomicLong sizeInBytes = new AtomicLong();
- //FIXME - don't call this a thread - it's a Runnable not a Thread
- private volatile Runnable dequeueThread;
+ private volatile Runnable depageAction;
private volatile int numberOfPages;
@@ -153,11 +152,11 @@
if (queueSettings.getPageSizeBytes() != null)
{
- this.pageSize = queueSettings.getPageSizeBytes();
+ pageSize = queueSettings.getPageSizeBytes();
}
else
{
- this.pageSize = pagingManager.getDefaultPageSize();
+ pageSize = pagingManager.getDefaultPageSize();
}
dropMessagesWhenFull = queueSettings.isDropMessagesWhenFull();
@@ -215,39 +214,6 @@
return storeName;
}
- /**
- * Depage one page-file, read it and send it to the pagingManager / postoffice
- * @return
- * @throws Exception
- */
- // FIXME - why is this public?
- public boolean readPage() throws Exception
- {
- Page page = depage();
-
- if (page == null)
- {
- if (lastPageRecord != null)
- {
- clearLastPageRecord(lastPageRecord);
- }
-
- lastPageRecord = null;
-
- return false;
- }
-
- page.open();
-
- PagedMessage messages[] = page.read();
-
- boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
-
- page.delete();
-
- return addressNotFull;
- }
-
/**
* It returns a Page out of the Page System without reading it.
* The method calling this method will remove the page and will start reading it outside of any locks.
@@ -269,6 +235,9 @@
numberOfPages--;
final Page returnPage;
+
+ // We are out of old pages, all that is left now is the current page.
+ // On that case we need to replace it by a new empty page, and return the current page immediately
if (currentPageId == firstPageId)
{
firstPageId = Integer.MAX_VALUE;
@@ -285,14 +254,19 @@
throw new IllegalStateException("CurrentPage is null");
}
+ // The current page is empty... what means we achieved the end of the pages
if (returnPage.getNumberOfMessages() == 0)
{
returnPage.open();
returnPage.delete();
+
+ // This will trigger this Destination to exit the page mode,
+ // and this will make JBM start using the journal again
return null;
}
else
{
+ // We need to create a new page, as we can't lock the address until we finish depaging.
openNewPage();
}
@@ -409,21 +383,21 @@
}
}
- public boolean page(final PagedMessage message) throws Exception
+ public boolean page(final PagedMessage message, final boolean sync) throws Exception
{
-
+
if (!running)
{
- throw new IllegalStateException ("PagingStore(" + this.getStoreName() + ") not initialized");
+ throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
}
-
- // Max-size is set, but reject is activated, what means.. never page on
- // this address
+
+ // We should never page when drop-messages is activated.
if (dropMessagesWhenFull)
{
return false;
}
+ // We need to ensure a read lock, as depage could change the paging state
currentPageLock.readLock().lock();
try
@@ -457,6 +431,8 @@
try
{
openNewPage();
+
+ // openNewPage will set pageUsedSize to zero, we need to set it again
pageUsedSize.addAndGet(bytesToWrite);
}
finally
@@ -472,6 +448,10 @@
if (currentPage != null)
{
currentPage.write(message);
+ if (sync)
+ {
+ currentPage.sync();
+ }
return true;
}
else
@@ -489,6 +469,7 @@
{
writeLock.unlock();
}
+
}
public void sync() throws Exception
@@ -508,13 +489,12 @@
}
}
-
public boolean startDepaging()
{
return startDepaging(executor);
}
- public boolean startDepaging(Executor executor)
+ public boolean startDepaging(final Executor executor)
{
currentPageLock.readLock().lock();
try
@@ -527,10 +507,10 @@
{
synchronized (this)
{
- if (dequeueThread == null)
+ if (depageAction == null)
{
- dequeueThread = new DepageRunnable(executor);
- executor.execute(dequeueThread);
+ depageAction = new DepageRunnable(executor);
+ executor.execute(depageAction);
return true;
}
else
@@ -573,7 +553,7 @@
try
{
running = false;
-
+
if (currentPage != null)
{
currentPage.close();
@@ -606,7 +586,7 @@
else
{
currentPageLock.writeLock().lock();
-
+
fileFactory.createDirs();
firstPageId = Integer.MAX_VALUE;
@@ -659,7 +639,7 @@
{
return false;
}
-
+
// First check without any global locks.
// (Faster)
currentPageLock.readLock().lock();
@@ -719,7 +699,7 @@
* If persistent messages are also used, it will update eventual PageTransactions
*/
- private boolean onDepage(final int pageId, final SimpleString destination, final PagedMessage[] data) throws Exception
+ private boolean onDepage(final int pageId, final SimpleString destination, final List<PagedMessage> data) throws Exception
{
trace("Depaging....");
@@ -756,7 +736,7 @@
{
ServerMessage pagedMessage = null;
- pagedMessage = (ServerMessage)msg.getMessage(storageManager);
+ pagedMessage = msg.getMessage(storageManager);
final long transactionIdDuringPaging = msg.getTransactionID();
@@ -846,7 +826,7 @@
private synchronized void clearDequeueThread()
{
- dequeueThread = null;
+ depageAction = null;
}
private void openNewPage() throws Exception
@@ -927,13 +907,45 @@
return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
}
+ /**
+ * Depage one page-file, read it and send it to the pagingManager / postoffice
+ * @return
+ * @throws Exception
+ */
+ private boolean readPage() throws Exception
+ {
+ Page page = depage();
+
+ if (page == null)
+ {
+ if (lastPageRecord != null)
+ {
+ clearLastPageRecord(lastPageRecord);
+ }
+
+ lastPageRecord = null;
+
+ return false;
+ }
+
+ page.open();
+
+ List<PagedMessage> messages = page.read();
+
+ boolean addressNotFull = onDepage(page.getPageId(), storeName, messages);
+
+ page.delete();
+
+ return addressNotFull;
+ }
+
// Inner classes -------------------------------------------------
private class DepageRunnable implements Runnable
{
private final Executor followingExecutor;
-
- public DepageRunnable(Executor followingExecutor)
+
+ public DepageRunnable(final Executor followingExecutor)
{
this.followingExecutor = followingExecutor;
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -46,7 +46,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(JournalLargeMessageImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Attributes ----------------------------------------------------
@@ -55,14 +55,14 @@
// We should only use the NIO implementation on the Journal
private volatile SequentialFile file;
-
+
private volatile boolean complete = false;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public JournalLargeMessageImpl(JournalStorageManager storageManager)
+ public JournalLargeMessageImpl(final JournalStorageManager storageManager)
{
this.storageManager = storageManager;
}
@@ -187,7 +187,7 @@
public void deleteFile() throws MessagingException
{
- this.storageManager.deleteFile(file);
+ storageManager.deleteFile(file);
}
@Override
@@ -196,15 +196,15 @@
// The body won't be on memory (aways on-file), so we don't consider this for paging
return super.getPropertiesEncodeSize();
}
-
+
public synchronized void complete() throws Exception
{
releaseResources();
-
+
if (!complete)
{
- SequentialFile fileToRename = storageManager.createFileForLargeMessage(this.getMessageID(), true);
- file.renameTo(fileToRename);
+ SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), true);
+ file.renameTo(fileToRename.getFileName());
}
}
@@ -233,17 +233,16 @@
{
if (file == null)
{
- if (this.messageID <= 0)
+ if (messageID <= 0)
{
throw new RuntimeException("MessageID not set on LargeMessage");
}
-
- file = storageManager.createFileForLargeMessage(this.getMessageID(), complete);
-
+
+ file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+
}
}
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -214,7 +214,8 @@
storageManager,
queueSettingsRepository,
configuration.getPagingMaxGlobalSizeBytes(),
- configuration.getPagingDefaultSize());
+ configuration.getPagingDefaultSize(),
+ configuration.isJournalSyncNonTransactional());
pagingManager.start();
resourceManager = new ResourceManagerImpl((int)configuration.getTransactionTimeout() / 1000,
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingIntegrationTest.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -50,6 +50,11 @@
testAdd(new NIOSequentialFileFactory(getTestDir()), 1000);
}
+ public void testDamagedDataWithNIO() throws Exception
+ {
+ testDamagedPage(new NIOSequentialFileFactory(getTestDir()), 1000);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,6 +24,7 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.List;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
@@ -60,7 +61,7 @@
// Public --------------------------------------------------------
- public void testPagingManagerNIO() throws Exception
+ public void testPagingManager() throws Exception
{
HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
queueSettings.setDefault(new QueueSettings());
@@ -69,7 +70,8 @@
null,
queueSettings,
-1,
- 1024 * 1024);
+ 1024 * 1024,
+ true);
managerImpl.start();
@@ -77,31 +79,32 @@
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
- assertFalse(store.page(new PagedMessageImpl(msg)));
+ assertFalse(store.page(new PagedMessageImpl(msg), true));
store.startPaging();
- assertTrue(store.page(new PagedMessageImpl(msg)));
+ assertTrue(store.page(new PagedMessageImpl(msg), true));
Page page = store.depage();
page.open();
- PagedMessage msgs[] = page.read();
+ List<PagedMessage> msgs = page.read();
page.close();
- assertEquals(1, msgs.length);
+ assertEquals(1, msgs.size());
- assertEqualsByteArrays(msg.getBody().array(), (msgs[0].getMessage(null)).getBody().array());
+ assertEqualsByteArrays(msg.getBody().array(), (msgs.get(0).getMessage(null)).getBody().array());
assertTrue(store.isPaging());
assertNull(store.depage());
- assertFalse(store.page(new PagedMessageImpl(msg)));
+ assertFalse(store.page(new PagedMessageImpl(msg), true));
}
+
public void testPagingManagerAddressFull() throws Exception
{
HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
@@ -117,7 +120,8 @@
null,
queueSettings,
-1,
- 1024 * 1024);
+ 1024 * 1024,
+ false);
managerImpl.start();
managerImpl.createPageStore(new SimpleString("simple-test"));
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingStoreIntegrationTest.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,8 +24,6 @@
import java.io.File;
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.tests.unit.core.paging.impl.PagingStoreTestBase;
@@ -46,18 +44,6 @@
// Public --------------------------------------------------------
- public void testPageStoreWithAIO() throws Exception
- {
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
- testConcurrentPaging(new AIOSequentialFileFactory(getTestDir()), 10);
- }
-
public void testPageWithNIO() throws Exception
{
// This integration test could fail 1 in 100 due to race conditions.
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -304,7 +304,7 @@
{
private volatile boolean open;
- private final String fileName;
+ private String fileName;
private ByteBuffer data;
@@ -557,9 +557,11 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.journal.SequentialFile#renameTo(org.jboss.messaging.core.journal.SequentialFile)
*/
- public void renameTo(SequentialFile file) throws Exception
+ public void renameTo(String newFileName) throws Exception
{
- throw new IllegalStateException("Method rename not supoprted on FakeSequentialFile");
+ fileMap.remove(this.fileName);
+ this.fileName = newFileName;
+ fileMap.put(newFileName, this);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTest.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -46,6 +46,12 @@
testAdd(new FakeSequentialFileFactory(1, false), 10);
}
+ /** Validate if everything we add is recovered */
+ public void testDamagedPage() throws Exception
+ {
+ testDamagedPage(new FakeSequentialFileFactory(1, false), 100);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -24,6 +24,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.List;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
@@ -73,65 +74,156 @@
assertEquals(1, factory.listFiles("page").size());
- ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-
SimpleString simpleDestination = new SimpleString("Test");
- for (int i = 0; i < numberOfElements; i++)
- {
- ByteBuffer buffer = ByteBuffer.allocate(10);
+ ArrayList<ByteBuffer> buffers = addPageElements(simpleDestination, impl, numberOfElements);
- for (int j = 0; j < buffer.limit(); j++)
- {
- buffer.put(RandomUtil.randomByte());
- }
+ impl.sync();
+ impl.close();
- buffers.add(buffer);
+ file = factory.createSequentialFile("00010.page", 1);
+ file.open();
+ impl = new PageImpl(factory, file, 10);
- ServerMessage msg = new ServerMessageImpl((byte)1,
- true,
- 0,
- System.currentTimeMillis(),
- (byte)0,
- new ByteBufferWrapper(buffer));
+ List<PagedMessage> msgs = impl.read();
- msg.setMessageID(i);
+ assertEquals(numberOfElements, msgs.size());
- msg.setDestination(simpleDestination);
+ assertEquals(numberOfElements, impl.getNumberOfMessages());
- impl.write(new PagedMessageImpl(msg));
+ for (int i = 0; i < msgs.size(); i++)
+ {
+ assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
- assertEquals(i + 1, impl.getNumberOfMessages());
+ assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
+
+ assertEqualsByteArrays(buffers.get(i).array(), (msgs.get(i).getMessage(null)).getBody().array());
}
+ impl.delete();
+
+ assertEquals(0, factory.listFiles(".page").size());
+
+ }
+
+
+
+ public void testDamagedPage(final SequentialFileFactory factory, final int numberOfElements) throws Exception
+ {
+
+ SequentialFile file = factory.createSequentialFile("00010.page", 1);
+
+ PageImpl impl = new PageImpl(factory, file, 10);
+
+ assertEquals(10, impl.getPageId());
+
+ impl.open();
+
+ assertEquals(1, factory.listFiles("page").size());
+
+ SimpleString simpleDestination = new SimpleString("Test");
+
+ ArrayList<ByteBuffer> buffers = addPageElements(simpleDestination, impl, numberOfElements);
+
impl.sync();
+
+ long positionA = file.position();
+
+ // Add one record that will be damaged
+ addPageElements(simpleDestination, impl, 1);
+
+ long positionB = file.position();
+
+ // Add more 10 as they will need to be ignored
+ addPageElements(simpleDestination, impl, 10);
+
+
+ // Damage data... position the file on the middle between points A and B
+ file.position(positionA + (positionB - positionA) / 2);
+
+ ByteBuffer buffer = ByteBuffer.allocate((int)(positionB - file.position()));
+
+ for (int i = 0; i< buffer.capacity(); i++)
+ {
+ buffer.put((byte)'Z');
+ }
+
+ buffer.rewind();
+
+ file.write(buffer, true);
+
impl.close();
file = factory.createSequentialFile("00010.page", 1);
file.open();
impl = new PageImpl(factory, file, 10);
- PagedMessage msgs[] = impl.read();
+ List<PagedMessage> msgs = impl.read();
- assertEquals(numberOfElements, msgs.length);
+ assertEquals(numberOfElements, msgs.size());
assertEquals(numberOfElements, impl.getNumberOfMessages());
- for (int i = 0; i < msgs.length; i++)
+ for (int i = 0; i < msgs.size(); i++)
{
- assertEquals(i, (msgs[i].getMessage(null)).getMessageID());
+ assertEquals(i, (msgs.get(i).getMessage(null)).getMessageID());
- assertEquals(simpleDestination, (msgs[i].getMessage(null)).getDestination());
+ assertEquals(simpleDestination, (msgs.get(i).getMessage(null)).getDestination());
- assertEqualsByteArrays(buffers.get(i).array(), (msgs[i].getMessage(null)).getBody().array());
+ assertEqualsByteArrays(buffers.get(i).array(), (msgs.get(i).getMessage(null)).getBody().array());
}
impl.delete();
- assertEquals(0, factory.listFiles(".page").size());
+ assertEquals(0, factory.listFiles("page").size());
+ assertEquals(1, factory.listFiles("invalidPage").size());
+
}
+
+ /**
+ * @param simpleDestination
+ * @param page
+ * @param numberOfElements
+ * @return
+ * @throws Exception
+ */
+ protected ArrayList<ByteBuffer> addPageElements(SimpleString simpleDestination, PageImpl page, int numberOfElements) throws Exception
+ {
+ ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+
+ int initialNumberOfMessages = page.getNumberOfMessages();
+ for (int i = 0; i < numberOfElements; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocate(10);
+
+ for (int j = 0; j < buffer.limit(); j++)
+ {
+ buffer.put(RandomUtil.randomByte());
+ }
+
+ buffers.add(buffer);
+
+ ServerMessage msg = new ServerMessageImpl((byte)1,
+ true,
+ 0,
+ System.currentTimeMillis(),
+ (byte)0,
+ new ByteBufferWrapper(buffer));
+
+ msg.setMessageID(i);
+
+ msg.setDestination(simpleDestination);
+
+ page.write(new PagedMessageImpl(msg));
+
+ assertEquals(initialNumberOfMessages + i + 1, page.getNumberOfMessages());
+ }
+ return buffers;
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -111,7 +111,7 @@
assertTrue(storeImpl.isPaging());
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, true));
assertEquals(1, storeImpl.getNumberOfPages());
@@ -162,7 +162,7 @@
PagedMessageImpl msg = createMessage(destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, true));
}
assertEquals(1, storeImpl.getNumberOfPages());
@@ -173,9 +173,9 @@
page.open();
- PagedMessage msg[] = page.read();
+ List<PagedMessage> msg = page.read();
- assertEquals(10, msg.length);
+ assertEquals(10, msg.size());
assertEquals(1, storeImpl.getNumberOfPages());
page = storeImpl.depage();
@@ -186,8 +186,8 @@
for (int i = 0; i < 10; i++)
{
- assertEquals(0, (msg[i].getMessage(null)).getMessageID());
- assertEqualsByteArrays(buffers.get(i).array(), (msg[i].getMessage(null)).getBody().array());
+ assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
+ assertEqualsByteArrays(buffers.get(i).array(), (msg.get(i).getMessage(null)).getBody().array());
}
}
@@ -230,7 +230,7 @@
PagedMessageImpl msg = createMessage(destination, buffer);
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, true));
}
assertEquals(2, storeImpl.getNumberOfPages());
@@ -243,16 +243,16 @@
page.open();
- PagedMessage msg[] = page.read();
+ List<PagedMessage> msg = page.read();
page.close();
- assertEquals(5, msg.length);
+ assertEquals(5, msg.size());
for (int i = 0; i < 5; i++)
{
- assertEquals(0, (msg[i].getMessage(null)).getMessageID());
- assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), (msg[i].getMessage(null)).getBody().array());
+ assertEquals(0, (msg.get(i).getMessage(null)).getMessageID());
+ assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), (msg.get(i).getMessage(null)).getBody().array());
}
}
@@ -262,13 +262,13 @@
PagedMessageImpl msg = createMessage(destination, buffers.get(0));
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, true));
Page newPage = storeImpl.depage();
newPage.open();
- assertEquals(1, newPage.read().length);
+ assertEquals(1, newPage.read().size());
newPage.delete();
@@ -280,23 +280,23 @@
assertFalse(storeImpl.isPaging());
- assertFalse(storeImpl.page(msg));
+ assertFalse(storeImpl.page(msg, true));
storeImpl.startPaging();
- assertTrue(storeImpl.page(msg));
+ assertTrue(storeImpl.page(msg, true));
Page page = storeImpl.depage();
page.open();
- PagedMessage msgs[] = page.read();
+ List<PagedMessage> msgs = page.read();
- assertEquals(1, msgs.length);
+ assertEquals(1, msgs.size());
- assertEquals(0l, (msgs[0].getMessage(null)).getMessageID());
+ assertEquals(0l, (msgs.get(0).getMessage(null)).getMessageID());
- assertEqualsByteArrays(buffers.get(0).array(), (msgs[0].getMessage(null)).getBody().array());
+ assertEqualsByteArrays(buffers.get(0).array(), (msgs.get(0).getMessage(null)).getBody().array());
assertEquals(1, storeImpl.getNumberOfPages());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-03 22:19:37 UTC (rev 5458)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-12-04 01:40:14 UTC (rev 5459)
@@ -141,7 +141,7 @@
{
long id = messageIdGenerator.incrementAndGet();
PagedMessageImpl msg = createMessage(destination, createRandomBuffer(id, 5));
- if (storeImpl.page(msg))
+ if (storeImpl.page(msg, false))
{
buffers.put(id, msg);
}
@@ -231,7 +231,7 @@
for (Page page : readPages)
{
page.open();
- PagedMessage msgs[] = page.read();
+ List<PagedMessage> msgs = page.read();
page.close();
for (PagedMessage msg : msgs)
@@ -278,7 +278,7 @@
long lastMessageId = messageIdGenerator.incrementAndGet();
PagedMessage lastMsg = createMessage(destination, createRandomBuffer(lastMessageId, 5));
- storeImpl2.page(lastMsg);
+ storeImpl2.page(lastMsg, false);
buffers2.put(lastMessageId, lastMsg);
Page lastPage = null;
@@ -294,7 +294,7 @@
page.open();
- PagedMessage[] msgs = page.read();
+ List<PagedMessage> msgs = page.read();
page.close();
@@ -312,13 +312,13 @@
}
lastPage.open();
- PagedMessage lastMessages[] = lastPage.read();
+ List<PagedMessage> lastMessages = lastPage.read();
lastPage.close();
- assertEquals(1, lastMessages.length);
+ assertEquals(1, lastMessages.size());
- (lastMessages[0].getMessage(null)).getBody().rewind();
- assertEquals((lastMessages[0].getMessage(null)).getBody().getLong(), lastMessageId);
- assertEqualsByteArrays((lastMessages[0].getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
+ (lastMessages.get(0).getMessage(null)).getBody().rewind();
+ assertEquals((lastMessages.get(0).getMessage(null)).getBody().getLong(), lastMessageId);
+ assertEqualsByteArrays((lastMessages.get(0).getMessage(null)).getBody().array(), (lastMsg.getMessage(null)).getBody()
.array());
assertEquals(0, buffers2.size());
More information about the jboss-cvs-commits
mailing list