[hornetq-commits] JBoss hornetq SVN: r7980 - in trunk: src/main/org/hornetq/core/journal/impl and 16 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Sep 22 19:39:48 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-09-22 19:39:47 -0400 (Tue, 22 Sep 2009)
New Revision: 7980
Added:
trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/LoaderCallback.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
trunk/tests/src/org/hornetq/tests/util/JournalExample.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-49 (Orphaned files) and HORNETQ-143 (Bug on LargeMessages & XA)
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -75,7 +75,7 @@
// Load
- long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+ long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
int getAlignment() throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/LoaderCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/LoaderCallback.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/LoaderCallback.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -13,18 +13,19 @@
package org.hornetq.core.journal;
+
/**
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public interface LoaderCallback
+public interface LoaderCallback extends TransactionFailureCallback
{
+ void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+
void addRecord(RecordInfo info);
void deleteRecord(long id);
void updateRecord(RecordInfo info);
-
- void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
}
Added: trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/TransactionFailureCallback.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+import java.util.List;
+
+/**
+ * A TransactionFailureCallback
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TransactionFailureCallback
+{
+
+ /** To be used to inform about transactions without commit records.
+ * This could be used to remove extra resources associated with the transactions (such as external files received during the transaction) */
+ void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete);
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -28,7 +28,6 @@
import org.hornetq.core.asyncio.impl.TimedBuffer;
import org.hornetq.core.asyncio.impl.TimedBufferObserver;
import org.hornetq.core.journal.IOCallback;
-import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
@@ -40,14 +39,10 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class AIOSequentialFile implements SequentialFile
+public class AIOSequentialFile extends AbstractSequentialFile
{
private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
- private final String directory;
-
- private File file;
-
private boolean opened = false;
private final int maxIO;
@@ -87,9 +82,8 @@
final Executor executor,
final Executor pollerExecutor)
{
+ super(directory, new File(directory + "/" + fileName));
this.factory = factory;
- this.directory = directory;
- file = new File(directory + "/" + fileName);
this.maxIO = maxIO;
this.bufferCallback = bufferCallback;
this.executor = executor;
@@ -108,11 +102,6 @@
return aioFile.getBlockSize();
}
- public boolean exists()
- {
- return file.exists();
- }
-
public int calculateBlockStart(final int position) throws Exception
{
int alignment = getAlignment();
@@ -139,7 +128,10 @@
public synchronized void close() throws Exception
{
- checkOpened();
+ if (!opened)
+ {
+ return;
+ }
opened = false;
timedBuffer = null;
@@ -156,8 +148,8 @@
while (!donelatch.await(60, TimeUnit.SECONDS))
{
- log.warn("Executor on file " + file.getName() + " couldn't complete its tasks in 60 seconds.",
- new Exception("Warning: Executor on file " + file.getName() +
+ log.warn("Executor on file " + getFile().getName() + " couldn't complete its tasks in 60 seconds.",
+ new Exception("Warning: Executor on file " + getFile().getName() +
" couldn't complete its tasks in 60 seconds."));
}
@@ -178,17 +170,6 @@
}
}
- public void delete() throws Exception
- {
- if (aioFile != null)
- {
- aioFile.close();
- aioFile = null;
- }
-
- file.delete();
- }
-
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
checkOpened();
@@ -237,35 +218,16 @@
this.fileSize = aioFile.size();
}
- public String getFileName()
- {
- return file.getName();
- }
-
public void open() throws Exception
{
open(maxIO);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFile#renameTo(org.hornetq.core.journal.SequentialFile)
- */
- public void renameTo(String newFileName) throws Exception
- {
- if (isOpen())
- {
- close();
- }
- File newFile = new File(directory + "/" + newFileName);
- file.renameTo(newFile);
- file = newFile;
- }
-
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
aioFile = newFile();
- aioFile.open(file.getAbsolutePath(), currentMaxIO);
+ aioFile.open(getFile().getAbsolutePath(), currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
this.fileSize = aioFile.size();
@@ -376,7 +338,7 @@
{
if (aioFile == null)
{
- return file.length();
+ return getFile().length();
}
else
{
@@ -387,7 +349,7 @@
@Override
public String toString()
{
- return "AIOSequentialFile:" + file.getAbsolutePath();
+ return "AIOSequentialFile:" + getFile().getAbsolutePath();
}
// Public methods
@@ -525,7 +487,7 @@
public String toString()
{
- return "TimedBufferObserver on file (" + AIOSequentialFile.this.file.getName() + ")";
+ return "TimedBufferObserver on file (" + getFile().getName() + ")";
}
}
Added: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFile;
+
+/**
+ * A AbstractSequentialFile
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractSequentialFile implements SequentialFile
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private File file;
+
+ private final String directory;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ /**
+ * @param file
+ * @param directory
+ */
+ public AbstractSequentialFile(String directory, File file)
+ {
+ super();
+ this.file = file;
+ this.directory = directory;
+ }
+
+ // Public --------------------------------------------------------
+
+ public final boolean exists()
+ {
+ return file.exists();
+ }
+
+ public final String getFileName()
+ {
+ return file.getName();
+ }
+
+
+ public final void delete() throws Exception
+ {
+ if (isOpen())
+ {
+ close();
+ }
+
+ file.delete();
+ }
+
+
+ public final void renameTo(final String newFileName) throws Exception
+ {
+ close();
+ File newFile = new File(directory + "/" + newFileName);
+
+
+ if (!file.equals(newFile))
+ {
+ file.renameTo(newFile);
+ file = newFile;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected File getFile()
+ {
+ return file;
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -50,6 +50,7 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.DataConstants;
@@ -1353,7 +1354,8 @@
* @see JournalImpl#load(LoaderCallback)
*/
public synchronized long load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions) throws Exception
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final TransactionFailureCallback failureCallback) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
final List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -1399,6 +1401,14 @@
recordsToDelete.clear();
}
}
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ if (failureCallback != null)
+ {
+ failureCallback.failedTransaction(transactionID, records, recordsToDelete);
+ }
+ }
});
for (RecordInfo record : records)
@@ -2015,6 +2025,8 @@
// Remove the transactionInfo
transactions.remove(transaction.transactionID);
+
+ loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
}
else
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -32,16 +32,12 @@
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
-public class NIOSequentialFile implements SequentialFile
+public class NIOSequentialFile extends AbstractSequentialFile
{
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
- private File file;
-
private long fileSize = 0;
- private final String directory;
-
private FileChannel channel;
private RandomAccessFile rfile;
@@ -50,15 +46,9 @@
public NIOSequentialFile(final String directory, final String fileName)
{
- this.directory = directory;
- file = new File(directory + "/" + fileName);
+ super(directory, new File(directory + "/" + fileName));
}
- public boolean exists()
- {
- return file.exists();
- }
-
public int getAlignment()
{
return 1;
@@ -78,11 +68,6 @@
return this.position.get() + size <= fileSize;
}
- public String getFileName()
- {
- return file.getName();
- }
-
public synchronized boolean isOpen()
{
return channel != null;
@@ -90,7 +75,7 @@
public synchronized void open() throws Exception
{
- rfile = new RandomAccessFile(file, "rw");
+ rfile = new RandomAccessFile(getFile(), "rw");
channel = rfile.getChannel();
@@ -150,17 +135,7 @@
notifyAll();
}
-
- public void delete() throws Exception
- {
- if (isOpen())
- {
- close();
- }
-
- file.delete();
- }
-
+
public int read(final ByteBuffer bytes) throws Exception
{
return read(bytes, null);
@@ -249,7 +224,7 @@
{
if (channel == null)
{
- return file.length();
+ return getFile().length();
}
else
{
@@ -268,18 +243,10 @@
return position.get();
}
- public void renameTo(final String newFileName) throws Exception
- {
- close();
- File newFile = new File(directory + "/" + newFileName);
- file.renameTo(newFile);
- file = newFile;
- }
-
@Override
public String toString()
{
- return "NIOSequentialFile " + file;
+ return "NIOSequentialFile " + getFile();
}
/* (non-Javadoc)
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -153,6 +153,12 @@
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
+
+ if (message.getMessage(null).isLargeMessage())
+ {
+ // If we don't sync on large messages we could have the risk of files unnatended files on disk
+ sync();
+ }
}
public void sync() throws Exception
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -37,6 +37,7 @@
import org.hornetq.core.paging.PagingStoreFactory;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
@@ -769,6 +770,16 @@
ServerMessage message = null;
message = pagedMessage.getMessage(storageManager);
+
+ if (message.isLargeMessage())
+ {
+ LargeServerMessage largeMsg = (LargeServerMessage)message;
+ if (!largeMsg.isFileExists())
+ {
+ log.warn("File for large message " + largeMsg.getMessageID() + " doesn't exist, so ignoring depage for this large message");
+ continue;
+ }
+ }
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -50,8 +50,6 @@
// We should only use the NIO implementation on the Journal
private SequentialFile file;
- private boolean complete = false;
-
private long bodySize = -1;
// Static --------------------------------------------------------
@@ -76,7 +74,6 @@
this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
- complete = true;
bodySize = copy.bodySize;
setMessageID(newID);
}
@@ -149,6 +146,7 @@
return (int)Math.min(bodySize, Integer.MAX_VALUE);
}
+ @Override
public synchronized long getLargeBodySize()
{
try
@@ -178,32 +176,26 @@
public void decode(final HornetQBuffer buffer)
{
file = null;
- complete = true;
+ try
+ {
+ this.setStored();
+ }
+ catch (Exception e)
+ {
+ // File still null, this wasn't supposed to happen ever.
+ log.warn(e.getMessage(), e);
+ }
decodeProperties(buffer);
}
- /**
- * @return the complete
- */
- public boolean isComplete()
- {
- return complete;
- }
-
- /**
- * @param complete the complete to set
- */
- public void setComplete(boolean complete)
- {
- this.complete = complete;
- }
-
@Override
public int decrementRefCount()
{
int currentRefCount = super.decrementRefCount();
- if (currentRefCount == 0)
+ // We use <= as this could be used by load.
+ // because of a failure, no references were loaded, so we have 0... and we still need to delete the associated files
+ if (currentRefCount <= 0)
{
if (linkMessage != null)
{
@@ -242,6 +234,12 @@
validateFile();
storageManager.deleteFile(file);
}
+
+ public boolean isFileExists() throws Exception
+ {
+ SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(), isStored());
+ return localfile.exists();
+ }
// We cache this
private volatile int memoryEstimate = -1;
@@ -257,14 +255,16 @@
return memoryEstimate;
}
-
- public synchronized void complete() throws Exception
+
+
+ @Override
+ public void setStored() throws Exception
{
+ super.setStored();
releaseResources();
-
- if (!complete)
+ if (file != null && linkMessage == null)
{
- SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), true);
+ SequentialFile fileToRename = storageManager.createFileForLargeMessage(getMessageID(), isStored());
file.renameTo(fileToRename.getFileName());
}
}
@@ -296,7 +296,7 @@
idToUse = linkMessage.getMessageID();
}
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, true);
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, isStored());
file.open();
@@ -327,7 +327,7 @@
throw new RuntimeException("MessageID not set on LargeMessage");
}
- file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+ file = storageManager.createFileForLargeMessage(getMessageID(), isStored());
file.open();
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -39,6 +39,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -465,7 +466,45 @@
int deliveryCount;
}
+
+
+ private class LargeMessageTXFailureCallback implements TransactionFailureCallback
+ {
+ private final Map<Long, ServerMessage> messages;
+
+ public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
+ {
+ super();
+ this.messages = messages;
+ }
+
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == ADD_LARGE_MESSAGE)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
+
+ try
+ {
+ LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
+ serverMessage.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ }
+
public void loadMessageJournal(final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
@@ -475,9 +514,11 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
- messageJournal.load(records, preparedTransactions);
-
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+ messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
+
+ ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
@@ -493,37 +534,12 @@
{
case ADD_LARGE_MESSAGE:
{
- LargeServerMessage largeMessage = createLargeMessage();
+ LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
- LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
- messageEncoding.decode(buff);
+ messages.put(record.id, largeMessage);
- Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
- // Using the linked file by the original file
- if (originalMessageID != null)
- {
- LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
- {
- // this could happen if the message was deleted but the file still exists as the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setMessageID(originalMessageID);
- originalMessage.setComplete(true);
- messages.put(originalMessageID, originalMessage);
- }
-
- originalMessage.incrementRefCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- largeMessage.setComplete(true);
- }
-
+ largeMessages.add(largeMessage);
- messages.put(record.id, largeMessage);
-
break;
}
case ADD_MESSAGE:
@@ -708,12 +724,58 @@
loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+ for (LargeServerMessage msg : largeMessages)
+ {
+ if (msg.getRefCount() == 0)
+ {
+ log.info("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
+ msg.decrementRefCount();
+ }
+ }
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
}
}
+ /**
+ * @param messages
+ * @param buff
+ * @return
+ * @throws Exception
+ */
+ private LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, HornetQBuffer buff) throws Exception
+ {
+ LargeServerMessage largeMessage = createLargeMessage();
+
+ LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+ messageEncoding.decode(buff);
+
+ Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+ // Using the linked file by the original file
+ if (originalMessageID != null)
+ {
+ LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+
+ if (originalMessage == null)
+ {
+ // this could happen if the message was deleted but the file still exists as the file still being used
+ originalMessage = createLargeMessage();
+ originalMessage.setMessageID(originalMessageID);
+ originalMessage.setStored();
+ messages.put(originalMessageID, originalMessage);
+ }
+
+ originalMessage.incrementRefCount();
+
+ largeMessage.setLinkedMessage(originalMessage);
+ }
+ return largeMessage;
+ }
+
private void loadPreparedTransactions(final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
@@ -747,6 +809,12 @@
switch (recordType)
{
+ case ADD_LARGE_MESSAGE:
+ {
+ messages.put(record.id, parseLargeMessage(messages, buff));
+
+ break;
+ }
case ADD_MESSAGE:
{
ServerMessage message = new ServerMessageImpl(record.id);
@@ -933,7 +1001,7 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
- bindingsJournal.load(records, preparedTransactions);
+ bindingsJournal.load(records, preparedTransactions, null);
for (RecordInfo record : records)
{
@@ -1059,9 +1127,9 @@
* @param messageID
* @return
*/
- SequentialFile createFileForLargeMessage(final long messageID, final boolean completeFile)
+ SequentialFile createFileForLargeMessage(final long messageID, final boolean stored)
{
- if (completeFile)
+ if (stored)
{
return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -126,6 +126,15 @@
// nothing to be done on null persistence
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#isFileExists()
+ */
+ public boolean isFileExists() throws Exception
+ {
+ // There are no real files on null persistence
+ return true;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -610,6 +610,7 @@
{
if (pagingManager.page(message, true))
{
+ message.setStored();
return;
}
}
@@ -1049,6 +1050,7 @@
{
if (pagingManager.page(message, tx.getID(), first))
{
+ message.setStored();
if (message.isDurable())
{
// We only create pageTransactions if using persistent messages
Modified: trunk/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -31,17 +31,13 @@
/** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
LargeServerMessage getLinkedMessage();
+
+ boolean isFileExists() throws Exception;
/** Close the files if opened */
void releaseResources();
long getLargeBodySize();
- void complete() throws Exception;
-
- void setComplete(boolean isComplete);
-
- boolean isComplete();
-
void deleteFile() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -44,11 +44,12 @@
int getMemoryEstimate();
- void setStored();
+ void setStored() throws Exception;
boolean isStored();
int getRefCount();
+
//TODO - we might be able to put this in a better place
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -794,6 +794,30 @@
configuration.isBackup());
}
+ /** for use on sub-classes */
+ protected ExecutorService getExecutor()
+ {
+ return threadPool;
+ }
+
+ /**
+ * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
+ * @return
+ */
+ protected StorageManager createStorageManager()
+ {
+ if (configuration.isPersistenceEnabled())
+ {
+ return new JournalStorageManager(configuration, threadPool);
+ }
+ else
+ {
+ return new NullStorageManager();
+ }
+ }
+
+
+
// Private
// --------------------------------------------------------------------------------------
@@ -921,14 +945,7 @@
deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
}
- if (configuration.isPersistenceEnabled())
- {
- storageManager = new JournalStorageManager(configuration, threadPool);
- }
- else
- {
- storageManager = new NullStorageManager();
- }
+ this.storageManager = createStorageManager();
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
securityRepository.setDefault(new HashSet<Role>());
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -130,7 +130,8 @@
private final Binding binding;
// Constructors ---------------------------------------------------------------------------------
-
+
+
public ServerConsumerImpl(final long id,
final long replicatedSessionID,
final ServerSession session,
@@ -147,6 +148,7 @@
final Executor executor,
final ManagementService managementService) throws Exception
{
+
this.id = id;
this.replicatedSessionID = replicatedSessionID;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -105,7 +105,7 @@
return stored;
}
- public void setStored()
+ public void setStored() throws Exception
{
stored = true;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -2367,7 +2367,7 @@
currentLargeMessage = null;
- message.complete();
+ message.releaseResources();
send(message);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -272,7 +272,7 @@
ArrayList<PreparedTransactionInfo> transactions = new ArrayList<PreparedTransactionInfo>();
journal.start();
- journal.load(records, transactions);
+ journal.load(records, transactions, null);
System.out.println("===============================================");
System.out.println("Journal records at the end:");
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -67,20 +67,19 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
-
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- ClientSession session = null;
+ ClientSession session = null;
try
{
server = createServer(true);
server.start();
-
+
log.info("*********** starting test");
ClientSessionFactory sf = createInVMFactory();
@@ -94,7 +93,7 @@
Message clientFile = createLargeClientMessage(session, messageSize, true);
log.info("*********** sending large message");
-
+
producer.send(clientFile);
session.commit();
@@ -106,13 +105,13 @@
msg1.acknowledge();
session.commit();
assertNotNull(msg1);
-
+
consumer.close();
try
{
msg1.getBody().readByte();
- fail ("Exception was expected");
+ fail("Exception was expected");
}
catch (Throwable ignored)
{
@@ -142,20 +141,18 @@
}
}
-
-
public void testDLALargeMessage() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- ClientSession session = null;
+ ClientSession session = null;
try
{
server = createServer(true);
server.start();
-
+
log.info("*********** starting test");
ClientSessionFactory sf = createInVMFactory();
@@ -181,7 +178,7 @@
Message clientFile = createLargeClientMessage(session, messageSize, true);
log.info("*********** sending large message");
-
+
producer.send(clientFile);
session.commit();
@@ -275,7 +272,6 @@
}
}
-
public void testDeliveryCount() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -289,12 +285,11 @@
server.start();
ClientSessionFactory sf = createInVMFactory();
-
+
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
-
ClientProducer producer = session.createProducer(ADDRESS);
Message clientFile = createLargeClientMessage(session, messageSize, true);
@@ -305,19 +300,19 @@
session.start();
ClientConsumer consumer = session.createConsumer(ADDRESS);
-
+
ClientMessage msg = consumer.receive(10000);
assertNotNull(msg);
msg.acknowledge();
assertEquals(1, msg.getDeliveryCount());
- for (int i = 0 ; i < messageSize; i++)
+ for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg.getBody().readByte());
}
session.rollback();
-
+
session.close();
-
+
session = sf.createSession(false, false, false);
session.start();
@@ -325,16 +320,16 @@
msg = consumer.receive(10000);
assertNotNull(msg);
msg.acknowledge();
- for (int i = 0 ; i < messageSize; i++)
+ for (int i = 0; i < messageSize; i++)
{
assertEquals(getSamplebyte(i), msg.getBody().readByte());
}
assertEquals(2, msg.getDeliveryCount());
msg.acknowledge();
consumer.close();
-
- session.commit();
-
+
+ session.commit();
+
validateNoFilesOnLargeDir();
}
finally
@@ -357,7 +352,139 @@
}
}
-
+ public void testDLAOnExpiryNonDurableMessage() throws Exception
+ {
+ final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+ SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+ AddressSettings addressSettings = new AddressSettings();
+
+ addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+ addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+ addressSettings.setMaxDeliveryAttempts(1);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+ session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, false);
+ clientFile.setExpiration(System.currentTimeMillis());
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
+ // to kick expiry quicker than waiting reaper thread
+ assertNull(consumerExpired.receive(1000));
+ consumerExpired.close();
+
+ ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
+
+ ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+
+ session.rollback();
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+ }
+
+ consumerExpiry.close();
+
+ for (int i = 0; i < 10; i++)
+ {
+
+ consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+ msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+
+ session.rollback();
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+ }
+
+ consumerExpiry.close();
+ }
+
+ session.close();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+ msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ // msg1.acknowledge();
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ session.commit();
+
+ consumerExpiry.close();
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+
+ server.start();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLAOnExpiry() throws Exception
{
final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -371,7 +498,7 @@
server.start();
ClientSessionFactory sf = createInVMFactory();
-
+
SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
@@ -387,7 +514,6 @@
session.createQueue(ADDRESS, ADDRESS, true);
-
session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
@@ -506,11 +632,11 @@
server = createServer(true);
server.start();
-
+
AddressSettings addressSettings = new AddressSettings();
SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
-
+
addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
server.getAddressSettingsRepository().addMatch("*", addressSettings);
@@ -520,7 +646,7 @@
session = sf.createSession(false, false, false);
session.createQueue(ADDRESS, ADDRESS, true);
-
+
session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
ClientProducer producer = session.createProducer(ADDRESS);
@@ -1163,6 +1289,12 @@
{
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
+ session.close();
+ server.stop();
+ server.start();
+
+ session = sf.createSession(isXA, false, false);
+
session.rollback(xid);
}
else
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -95,7 +95,7 @@
List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journal.start();
- journal.load(committedRecords, preparedTransactions);
+ journal.load(committedRecords, preparedTransactions, null);
assertEquals(0, committedRecords.size());
assertEquals(0, preparedTransactions.size());
Added: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -0,0 +1,561 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.largemessage;
+
+import java.lang.management.ManagementFactory;
+import java.util.concurrent.Executor;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalLargeServerMessage;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.security.HornetQSecurityManager;
+import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.SpawnedVMSupport;
+
+/**
+ * A LargeMessageCrashTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class LargeMessageCrashTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ static String QUEUE_NAME = "MY-QUEUE";
+
+ static int LARGE_MESSAGE_SIZE = 5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ static int PAGED_MESSAGE_SIZE = 1024;
+
+ static int NUMBER_OF_PAGES_MESSAGES = 100;
+
+ boolean failAfterRename;
+
+ // Static --------------------------------------------------------
+
+ public static void main(String args[])
+ {
+ LargeMessageCrashTest serverTest = new LargeMessageCrashTest();
+
+ serverTest.failAfterRename = false;
+
+ for (String arg : args)
+ {
+ if (arg.equals("failAfterRename"))
+ {
+ serverTest.failAfterRename = true;
+ }
+ }
+
+ for (String arg : args)
+ {
+ if (arg.equals("remoteJournalSendNonTransactional"))
+ {
+ serverTest.remoteJournalSendNonTransactional();
+ }
+ else if (arg.equals("remoteJournalSendTransactional"))
+ {
+ serverTest.remoteJournalSendTransactional();
+ }
+ else if (arg.equals("remotePreparedTransaction"))
+ {
+ serverTest.remotePreparedTransaction();
+ }
+ else if (arg.equals("remotePaging"))
+ {
+ serverTest.remotePaging();
+ }
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testJournalSendNonTransactional1() throws Exception
+ {
+ internalTestSend(false, false);
+ }
+
+ public void testJournalSendNonTransactional2() throws Exception
+ {
+ internalTestSend(true, false);
+ }
+
+ public void testJournalSendTransactional1() throws Exception
+ {
+ internalTestSend(false, true);
+ }
+
+ public void testJournalSendTransactional2() throws Exception
+ {
+ internalTestSend(true, true);
+ }
+
+ public void internalTestSend(boolean failureAfterRename, boolean transactional) throws Exception
+ {
+ if (transactional)
+ {
+ runExternalProcess(failureAfterRename, "remoteJournalSendTransactional");
+ }
+ else
+ {
+ runExternalProcess(failureAfterRename, "remoteJournalSendNonTransactional");
+ }
+
+ HornetQServer server = newServer(false);
+
+ try
+ {
+ server.start();
+
+ ClientSessionFactory cf = createInVMFactory();
+
+ ClientSession session = cf.createSession(true, true);
+
+ ClientConsumer cons = session.createConsumer(QUEUE_NAME);
+
+ session.start();
+
+ assertNull(cons.receive(100));
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testPreparedTransaction() throws Exception
+ {
+ runExternalProcess(false, "remotePreparedTransaction");
+
+ HornetQServer server = newServer(false);
+
+ server.start();
+
+ ClientSessionFactory cf = createInVMFactory();
+
+ ClientSession session = cf.createSession(true, false, false);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(1, xids.length);
+
+ session.rollback(xids[0]);
+
+ session.close();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+
+ }
+
+ public void testPreparedTransactionAndCommit() throws Exception
+ {
+ runExternalProcess(false, "remotePreparedTransaction");
+
+ HornetQServer server = newServer(false);
+
+ server.start();
+
+ ClientSessionFactory cf = createInVMFactory();
+
+ ClientSession session = cf.createSession(true, false, false);
+
+ Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(1, xids.length);
+
+ session.commit(xids[0], false);
+
+ session.close();
+
+ session = cf.createSession(false, false);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+
+ session.start();
+
+ ClientMessage msg = consumer.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
+ {
+ assertEquals(getSamplebyte(i), msg.getBody().readByte());
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+
+ }
+
+
+ public void testPaging() throws Exception
+ {
+ runExternalProcess(false, "remotePaging");
+
+ HornetQServer server = newServer(false);
+
+ server.start();
+
+ ClientSessionFactory cf = createInVMFactory();
+
+ ClientSession session = cf.createSession(false, true, true);
+
+ ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
+
+ session.start();
+
+ for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
+ {
+ ClientMessage msg = consumer.receive(50000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.commit();
+ }
+
+ ClientMessage msg = consumer.receiveImmediate();
+ assertNull(msg);
+
+ session.close();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ */
+ private void runExternalProcess(boolean failAfterRename, String methodName) throws Exception, InterruptedException
+ {
+ System.err.println("running external process...");
+
+ Process process = SpawnedVMSupport.spawnVM(this.getClass().getCanonicalName(),
+ "-Xms128m -Xmx128m ",
+ new String[] {},
+ true,
+ true,
+ methodName,
+ (failAfterRename ? "failAfterRename" : "regularFail"));
+
+ assertEquals(100, process.waitFor());
+ }
+
+ // Inner classes -------------------------------------------------
+
+ public void remoteJournalSendNonTransactional()
+ {
+
+ try
+ {
+ startServer(failAfterRename, true);
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession(true, true);
+
+ try
+ {
+ session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+ prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+
+ public void remoteJournalSendTransactional()
+ {
+ try
+ {
+ startServer(failAfterRename, true);
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession(false, false);
+
+ try
+ {
+ session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+ prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+
+ public void remotePreparedTransaction()
+ {
+ try
+ {
+ startServer(failAfterRename, false);
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession(true, false, false);
+
+ try
+ {
+ session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+ Xid xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ Runtime.getRuntime().halt(100);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+
+ public void remotePaging()
+ {
+ try
+ {
+ startServer(failAfterRename, true);
+
+ ClientSessionFactory factory = createInVMFactory();
+ ClientSession session = factory.createSession(false, false, false);
+
+ try
+ {
+ session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ ClientProducer prod = session.createProducer(QUEUE_NAME);
+
+ byte body[] = new byte[PAGED_MESSAGE_SIZE];
+ for (int i = 0; i < body.length; i++)
+ {
+ body[i] = getSamplebyte(i);
+ }
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.setBody(ChannelBuffers.wrappedBuffer(body));
+
+ for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
+ {
+ prod.send(msg);
+ }
+
+ session.commit();
+
+ session.close();
+
+ session = factory.createSession(false, true, true);
+ prod = session.createProducer(QUEUE_NAME);
+
+ prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
+
+ Runtime.getRuntime().halt(100);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+
+ protected ClientMessage createLargeClientMessage(final ClientSession session,
+ final long numberOfBytes,
+ final boolean persistent) throws Exception
+ {
+
+ ClientMessage clientMessage = session.createClientMessage(persistent);
+
+ clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
+
+ return clientMessage;
+ }
+
+ protected void startServer(boolean failAfterRename, boolean fail)
+ {
+ this.failAfterRename = failAfterRename;
+ try
+ {
+ HornetQServer server = newServer(fail);
+ server.start();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ private HornetQServer newServer(boolean failing)
+ {
+ Configuration configuration = createDefaultConfig(false);
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+ HornetQServer server;
+
+ if (failing)
+ {
+ server = new FailingHornetQServer(configuration, securityManager);
+ }
+ else
+ {
+ server = new HornetQServerImpl(configuration, securityManager);
+ }
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(100 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+
+ /** This is hacking HornetQServerImpl,
+ * to make sure the server will fail right
+ * before the page-file was removed */
+ private class FailingHornetQServer extends HornetQServerImpl
+ {
+ FailingHornetQServer(final Configuration config, final HornetQSecurityManager securityManager)
+ {
+ super(config, ManagementFactory.getPlatformMBeanServer(), securityManager);
+ }
+
+ @Override
+ protected StorageManager createStorageManager()
+ {
+ return new FailingStorageManager(getConfiguration(), getExecutor());
+ }
+
+ }
+
+ private class FailingStorageManager extends JournalStorageManager
+ {
+
+ public FailingStorageManager(final Configuration config, final Executor executor)
+ {
+ super(config, executor);
+ }
+
+ @Override
+ public LargeServerMessage createLargeMessage()
+ {
+ return new FailinJournalLargeServerMessage(this);
+ }
+
+ }
+
+ private class FailinJournalLargeServerMessage extends JournalLargeServerMessage
+ {
+ /**
+ * @param storageManager
+ */
+ public FailinJournalLargeServerMessage(final JournalStorageManager storageManager)
+ {
+ super(storageManager);
+ }
+
+ @Override
+ public void setStored() throws Exception
+ {
+ if (failAfterRename)
+ {
+ super.setStored();
+ }
+ Runtime.getRuntime().halt(100);
+ }
+
+ }
+
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageTestBase.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -72,7 +72,7 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected void tearDown() throws Exception
{
if (server != null && server.isStarted())
@@ -86,9 +86,9 @@
log.warn(e.getMessage(), e);
}
}
-
+
server = null;
-
+
super.tearDown();
}
@@ -180,8 +180,21 @@
if (isXA)
{
+
session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles)
+ {
+ server.stop();
+ server.start();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
session.rollback(xid);
+ producer = session.createProducer(ADDRESS);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
}
@@ -198,6 +211,20 @@
if (isXA)
{
session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+
+ session.close();
+
+ if (realFiles)
+ {
+ server.stop();
+ server.start();
+ }
+
+ session = sf.createSession(null, null, isXA, false, false, preAck, 0);
+
+ producer = session.createProducer(ADDRESS);
+
session.commit(xid, true);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
@@ -325,7 +352,7 @@
{
log.debug("Read " + b + " bytes");
}
-
+
assertEquals(getSamplebyte(b), buffer.readByte());
}
}
@@ -614,37 +641,6 @@
consumer.close();
}
- /**
- * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
- */
- protected void validateNoFilesOnLargeDir(int expect) throws Exception
- {
- File largeMessagesFileDir = new File(getLargeMessagesDir());
-
- // Deleting the file is async... we keep looking for a period of the time until the file is really gone
- for (int i = 0; i < 100; i++)
- {
- if (largeMessagesFileDir.listFiles().length != expect)
- {
- Thread.sleep(10);
- }
- else
- {
- break;
- }
- }
-
- assertEquals(expect, largeMessagesFileDir.listFiles().length);
- }
-
- /**
- * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
- */
- protected void validateNoFilesOnLargeDir() throws Exception
- {
- validateNoFilesOnLargeDir(0);
- }
-
protected OutputStream createFakeOutputStream() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/performance/journal/JournalImplTestUnit.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -170,7 +170,7 @@
stopJournal();
createJournal();
startJournal();
- journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+ journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
@@ -195,7 +195,7 @@
journal.start();
- journal.load(new ArrayList<RecordInfo>(), null);
+ journal.load(new ArrayList<RecordInfo>(), null, null);
try
{
@@ -256,7 +256,7 @@
journal.start();
- journal.load(new ArrayList<RecordInfo>(), null);
+ journal.load(new ArrayList<RecordInfo>(), null, null);
log.debug("Adding data");
SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
@@ -279,7 +279,7 @@
journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "hornetq-data", "hq", 5000);
journal.start();
- journal.load(new ArrayList<RecordInfo>(), null);
+ journal.load(new ArrayList<RecordInfo>(), null, null);
journal.stop();
}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AddAndRemoveStressTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
import java.io.File;
import java.util.ArrayList;
+import java.util.List;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -53,6 +54,10 @@
public void updateRecord(final RecordInfo info)
{
}
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
};
private static final long NUMBER_OF_MESSAGES = 210000l;
@@ -122,7 +127,7 @@
ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
- impl.load(info, trans);
+ impl.load(info, trans, null);
impl.forceMoveNextFile();
@@ -196,7 +201,7 @@
ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
- impl.load(info, trans);
+ impl.load(info, trans, null);
if (info.size() > 0)
{
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/ValidateTransactionHealthTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.List;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.journal.LoaderCallback;
@@ -234,6 +235,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.TransactionFailureCallback#failedTransaction(long, java.util.List, java.util.List)
+ */
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
}
}
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/remote/RemoteJournalAppender.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -14,6 +14,7 @@
package org.hornetq.tests.stress.journal.remote;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -98,6 +99,10 @@
public void updateRecord(RecordInfo info)
{
}
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
});
LocalThreads threads[] = new LocalThreads[numberOfThreads];
Modified: trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -167,7 +167,7 @@
stopJournal();
createJournal();
startJournal();
- journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+ journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -15,6 +15,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
@@ -27,6 +28,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
@@ -61,6 +63,10 @@
public void updateRecord(RecordInfo info)
{
}
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
};
// Attributes ----------------------------------------------------
@@ -70,6 +76,8 @@
JournalImpl journalImpl = null;
private ArrayList<RecordInfo> records = null;
+
+ private ArrayList<Long> incompleteTransactions = null;
private ArrayList<PreparedTransactionInfo> transactions = null;
@@ -436,6 +444,9 @@
assertEquals(0, records.size());
assertEquals(0, transactions.size());
+ assertEquals(2, incompleteTransactions.size());
+ assertEquals((Long)77l, incompleteTransactions.get(0));
+ assertEquals((Long)78l, incompleteTransactions.get(1));
try
{
@@ -1284,7 +1295,7 @@
ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
- impl.load(info, trans);
+ impl.load(info, trans, null);
assertEquals(0, info.size());
assertEquals(0, trans.size());
@@ -1303,6 +1314,8 @@
records = new ArrayList<RecordInfo>();
transactions = new ArrayList<PreparedTransactionInfo>();
+
+ incompleteTransactions = new ArrayList<Long>();
factory = null;
@@ -1327,6 +1340,8 @@
records = null;
transactions = null;
+
+ incompleteTransactions = null;
factory = null;
@@ -1360,8 +1375,17 @@
records.clear();
transactions.clear();
+ incompleteTransactions.clear();
- journalImpl.load(records, transactions);
+ journalImpl.load(records, transactions, new TransactionFailureCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ System.out.println("records.length = " + records.size());
+ incompleteTransactions.add(transactionID);
+ }
+
+ });
}
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -355,7 +355,7 @@
records.clear();
transactions.clear();
- journalImpl.load(records, transactions);
+ journalImpl.load(records, transactions, null);
}
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -167,7 +167,7 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
- journal.load(committedRecords, preparedTransactions);
+ journal.load(committedRecords, preparedTransactions, null);
checkRecordsEquivalent(records, committedRecords);
@@ -199,7 +199,7 @@
protected void load() throws Exception
{
- journal.load(null, null);
+ journal.load(null, null, null);
}
protected void add(final long... arguments) throws Exception
Modified: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -53,7 +53,7 @@
journal.start();
- journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+ journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>(), null);
BatchingIDGenerator batch = new BatchingIDGenerator(0, 1000, journal);
long id1 = batch.generateID();
@@ -134,7 +134,7 @@
ArrayList<PreparedTransactionInfo> tx = new ArrayList<PreparedTransactionInfo>();
journal.start();
- journal.load(records, tx);
+ journal.load(records, tx, null);
assertEquals(0, tx.size());
Modified: trunk/tests/src/org/hornetq/tests/util/JournalExample.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JournalExample.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/JournalExample.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -73,7 +73,7 @@
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journalExample.start();
System.out.println("Loading records and creating data files");
- journalExample.load(committedRecords, preparedTransactions);
+ journalExample.load(committedRecords, preparedTransactions, null);
System.out.println("Loaded Record List:");
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -69,7 +69,7 @@
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
journal.start();
- journal.load(records, prepared);
+ journal.load(records, prepared, null);
if (prepared.size() > 0)
{
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-09-22 15:08:41 UTC (rev 7979)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-09-22 23:39:47 UTC (rev 7980)
@@ -13,6 +13,7 @@
package org.hornetq.tests.util;
+import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.util.HashMap;
@@ -313,6 +314,39 @@
message.getBody().writeBytes(b);
return message;
}
+
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir(int expect) throws Exception
+ {
+ File largeMessagesFileDir = new File(getLargeMessagesDir());
+
+ // Deleting the file is async... we keep looking for a period of the time until the file is really gone
+ for (int i = 0; i < 100; i++)
+ {
+ if (largeMessagesFileDir.listFiles().length != expect)
+ {
+ Thread.sleep(10);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ assertEquals(expect, largeMessagesFileDir.listFiles().length);
+ }
+
+ /**
+ * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir() throws Exception
+ {
+ validateNoFilesOnLargeDir(0);
+ }
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the hornetq-commits
mailing list