Author: clebert.suconic(a)jboss.com
Date: 2011-09-24 02:22:26 -0400 (Sat, 24 Sep 2011)
New Revision: 11405
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
JBPAPP-7161 - fixed partial streamed large messages
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
@@ -39,6 +40,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -95,6 +97,12 @@
long getCurrentUniqueID();
+ // Confirms that a large message was finished
+ void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long
recordID) throws Exception;
+
+ // Confirms that a large message was finished
+ void confirmPendingLargeMessage(long recordID) throws Exception;
+
void storeMessage(ServerMessage message) throws Exception;
void storeReference(long queueID, long messageID, boolean last) throws Exception;
@@ -125,8 +133,6 @@
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws
Exception;
- void deleteMessageTransactional(long txID, long queueID, long messageID) throws
Exception;
-
void storeDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID,
long recordID) throws Exception;
void updateDuplicateIDTransactional(long txID, SimpleString address, byte[] duplID,
long recordID) throws Exception;
@@ -141,8 +147,9 @@
* @param message This is a temporary message that holds the parsed properties.
* The remoting layer can't create a ServerMessage directly, then this will
be replaced.
* @return
+ * @throws Exception
*/
- LargeServerMessage createLargeMessage(long id, MessageInternal message);
+ LargeServerMessage createLargeMessage(long id, MessageInternal message) throws
Exception;
void prepare(long txID, Xid xid) throws Exception;
@@ -169,7 +176,8 @@
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo>
queueInfos,
- final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
+ final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
+ final Set<Pair<Long, Long>>
pendingLargeMessages) throws Exception;
long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -53,7 +53,6 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
-import org.hornetq.core.journal.impl.ExportJournal;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.JournalReaderCallback;
@@ -98,7 +97,6 @@
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
-import org.hornetq.utils.UUID;
import org.hornetq.utils.XidCodecSupport;
/**
@@ -130,6 +128,10 @@
public static final byte SECURITY_RECORD = 26;
// Message journal record types
+
+ // This is used when a large message is created but not yet stored on the system.
+ // We use this to avoid temporary files missing
+ public static final byte ADD_LARGE_MESSAGE_PENDING = 29;
public static final byte ADD_LARGE_MESSAGE = 30;
@@ -155,8 +157,6 @@
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
- private UUID persistentID;
-
private final BatchingIDGenerator idGenerator;
private final ReplicationManager replicator;
@@ -453,7 +453,7 @@
}
}
- public LargeServerMessage createLargeMessage(final long id, final MessageInternal
message)
+ public LargeServerMessage createLargeMessage(final long id, final MessageInternal
message) throws Exception
{
if (isReplicated())
{
@@ -466,11 +466,45 @@
largeMessage.setMessageID(id);
+ if (largeMessage.isDurable())
+ {
+ // We store a marker on the journal that the large file is pending
+ long pendingRecordID = storePendingLargeMessage(id);
+
+ largeMessage.setPendingRecordID(pendingRecordID);
+ }
+
return largeMessage;
}
// Non transactional operations
-
+
+ public long storePendingLargeMessage(final long messageID) throws Exception
+ {
+ long recordID = generateUniqueID();
+
+ messageJournal.appendAddRecord(recordID,
+ ADD_LARGE_MESSAGE_PENDING,
+ new PendingLargeMessageEncoding(messageID),
+ true,
+ getContext(true));
+
+ return recordID;
+ }
+
+ public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long
recordID) throws Exception
+ {
+ installLargeMessageConfirmationOnTX(tx, recordID);
+ messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID, new
DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ }
+
+
+ /** We don't need messageID now but we are likely to need it we ever decide to
support a database */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ messageJournal.appendDeleteRecord(recordID, true, getContext());
+ }
+
public void storeMessage(final ServerMessage message) throws Exception
{
if (message.getMessageID() <= 0)
@@ -690,11 +724,6 @@
encoding);
}
- public void deleteMessageTransactional(final long txID, final long queueID, final long
messageID) throws Exception
- {
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new
DeleteEncoding(queueID));
- }
-
public void prepare(final long txID, final Xid xid) throws Exception
{
messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getContext(syncTransactional));
@@ -830,7 +859,8 @@
final ResourceManager
resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo>
queueInfos,
- final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
+ final Set<Pair<Long,
Long>> pendingLargeMessages) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -869,6 +899,19 @@
switch (recordType)
{
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ PendingLargeMessageEncoding pending = new PendingLargeMessageEncoding();
+
+ pending.decode(buff);
+
+ if (pendingLargeMessages != null)
+ {
+ // it could be null on tests, and we don't need anything on that
case
+ pendingLargeMessages.add(new Pair<Long, Long>(record.id,
pending.largeMessageID));
+ }
+ break;
+ }
case ADD_LARGE_MESSAGE:
{
LargeServerMessage largeMessage = parseLargeMessage(messages, buff);
@@ -1188,7 +1231,8 @@
queueInfos,
preparedTransactions,
duplicateIDMap,
- pageSubscriptions);
+ pageSubscriptions,
+ pendingLargeMessages);
for (PageSubscription sub : pageSubscriptions.values())
{
@@ -1547,7 +1591,7 @@
// Package protected ---------------------------------------------
// This should be accessed from this package only
- void deleteFile(final SequentialFile file)
+ void deleteLargeMessage(final SequentialFile file)
{
Runnable deleteAction = new Runnable()
{
@@ -1656,7 +1700,8 @@
final Map<Long, QueueBindingInfo>
queueInfos,
final List<PreparedTransactionInfo>
preparedTransactions,
final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
- final Map<Long, PageSubscription>
pageSubscriptions) throws Exception
+ final Map<Long, PageSubscription>
pageSubscriptions,
+ final Set<Pair<Long, Long>>
pendingLargeMessages) throws Exception
{
// recover prepared transactions
for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
@@ -1746,7 +1791,6 @@
throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
}
- // TODO - this involves a scan - we should find a quicker way of doing
it
MessageReference removed = queue.removeReferenceWithID(messageID);
if (removed == null)
@@ -1867,32 +1911,29 @@
}
}
- for (RecordInfo record : preparedTransaction.recordsToDelete)
+ for (RecordInfo recordDeleted : preparedTransaction.recordsToDelete)
{
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- long messageID = record.id;
-
- DeleteEncoding encoding = new DeleteEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
+ byte[] data = recordDeleted.data;
+
+ if (data.length > 0)
{
- throw new IllegalStateException("Cannot find queue with id " +
encoding.queueID);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ byte b = buff.readByte();
+
+ switch (b)
+ {
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ long messageID = buff.readLong();
+ pendingLargeMessages.remove(new Pair<Long,
Long>(recordDeleted.id, messageID));
+ installLargeMessageConfirmationOnTX(tx, recordDeleted.id);
+ break;
+ }
+ default:
+ log.warn("can't locate recordType=" + b + " on
loadPreparedTransaction//deleteRecords");
+ }
}
-
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- if (removed != null)
- {
- referencesToAck.add(removed);
- }
-
+
}
for (MessageReference ack : referencesToAck)
@@ -2301,6 +2342,50 @@
}
+ public static class PendingLargeMessageEncoding implements EncodingSupport
+ {
+ public long largeMessageID;
+
+ public PendingLargeMessageEncoding(final long pendingLargeMessageID)
+ {
+ this.largeMessageID = pendingLargeMessageID;
+ }
+
+ public PendingLargeMessageEncoding()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.spi.core.remoting.HornetQBuffer)
+ */
+ public void decode(final HornetQBuffer buffer)
+ {
+ largeMessageID = buffer.readLong();
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.spi.core.remoting.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(largeMessageID);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG;
+ }
+
+ public String toString()
+ {
+ return "PendingLargeMessageEncoding::MessageID=" + largeMessageID;
+ }
+
+ }
+
public static class DeliveryCountUpdateEncoding implements EncodingSupport
{
public long queueID;
@@ -2388,17 +2473,48 @@
}
- public static class DeleteEncoding extends QueueEncoding
+ public static class DeleteEncoding implements EncodingSupport
{
+ public byte recordType;
+
+ public long id;
+
public DeleteEncoding()
{
super();
}
- public DeleteEncoding(final long queueID)
+ public DeleteEncoding(final byte recordType, final long id)
{
- super(queueID);
+ this.recordType = recordType;
+ this.id = id;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_BYTE + DataConstants.SIZE_LONG;
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeByte(recordType);
+ buffer.writeLong(id);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.journal.EncodingSupport#decode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void decode(HornetQBuffer buffer)
+ {
+ recordType = buffer.readByte();
+ id = buffer.readLong();
+ }
}
public static class RefEncoding extends QueueEncoding
@@ -2866,7 +2982,15 @@
switch (rec)
{
+ case ADD_LARGE_MESSAGE_PENDING:
+ {
+ PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
+ lmEncoding.decode(buffer);
+
+ return lmEncoding;
+ }
case ADD_LARGE_MESSAGE:
+ {
LargeServerMessage largeMessage = new LargeServerMessageImpl(null);
@@ -2875,19 +2999,20 @@
messageEncoding.decode(buffer);
return new MessageDescribe(largeMessage);
-
+ }
case ADD_MESSAGE:
+ {
ServerMessage message = new ServerMessageImpl(rec, 50);
message.decode(buffer);
return new MessageDescribe(message);
-
+ }
case ADD_REF:
{
final RefEncoding encoding = new RefEncoding();
encoding.decode(buffer);
- return new ReferenceDescribe(encoding);
+ return encoding;
}
case ACKNOWLEDGE_REF:
@@ -3341,5 +3466,90 @@
journal.stop();
}
+
+ private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+ {
+ TXLargeMessageConfirmationOperation txoper =
(TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+ if (txoper == null)
+ {
+ txoper = new TXLargeMessageConfirmationOperation();
+ tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+ }
+ txoper.confirmedMessages.add(recordID);
+ }
+
+
+
+ class TXLargeMessageConfirmationOperation implements TransactionOperation
+ {
+
+ public List<Long> confirmedMessages = new LinkedList<Long>();
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforePrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterPrepare(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforeCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterCommit(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterCommit(Transaction tx)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#beforeRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#afterRollback(org.hornetq.core.transaction.Transaction)
+ */
+ public void afterRollback(Transaction tx)
+ {
+ for (Long msg : confirmedMessages)
+ {
+ try
+ {
+ JournalStorageManager.this.confirmPendingLargeMessage(msg);
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error while confirming large message completion on rollback
for recordID=" + msg +
+ "->" +
+ e.getMessage(),
+ e);
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
+ */
+ public List<MessageReference> getRelatedMessageReferences()
+ {
+ return null;
+ }
+
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -51,6 +51,8 @@
private LargeServerMessage linkMessage;
+ private long pendingRecordID = -1;
+
private boolean paged;
// We should only use the NIO implementation on the Journal
@@ -87,6 +89,19 @@
// Public --------------------------------------------------------
+ /**
+ * @param pendingRecordID
+ */
+ public void setPendingRecordID(long pendingRecordID)
+ {
+ this.pendingRecordID = pendingRecordID;
+ }
+
+ public long getPendingRecordID()
+ {
+ return this.pendingRecordID;
+ }
+
public void setPaged()
{
paged = true;
@@ -228,7 +243,12 @@
{
validateFile();
releaseResources();
- storageManager.deleteFile(file);
+ storageManager.deleteLargeMessage(file);
+ if (pendingRecordID >= 0)
+ {
+ storageManager.confirmPendingLargeMessage(pendingRecordID);
+ pendingRecordID = -1;
+ }
}
public boolean isFileExists() throws Exception
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -164,7 +164,22 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#setPendingRecordID(long)
+ */
+ public void setPendingRecordID(long pendingRecordID)
+ {
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.LargeServerMessage#getPendingRecordID()
+ */
+ public long getPendingRecordID()
+ {
+ return -1;
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -16,6 +16,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,6 +46,7 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -296,7 +298,8 @@
final ResourceManager
resourceManager,
final Map<Long, Queue> queues,
Map<Long, QueueBindingInfo>
queueInfos,
- final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
+ Set<Pair<Long, Long>>
pendingLM) throws Exception
{
return new JournalLoadInformation();
}
@@ -569,4 +572,18 @@
}
-}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction,
long, long)
+ */
+ public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long
recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+ */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ }
+
+ }
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -47,6 +47,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
@@ -931,6 +932,10 @@
if (store.page(message, context, entry.getValue()))
{
+ if (message.isLargeMessage())
+ {
+ confirmLargeMessageSend(tx, message);
+ }
// We need to kick delivery so the Queues may check for the cursors case they
are empty
schedulePageDelivery(tx, entry);
@@ -984,6 +989,11 @@
{
storageManager.storeMessage(message);
}
+
+ if (message.isLargeMessage())
+ {
+ confirmLargeMessageSend(tx, message);
+ }
}
if (tx != null)
@@ -1040,6 +1050,31 @@
}
/**
+ * @param tx
+ * @param message
+ * @throws Exception
+ */
+ private void confirmLargeMessageSend(Transaction tx, final ServerMessage message)
throws Exception
+ {
+ LargeServerMessage largeServerMessage = (LargeServerMessage)message;
+ if (largeServerMessage.getPendingRecordID() >= 0)
+ {
+ if (tx == null)
+ {
+
storageManager.confirmPendingLargeMessage(largeServerMessage.getPendingRecordID());
+ }
+ else
+ {
+
+ storageManager.confirmPendingLargeMessageTX(tx,
+
largeServerMessage.getMessageID(),
+
largeServerMessage.getPendingRecordID());
+ }
+ largeServerMessage.setPendingRecordID(-1);
+ }
+ }
+
+ /**
* This will kick a delivery async on the queue, so the queue may have a chance to
depage messages
* @param tx
* @param entry
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/LargeServerMessage.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -28,6 +28,10 @@
/** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we
specify a link between the messages */
void setLinkedMessage(LargeServerMessage message);
+
+ void setPendingRecordID(long pendingRecordID);
+
+ long getPendingRecordID();
boolean isFileExists() throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -92,6 +92,7 @@
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MemoryManager;
import org.hornetq.core.server.NodeManager;
import org.hornetq.core.server.Queue;
@@ -1545,13 +1546,16 @@
}
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new
HashMap<SimpleString, List<Pair<byte[], Long>>>();
+
+ HashSet<Pair<Long, Long>> pendingLargeMessages = new
HashSet<Pair<Long, Long>>();
journalInfo[1] = storageManager.loadMessageJournal(postOffice,
pagingManager,
resourceManager,
queues,
queueBindingInfosMap,
- duplicateIDMap);
+ duplicateIDMap,
+ pendingLargeMessages);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry :
duplicateIDMap.entrySet())
{
@@ -1564,6 +1568,16 @@
cache.load(entry.getValue());
}
}
+
+ for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
+ {
+ log.info("Deleting pending large message as it wasn't completed:"
+ msgToDelete);
+ LargeServerMessage msg = storageManager.createLargeMessage();
+ msg.setMessageID(msgToDelete.b);
+ msg.setPendingRecordID(msgToDelete.a);
+ msg.setDurable(true);
+ msg.deleteFile();
+ }
return journalInfo;
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -1292,6 +1292,11 @@
// Public
// ----------------------------------------------------------------------------
+
+ public void clearLargeMessage()
+ {
+ currentLargeMessage = null;
+ }
// Private
// ----------------------------------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -24,6 +24,8 @@
*/
public class TransactionPropertyIndexes
{
+
+ public static final int LARGE_MESSAGE_CONFIRMATIONS = 1;
public static final int PAGE_SYNC = 2;
Added:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
(rev 0)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -0,0 +1,539 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerSessionImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A LargeMessageTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
+ *
+ * Created 29-Sep-08 4:04:10 PM
+ *
+ *
+ */
+public class InterruptedLargeMessageTest extends LargeMessageTestBase
+{
+ // Constants -----------------------------------------------------
+
+ final static int RECEIVE_WAIT_TIME = 60000;
+
+ private final int LARGE_MESSAGE_SIZE = HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE *
3;
+
+ // Attributes ----------------------------------------------------
+
+ static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ // Static --------------------------------------------------------
+ private final Logger log = Logger.getLogger(LargeMessageTest.class);
+
+ protected ServerLocator locator;
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+ clearData();
+ locator = createFactory(isNetty());
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ locator.close();
+ super.tearDown();
+ }
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ public void testInterruptLargeMessageSend() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = true;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.getConfiguration()
+ .getInterceptorClassNames()
+ .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(false);
+ locator.setBlockOnDurableSend(false);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.setExpiration(System.currentTimeMillis());
+
+ producer.send(clientFile);
+
+ Thread.sleep(500);
+
+ for (ServerSession srvSession : server.getSessions())
+ {
+ ((ServerSessionImpl)srvSession).clearLargeMessage();
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendNonPersistentQueue() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, false);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ session.close();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int h = 0; h < 5; h++)
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientMessage = cons.receive(5000);
+ assertNotNull(clientMessage);
+ for (int countByte = 0; countByte < messageSize; countByte++)
+ {
+ assertEquals(getSamplebyte(countByte),
clientMessage.getBodyBuffer().readByte());
+ }
+ clientMessage.acknowledge();
+ }
+ session.rollback();
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendPaging() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new
HashMap<String, AddressSettings>());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, true, true);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ server.getPagingManager().getPageStore(ADDRESS).startPaging();
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+ }
+ session.commit();
+
+ for (int h = 0; h < 5; h++)
+ {
+ session.close();
+
+ sf.close();
+
+ server.stop();
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false);
+
+ ClientConsumer cons = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage clientMessage = cons.receive(5000);
+
+ System.out.println("msg " + clientMessage);
+ assertNotNull(clientMessage);
+ for (int countByte = 0; countByte < messageSize; countByte++)
+ {
+ assertEquals(getSamplebyte(countByte),
clientMessage.getBodyBuffer().readByte());
+ }
+ clientMessage.acknowledge();
+ }
+ if (h == 4)
+ {
+ session.commit();
+ }
+ else
+ {
+ session.rollback();
+ }
+ }
+
+ server.stop(false);
+ server.start();
+
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testSendPreparedXA() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ LargeMessageTestInterceptorIgnoreLastPacket.interruptMessages = false;
+
+ try
+ {
+ server = createServer(true, createDefaultConfig(isNetty()), 10000, 20000, new
HashMap<String, AddressSettings>());
+
+ // server.getConfiguration()
+ // .getInterceptorClassNames()
+ // .add(LargeMessageTestInterceptorIgnoreLastPacket.class.getName());
+
+ server.start();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, false, false);
+
+ Xid xid1 = newXID();
+ Xid xid2 = newXID();
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ session.start(xid1, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ clientFile.putIntProperty("txid", 1);
+ producer.send(clientFile);
+ }
+ session.end(xid1, XAResource.TMSUCCESS);
+
+ session.prepare(xid1);
+
+
+ session.start(xid2, XAResource.TMNOFLAGS);
+
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ clientFile.putIntProperty("txid", 2);
+ producer.send(clientFile);
+ }
+ session.end(xid2, XAResource.TMSUCCESS);
+
+ session.prepare(xid2);
+
+ session.close();
+ sf.close();
+
+ server.stop(false);
+ server.start();
+
+ for (int start = 0 ; start < 2; start++)
+ {
+
+ sf = locator.createSessionFactory();
+
+ if (start == 0)
+ {
+ session = sf.createSession(true, false, false);
+ session.commit(xid1, false);
+ session.close();
+ }
+
+ session = sf.createSession(false, false, false);
+ ClientConsumer cons1 = session.createConsumer(ADDRESS);
+ session.start();
+ for (int i = 0 ; i < 10; i++)
+ {
+ ClientMessage msg = cons1.receive(5000);
+ assertNotNull(msg);
+ assertEquals(1, msg.getIntProperty("txid").intValue());
+ msg.acknowledge();
+ }
+
+ if (start == 1)
+ {
+ session.commit();
+ }
+ else
+ {
+ session.rollback();
+ }
+
+ session.close();
+ sf.close();
+
+ server.stop();
+ server.start();
+ }
+ server.stop();
+
+ validateNoFilesOnLargeDir(10);
+
+ server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, false, false);
+ session.rollback(xid2);
+
+ sf.close();
+
+ server.stop();
+ server.start();
+ server.stop();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public static class LargeMessageTestInterceptorIgnoreLastPacket implements
Interceptor
+ {
+
+ public static boolean interruptMessages = false;
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.api.core.Interceptor#intercept(org.hornetq.core.protocol.core.Packet,
org.hornetq.spi.core.protocol.RemotingConnection)
+ */
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (packet instanceof SessionSendContinuationMessage)
+ {
+ SessionSendContinuationMessage msg = (SessionSendContinuationMessage)packet;
+ if (!msg.isContinues() && interruptMessages)
+ {
+ System.out.println("Ignored a message");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ }
+
+}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -2973,15 +2973,11 @@
super.setUp();
clearData();
locator = createFactory(isNetty());
-
log.info("\n*********************************************************************************\n
Starting " + getName() +
-
"\n*********************************************************************************");
}
@Override
protected void tearDown() throws Exception
{
-
log.info("\n*********************************************************************************\nDone
with " + getName() +
-
"\n*********************************************************************************");
locator.close();
super.tearDown();
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -76,7 +76,7 @@
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null,
null);
assertEquals(98, deletedMessage.size());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -101,7 +101,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
journal.stop();
@@ -111,7 +111,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(postOffice, null, null, queues, null, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null, null, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/persistence/StorageManagerTestBase.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -128,7 +128,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null);
+ journal.loadMessageJournal(new FakePostOffice(), null, null, queues, null, null,
null);
}
/**
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -18,6 +18,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -72,6 +73,7 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.core.transaction.Transaction;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
@@ -1231,7 +1233,8 @@
final ResourceManager
resourceManager,
final Map<Long, Queue>
queues,
Map<Long, QueueBindingInfo>
queueInfos,
- final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ final Map<SimpleString,
List<Pair<byte[], Long>>> duplicateIDMap,
+ Set<Pair<Long, Long>>
pendingLargeMessages) throws Exception
{
return new JournalLoadInformation();
}
@@ -1680,8 +1683,22 @@
}
- }
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction,
long, long)
+ */
+ public void confirmPendingLargeMessageTX(Transaction transaction, long messageID,
long recordID) throws Exception
+ {
+ }
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessage(long)
+ */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ }
+
+ }
+
class FakeStoreFactory implements PagingStoreFactory
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -112,7 +112,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(0, mapDups.size());
@@ -134,7 +135,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(1, mapDups.size());
@@ -163,7 +165,8 @@
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
null,
- mapDups);
+ mapDups,
+ null);
Assert.assertEquals(1, mapDups.size());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-23
16:09:48 UTC (rev 11404)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-24
06:22:26 UTC (rev 11405)
@@ -306,9 +306,12 @@
server = HornetQServers.newHornetQServer(configuration, false);
}
- for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ if (settings != null)
{
- server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ for (Map.Entry<String, AddressSettings> setting : settings.entrySet())
+ {
+ server.getAddressSettingsRepository().addMatch(setting.getKey(),
setting.getValue());
+ }
}
AddressSettings defaultSetting = new AddressSettings();