Author: timfox
Date: 2009-11-03 03:45:42 -0500 (Tue, 03 Nov 2009)
New Revision: 8189
Removed:
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
removed over-complex large message cleanup code
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -36,6 +36,9 @@
*
*
*/
+
+
+//FIXME - this class should be renamed to just large message
public class JournalLargeServerMessage extends ServerMessageImpl implements
LargeServerMessage
{
// Constants -----------------------------------------------------
@@ -54,6 +57,8 @@
private SequentialFile file;
private long bodySize = -1;
+
+ private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -177,29 +182,18 @@
public void decode(final HornetQBuffer buffer)
{
file = null;
- try
- {
- this.setStored();
- }
- catch (Exception e)
- {
- // File still null, this wasn't supposed to happen ever.
- log.warn(e.getMessage(), e);
- }
decodeHeadersAndProperties(buffer);
}
- private final AtomicInteger delayDeletionCount = new AtomicInteger(0);
-
public synchronized void incrementDelayDeletionCount()
{
this.delayDeletionCount.incrementAndGet();
}
-
+
public synchronized void decrementDelayDeletionCount() throws Exception
{
int count = this.delayDeletionCount.decrementAndGet();
-
+
if (count == 0)
{
checkDelete();
@@ -207,7 +201,7 @@
}
private void checkDelete() throws Exception
- {
+ {
if (getRefCount() <= 0)
{
if (linkMessage != null)
@@ -265,7 +259,7 @@
public boolean isFileExists() throws Exception
{
- SequentialFile localfile = storageManager.createFileForLargeMessage(getMessageID(),
isStored());
+ SequentialFile localfile =
storageManager.createFileForLargeMessage(getMessageID());
return localfile.exists();
}
@@ -284,18 +278,6 @@
return memoryEstimate;
}
- @Override
- public void setStored() throws Exception
- {
- super.setStored();
- releaseResources();
-
- if (file != null && linkMessage == null)
- {
- storageManager.completeLargeMessage(this);
- }
- }
-
public synchronized void releaseResources()
{
if (file != null && file.isOpen())
@@ -323,12 +305,12 @@
idToUse = linkMessage.getMessageID();
}
- SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse,
isStored());
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse);
ServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ?
this
-
: (JournalLargeServerMessage)linkMessage,
- newfile,
- newID);
+ :
(JournalLargeServerMessage)linkMessage,
+ newfile,
+ newID);
return newMessage;
}
@@ -360,7 +342,7 @@
throw new RuntimeException("MessageID not set on LargeMessage");
}
- file = storageManager.createFileForLargeMessage(getMessageID(), isStored());
+ file = storageManager.createFileForLargeMessage(getMessageID());
file.open();
@@ -369,14 +351,6 @@
}
}
-// /* (non-Javadoc)
-// * @see org.hornetq.core.server.LargeServerMessage#getLinkedMessage()
-// */
-// public LargeServerMessage getLinkedMessage()
-// {
-// return linkMessage;
-// }
-
/* (non-Javadoc)
* @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
*/
@@ -390,7 +364,7 @@
this.linkMessage = message;
- file = storageManager.createFileForLargeMessage(message.getMessageID(), true);
+ file = storageManager.createFileForLargeMessage(message.getMessageID());
try
{
file.open();
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -95,8 +95,9 @@
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
- //grouping journal record type
+ // grouping journal record type
public static final byte GROUP_RECORD = 41;
+
// Bindings journal record type
public static final byte QUEUE_BINDING_RECORD = 21;
@@ -190,13 +191,13 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
Journal localBindings = new JournalImpl(1024 * 1024,
- 2,
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- bindingsFF,
- "hornetq-bindings",
- "bindings",
- 1);
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsFF,
+ "hornetq-bindings",
+ "bindings",
+ 1);
if (replicator != null)
{
@@ -254,17 +255,17 @@
}
else
{
- this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE,
bindingsJournal);
+ this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE,
bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
- config.getJournalMinFiles(),
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- journalFF,
- "hornetq-data",
- "hq",
- config.getJournalMaxAIO());
+ config.getJournalMinFiles(),
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ journalFF,
+ "hornetq-data",
+ "hq",
+ config.getJournalMaxAIO());
if (replicator != null)
{
@@ -471,8 +472,8 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-
ref.getQueue().getID());
+ ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
+
.getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
@@ -548,12 +549,12 @@
messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true);
return id;
}
-
+
public void deleteHeuristicCompletion(long id) throws Exception
{
messageJournal.appendDeleteRecord(id, true);
}
-
+
public void deletePageTransactional(final long txID, final long recordID) throws
Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -561,8 +562,8 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final
MessageReference ref) throws Exception
{
- ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
-
ref.getQueue().getID());
+ ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
+
.getID());
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
@@ -627,19 +628,6 @@
updateInfo,
syncNonTransactional);
}
- /**
- * @param journalLargeServerMessage
- * @throws Exception
- */
- public void completeLargeMessage(JournalLargeServerMessage message) throws Exception
- {
- if (isReplicated())
- {
- replicator.largeMessageEnd(message.getMessageID());
- }
- SequentialFile fileToRename = createFileForLargeMessage(message.getMessageID(),
true);
- message.getFile().renameTo(fileToRename.getFileName());
- }
private static final class AddMessageRecord
{
@@ -654,11 +642,11 @@
int deliveryCount;
}
-
+
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
private final Map<Long, ServerMessage> messages;
-
+
public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
{
super();
@@ -687,7 +675,7 @@
}
}
}
-
+
}
public void loadMessageJournal(final PostOffice postOffice,
@@ -701,7 +689,7 @@
List<PreparedTransactionInfo> preparedTransactions = new
ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long,
ServerMessage>();
-
+
messageJournal.load(records, preparedTransactions, new
LargeMessageTXFailureCallback(messages));
ArrayList<LargeServerMessage> largeMessages = new
ArrayList<LargeServerMessage>();
@@ -926,7 +914,7 @@
msg.decrementDelayDeletionCount();
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
@@ -946,25 +934,24 @@
LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
messageEncoding.decode(buff);
-
+
Long originalMessageID =
(Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
+
// Using the linked file by the original file
if (originalMessageID != null)
{
LargeServerMessage originalMessage =
(LargeServerMessage)messages.get(originalMessageID);
-
+
if (originalMessage == null)
{
// this could happen if the message was deleted but the file still exists as
the file still being used
originalMessage = createLargeMessage();
originalMessage.setMessageID(originalMessageID);
- originalMessage.setStored();
messages.put(originalMessageID, originalMessage);
}
-
+
originalMessage.incrementDelayDeletionCount();
-
+
largeMessage.setLinkedMessage(originalMessage);
}
return largeMessage;
@@ -992,7 +979,7 @@
// Use same method as load message journal to prune out acks, so they don't
get added.
// Then have reacknowledge(tx) methods on queue, which needs to add the page
size
-
+
// first get any sent messages for this tx and recreate
for (RecordInfo record : preparedTransaction.records)
{
@@ -1001,13 +988,13 @@
HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
byte recordType = record.getUserRecordType();
-
+
switch (recordType)
{
case ADD_LARGE_MESSAGE:
- {
+ {
messages.put(record.id, parseLargeMessage(messages, buff));
-
+
break;
}
case ADD_MESSAGE:
@@ -1022,6 +1009,7 @@
}
case ADD_REF:
{
+
long messageID = record.id;
RefEncoding encoding = new RefEncoding();
@@ -1163,11 +1151,13 @@
resourceManager.putTransaction(xid, tx);
}
}
-
- //grouping handler operations
+
+ // grouping handler operations
public void addGrouping(GroupBinding groupBinding) throws Exception
{
- GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(), groupBinding.getClusterName());
+ GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
+
groupBinding.getGroupId(),
+
groupBinding.getClusterName());
bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD,
groupingEncoding, true);
}
@@ -1222,7 +1212,7 @@
bindingEncoding.setId(id);
- queueBindingInfos.add(bindingEncoding);
+ queueBindingInfos.add(bindingEncoding);
}
else if (rec == PERSISTENT_ID_RECORD)
{
@@ -1236,7 +1226,7 @@
{
idGenerator.loadState(record.id, buffer);
}
- else if(rec == GROUP_RECORD)
+ else if (rec == GROUP_RECORD)
{
GroupingEncoding encoding = new GroupingEncoding();
encoding.decode(buffer);
@@ -1285,7 +1275,7 @@
// Must call close to make sure last id is persisted
if (idGenerator != null)
{
- idGenerator.close();
+ idGenerator.close();
}
bindingsJournal.stop();
@@ -1371,7 +1361,7 @@
if (executor == null)
{
deleteAction.run();
- }
+ }
else
{
executor.execute(deleteAction);
@@ -1382,16 +1372,9 @@
* @param messageID
* @return
*/
- SequentialFile createFileForLargeMessage(final long messageID, final boolean stored)
+ SequentialFile createFileForLargeMessage(final long messageID)
{
- if (stored)
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".msg",
-1);
- }
- else
- {
- return largeMessagesFactory.createSequentialFile(messageID + ".tmp",
-1);
- }
+ return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
}
// Private
----------------------------------------------------------------------------------
@@ -1464,13 +1447,13 @@
return XidCodecSupport.getXidEncodeLength(xid);
}
}
-
+
private static class HeuristicCompletionEncoding implements EncodingSupport
{
Xid xid;
boolean isCommit;
-
+
HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
{
this.xid = xid;
@@ -1921,5 +1904,4 @@
}
-
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-02
23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -600,8 +600,6 @@
{
if (pagingManager.page(message, true))
{
- message.setStored();
-
return;
}
}
@@ -668,7 +666,7 @@
}
public MessageReference reroute(final ServerMessage message, final Queue queue, final
Transaction tx) throws Exception
- {
+ {
MessageReference reference = message.createReference(queue);
Long scheduledDeliveryTime =
(Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
@@ -680,8 +678,6 @@
message.incrementDurableRefCount();
- message.setStored();
-
PagingStore store = pagingManager.getPageStore(message.getDestination());
message.incrementRefCount(store, reference);
@@ -865,11 +861,11 @@
{
reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
-
+
if (message.isDurable() && queue.isDurable())
{
int durableRefCount = message.incrementDurableRefCount();
-
+
if (durableRefCount == 1)
{
if (tx != null)
@@ -880,8 +876,6 @@
{
storageManager.storeMessage(message);
}
-
- message.setStored();
}
if (tx != null)
@@ -1163,7 +1157,6 @@
{
if (pagingManager.page(message, tx.getID(), first))
{
- message.setStored();
if (message.isDurable())
{
// We only create pageTransactions if using persistent messages
@@ -1232,7 +1225,7 @@
public void beforeRollback(Transaction tx) throws Exception
{
// Reverse the ref counts, and paging sizes
-
+
for (MessageReference ref : refs)
{
ServerMessage message = ref.getMessage();
Modified:
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationLargemessageEndMessage.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -31,8 +31,6 @@
long messageId;
- boolean isDelete;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -42,11 +40,10 @@
super(REPLICATION_LARGE_MESSAGE_END);
}
- public ReplicationLargemessageEndMessage(final long messageId, final boolean
isDelete)
+ public ReplicationLargemessageEndMessage(final long messageId)
{
this();
this.messageId = messageId;
- this.isDelete = isDelete;
}
// Public --------------------------------------------------------
@@ -54,21 +51,19 @@
@Override
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_BOOLEAN;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG;
}
@Override
public void encodeBody(final HornetQBuffer buffer)
{
buffer.writeLong(messageId);
- buffer.writeBoolean(isDelete);
}
@Override
public void decodeBody(final HornetQBuffer buffer)
{
messageId = buffer.readLong();
- isDelete = buffer.readBoolean();
}
/**
@@ -79,14 +74,6 @@
return messageId;
}
- /**
- * @return the isDelete
- */
- public boolean isDelete()
- {
- return isDelete;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-02
23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -78,8 +78,6 @@
void largeMessageWrite(long messageId, byte [] body);
- void largeMessageEnd(long messageId);
-
void largeMessageDelete(long messageId);
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -66,7 +66,7 @@
// Attributes ----------------------------------------------------
private static final boolean trace = log.isTraceEnabled();
-
+
private static void trace(String msg)
{
log.trace(msg);
@@ -85,7 +85,7 @@
private PagingManager pageManager;
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
-
+
private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new
ConcurrentHashMap<Long, LargeServerMessage>();
// Constructors --------------------------------------------------
@@ -201,7 +201,7 @@
{
channel.close();
storage.stop();
-
+
for (ConcurrentMap<Integer, Page> map : pageIndex.values())
{
for (Page page : map.values())
@@ -216,15 +216,14 @@
}
}
}
-
+
pageIndex.clear();
-
-
+
for (LargeServerMessage largeMessage : largeMessages.values())
{
largeMessage.releaseResources();
}
-
+
largeMessages.clear();
}
@@ -254,30 +253,17 @@
*/
private void handleLargeMessageEnd(ReplicationLargemessageEndMessage packet)
{
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(),
packet.isDelete());
+ LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
+
if (message != null)
{
- if (packet.isDelete())
+ try
{
- try
- {
- message.deleteFile();
- }
- catch (Exception e)
- {
- log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
- }
+ message.deleteFile();
}
- else
+ catch (Exception e)
{
- try
- {
- message.setStored();
- }
- catch (Exception e)
- {
- log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
- }
+ log.warn("Error deleting large message ID = " +
packet.getMessageId(), e);
}
}
}
@@ -293,13 +279,12 @@
message.addBytes(packet.getBody());
}
}
-
-
- private LargeServerMessage lookupLargeMessage(long messageId, boolean isDelete)
+
+ private LargeServerMessage lookupLargeMessage(long messageId, boolean delete)
{
LargeServerMessage message;
-
- if (isDelete)
+
+ if (delete)
{
message = largeMessages.remove(messageId);
}
@@ -307,12 +292,12 @@
{
message = largeMessages.get(messageId);
}
-
+
if (message == null)
{
log.warn("Large MessageID " + messageId + " is not available on
backup server. Ignoring replication message");
}
-
+
return message;
}
@@ -328,7 +313,6 @@
this.largeMessages.put(largeMessage.getMessageID(), largeMessage);
}
-
/**
* @param packet
*/
@@ -433,12 +417,11 @@
ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
Page page = pages.remove(packet.getPageNumber());
-
+
if (page == null)
{
page = getPage(packet.getStoreName(), packet.getPageNumber());
}
-
if (page != null)
{
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -276,22 +276,11 @@
{
if (enabled)
{
- sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, true));
+ sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId));
}
}
/* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#largeMessageEnd(long)
- */
- public void largeMessageEnd(long messageId)
- {
- if (enabled)
- {
- sendReplicatePacket(new ReplicationLargemessageEndMessage(messageId, false));
- }
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
*/
public void largeMessageWrite(long messageId, byte[] body)
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-02 23:23:19 UTC (rev
8188)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-03 08:45:42 UTC (rev
8189)
@@ -47,11 +47,6 @@
int getMemoryEstimate();
- //TODO - do we really need this? Can't we use durable ref count?
- void setStored() throws Exception;
-
- boolean isStored();
-
int getRefCount();
ServerMessage makeCopyForExpiryOrDLA(long newID, boolean expiry) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-02 23:23:19
UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-03 08:45:42
UTC (rev 8189)
@@ -1379,9 +1379,10 @@
config.getName(),
config.getAddress(),
config.getTimeout());
- }
- log.info("deploying grouping handler: " + groupingHandler);
+ }
+
this.groupingHandler = groupingHandler;
+
managementService.addNotificationListener(groupingHandler);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-02 23:23:19
UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-03 08:45:42
UTC (rev 8189)
@@ -42,8 +42,6 @@
/** Global reference counts for paging control */
private final AtomicInteger refCount = new AtomicInteger(0);
- private volatile boolean stored;
-
// We cache this
private volatile int memoryEstimate = -1;
@@ -102,16 +100,6 @@
return ref;
}
- public boolean isStored()
- {
- return stored;
- }
-
- public void setStored() throws Exception
- {
- stored = true;
- }
-
public int incrementRefCount(final PagingStore pagingStore, final MessageReference
reference) throws Exception
{
int count = refCount.incrementAndGet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-02 23:23:19
UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-03 08:45:42
UTC (rev 8189)
@@ -324,7 +324,7 @@
remotingConnection.removeFailureListener(this);
// Return any outstanding credits
-
+
closed = true;
for (CreditManagerHolder holder : creditManagerHolders.values())
@@ -648,7 +648,7 @@
{
log.error("Failed to query consumer deliveries", e);
}
-
+
sendResponse(message, null, false, false);
}
@@ -1064,9 +1064,9 @@
response = new SessionXAResponseMessage(true, XAException.XAER_PROTO, msg);
}
else
- {
+ {
Transaction theTx = resourceManager.removeTransaction(xid);
-
+
if (theTx == null)
{
// checked heuristic committed transactions
@@ -1436,9 +1436,9 @@
Packet response = null;
ServerMessage message = packet.getServerMessage();
-
+
try
- {
+ {
long id = storageManager.generateUniqueID();
message.setMessageID(id);
@@ -1510,9 +1510,9 @@
currentLargeMessage = null;
message.releaseResources();
-
+
send(message);
-
+
releaseOutStanding(message);
}
@@ -1857,7 +1857,7 @@
private void doRollback(final boolean lastMessageAsDelived, final Transaction theTx)
throws Exception
{
boolean wasStarted = started;
-
+
List<MessageReference> toCancel = new ArrayList<MessageReference>();
for (ServerConsumer consumer : consumers.values())
@@ -1874,7 +1874,7 @@
{
ref.getQueue().cancel(theTx, ref);
}
-
+
theTx.rollback();
if (wasStarted)
@@ -1899,7 +1899,7 @@
tx = new TransactionImpl(storageManager);
}
-
+
/*
* The way flow producer flow control works is as follows:
* The client can only send messages as long as it has credits. It requests credits
from the server
@@ -1914,11 +1914,11 @@
private void releaseOutStanding(final ServerMessage message) throws Exception
{
CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
-
+
int size = message.getEncodeSize();
-
+
holder.outstandingCredits -= size;
-
+
holder.store.returnProducerCredits(size);
}
@@ -1945,6 +1945,6 @@
else
{
postOffice.route(msg, tx);
- }
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-02
23:23:19 UTC (rev 8188)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -62,7 +62,7 @@
private final long createTime;
public TransactionImpl(final StorageManager storageManager)
- {
+ {
this.storageManager = storageManager;
xid = null;
@@ -73,7 +73,7 @@
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final
PostOffice postOffice)
- {
+ {
this.storageManager = storageManager;
this.xid = xid;
@@ -84,7 +84,7 @@
}
public TransactionImpl(final long id, final Xid xid, final StorageManager
storageManager)
- {
+ {
this.storageManager = storageManager;
this.xid = xid;
@@ -276,7 +276,7 @@
if (operations != null)
{
for (TransactionOperation operation : operations)
- {
+ {
operation.beforeRollback(this);
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-02
23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -37,6 +37,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.tests.integration.largemessage.LargeMessageTestBase;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
@@ -436,8 +437,10 @@
consumerExpiry = session.createConsumer(ADDRESS_DLA);
msg1 = consumerExpiry.receive(5000);
+
assertNotNull(msg1);
- // msg1.acknowledge();
+
+ msg1.acknowledge();
for (int i = 0; i < messageSize; i++)
{
@@ -1348,19 +1351,28 @@
}
- public void testSendRollbackXA() throws Exception
+ public void testSendRollbackXADurable() throws Exception
{
- internalTestSendRollback(true);
+ internalTestSendRollback(true, true);
}
+
+ public void testSendRollbackXANonDurable() throws Exception
+ {
+ internalTestSendRollback(true, false);
+ }
- public void testSendRollback() throws Exception
+ public void testSendRollbackDurable() throws Exception
{
- internalTestSendRollback(false);
+ internalTestSendRollback(false, true);
}
+
+ public void testSendRollbackNonDurable() throws Exception
+ {
+ internalTestSendRollback(false, false);
+ }
- private void internalTestSendRollback(final boolean isXA) throws Exception
+ private void internalTestSendRollback(final boolean isXA, final boolean durable)
throws Exception
{
-
ClientSession session = null;
try
@@ -1379,13 +1391,13 @@
if (isXA)
{
- xid = newXID();
+ xid = RandomUtil.randomXid();
session.start(xid, XAResource.TMNOFLAGS);
}
ClientProducer producer = session.createProducer(ADDRESS);
- Message clientFile = createLargeClientMessage(session, 50000, false);
+ Message clientFile = createLargeClientMessage(session, 50000, durable);
for (int i = 0; i < 1; i++)
{
Deleted:
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java 2009-11-02
23:23:19 UTC (rev 8188)
+++
trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCleanupTest.java 2009-11-03
08:45:42 UTC (rev 8189)
@@ -1,196 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
-import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.integration.largemessage.mock.MockConnector;
-import org.hornetq.tests.integration.largemessage.mock.MockConnectorFactory;
-
-/**
- * A LargeMessageCleanupTest
- *
- * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCleanupTest extends LargeMessageTestBase
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(LargeMessageCleanupTest.class);
-
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
-
- public void testCleanup() throws Exception
- {
- clearData();
-
- FileOutputStream fileOut = new FileOutputStream(new File(getLargeMessagesDir(),
"1234.tmp"));
-
- fileOut.write(new byte[1024]); // anything
-
- fileOut.close();
-
- Configuration config = createDefaultConfig();
-
- server = createServer(true, config, -1, -1, new HashMap<String,
AddressSettings>());
-
- server.start();
-
- try
- {
-
- File directoryLarge = new File(getLargeMessagesDir());
-
- assertEquals("The startup should have been deleted 1234.tmp", 0,
directoryLarge.list().length);
- }
- finally
- {
- server.stop();
- }
- }
-
- public void testFailureOnSendingFile() throws Exception
- {
- clearData();
-
- Configuration config = createDefaultConfig();
-
- server = createServer(true, config, 10 * 1024, 20 * 1024, new HashMap<String,
AddressSettings>());
-
- server.start();
-
- final int numberOfBytes = 2 * 1024 * 1024;
-
- ClientSession session = null;
-
- class LocalCallback implements MockConnector.MockCallback
- {
- AtomicInteger counter = new AtomicInteger(0);
-
- ClientSession session;
-
- public void onWrite(final HornetQBuffer buffer)
- {
- log.info("calling cb onwrite** ");
- if (counter.incrementAndGet() == 5)
- {
- RemotingConnectionImpl conn =
(RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
- RemotingServiceImpl remotingServiceImpl =
(RemotingServiceImpl)server.getRemotingService();
- remotingServiceImpl.connectionException(conn.getID(),
- new
HornetQException(HornetQException.NOT_CONNECTED, "blah!"));
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED,
"blah"));
- throw new IllegalStateException("blah");
- }
- }
- }
-
- LocalCallback callback = new LocalCallback();
-
- try
- {
- HashMap<String, Object> parameters = new HashMap<String, Object>();
- parameters.put("callback", callback);
-
- TransportConfiguration transport = new
TransportConfiguration(MockConnectorFactory.class.getCanonicalName(),
- parameters);
-
- ClientSessionFactory mockFactory = new ClientSessionFactoryImpl(transport);
-
- mockFactory.setBlockOnNonPersistentSend(false);
- mockFactory.setBlockOnPersistentSend(false);
- mockFactory.setBlockOnAcknowledge(false);
-
- session = mockFactory.createSession(null, null, false, true, true, false, 0);
-
- callback.session = session;
-
- session.createQueue(ADDRESS, ADDRESS, null, true);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- ClientMessage clientLarge = createLargeClientMessage(session, numberOfBytes);
-
- try
- {
- producer.send(clientLarge);
-
- fail("Exception was expected!");
- }
- catch (Exception e)
- {
- }
-
- validateNoFilesOnLargeDir();
-
- session.close();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Exception ignored)
- {
- ignored.printStackTrace();
- }
- }
-
- }
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted: trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java 2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/LargeMessageCrashTest.java 2009-11-03 08:45:42 UTC (rev 8189)
@@ -1,567 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.largemessage;
-
-import java.lang.management.ManagementFactory;
-import java.util.concurrent.Executor;
-
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.JournalLargeServerMessage;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.security.HornetQSecurityManager;
-import org.hornetq.core.security.impl.HornetQSecurityManagerImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.impl.HornetQServerImpl;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.tests.util.SpawnedVMSupport;
-
-/**
- * A LargeMessageCrashTest
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class LargeMessageCrashTest extends ServiceTestBase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- static String QUEUE_NAME = "MY-QUEUE";
-
- static int LARGE_MESSAGE_SIZE = 5 *
ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-
- static int PAGED_MESSAGE_SIZE = 1024;
-
- static int NUMBER_OF_PAGES_MESSAGES = 100;
-
- boolean failAfterRename;
-
- // Static --------------------------------------------------------
-
- public static void main(String args[])
- {
- LargeMessageCrashTest serverTest = new LargeMessageCrashTest();
-
- serverTest.failAfterRename = false;
-
- for (String arg : args)
- {
- if (arg.equals("failAfterRename"))
- {
- serverTest.failAfterRename = true;
- }
- }
-
- for (String arg : args)
- {
- if (arg.equals("remoteJournalSendNonTransactional"))
- {
- serverTest.remoteJournalSendNonTransactional();
- }
- else if (arg.equals("remoteJournalSendTransactional"))
- {
- serverTest.remoteJournalSendTransactional();
- }
- else if (arg.equals("remotePreparedTransaction"))
- {
- serverTest.remotePreparedTransaction();
- }
- else if (arg.equals("remotePaging"))
- {
- serverTest.remotePaging();
- }
- }
- }
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Commented out for https://jira.jboss.org/jira/browse/HORNETQ-49
- public void testFoo()
- {
-
- }
-
-// public void testJournalSendNonTransactional1() throws Exception
-// {
-// internalTestSend(false, false);
-// }
-//
-// public void testJournalSendNonTransactional2() throws Exception
-// {
-// internalTestSend(true, false);
-// }
-//
-// public void testJournalSendTransactional1() throws Exception
-// {
-// internalTestSend(false, true);
-// }
-//
-// public void testJournalSendTransactional2() throws Exception
-// {
-// internalTestSend(true, true);
-// }
-
- public void internalTestSend(boolean failureAfterRename, boolean transactional) throws
Exception
- {
- if (transactional)
- {
- runExternalProcess(failureAfterRename,
"remoteJournalSendTransactional");
- }
- else
- {
- runExternalProcess(failureAfterRename,
"remoteJournalSendNonTransactional");
- }
-
- HornetQServer server = newServer(false);
-
- try
- {
- server.start();
-
- ClientSessionFactory cf = createInVMFactory();
-
- ClientSession session = cf.createSession(true, true);
-
- ClientConsumer cons = session.createConsumer(QUEUE_NAME);
-
- session.start();
-
- assertNull(cons.receive(100));
-
- session.close();
-
- validateNoFilesOnLargeDir();
- }
- finally
- {
- try
- {
- server.stop();
- }
- catch (Throwable ignored)
- {
- }
- }
- }
-
- public void testPreparedTransaction() throws Exception
- {
- runExternalProcess(false, "remotePreparedTransaction");
-
- HornetQServer server = newServer(false);
-
- server.start();
-
- ClientSessionFactory cf = createInVMFactory();
-
- ClientSession session = cf.createSession(true, false, false);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
-
- assertEquals(1, xids.length);
-
- session.rollback(xids[0]);
-
- session.close();
-
- server.stop();
-
- validateNoFilesOnLargeDir();
-
- }
-
- public void testPreparedTransactionAndCommit() throws Exception
- {
- runExternalProcess(false, "remotePreparedTransaction");
-
- HornetQServer server = newServer(false);
-
- server.start();
-
- ClientSessionFactory cf = createInVMFactory();
-
- ClientSession session = cf.createSession(true, false, false);
-
- Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
-
- assertEquals(1, xids.length);
-
- session.commit(xids[0], false);
-
- session.close();
-
- session = cf.createSession(false, false);
-
- ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
-
- session.start();
-
- ClientMessage msg = consumer.receive(5000);
-
- assertNotNull(msg);
-
- msg.acknowledge();
-
- for (int i = 0; i < LARGE_MESSAGE_SIZE; i++)
- {
- assertEquals(getSamplebyte(i), msg.getBody().readByte());
- }
-
- session.commit();
-
- session.close();
-
- server.stop();
-
- validateNoFilesOnLargeDir();
-
- }
-
-
- public void testPaging() throws Exception
- {
- runExternalProcess(false, "remotePaging");
-
- HornetQServer server = newServer(false);
-
- server.start();
-
- ClientSessionFactory cf = createInVMFactory();
-
- ClientSession session = cf.createSession(false, true, true);
-
- ClientConsumer consumer = session.createConsumer(QUEUE_NAME);
-
- session.start();
-
- for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
- {
- ClientMessage msg = consumer.receive(50000);
- assertNotNull(msg);
- msg.acknowledge();
- session.commit();
- }
-
- ClientMessage msg = consumer.receiveImmediate();
- assertNull(msg);
-
- session.close();
-
- server.stop();
-
- validateNoFilesOnLargeDir();
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
- /**
- * @throws Exception
- * @throws InterruptedException
- */
- private void runExternalProcess(boolean failAfterRename, String methodName) throws
Exception, InterruptedException
- {
- System.err.println("running external process...");
-
- Process process = SpawnedVMSupport.spawnVM(this.getClass().getCanonicalName(),
- "-Xms128m -Xmx128m ",
- new String[] {},
- true,
- true,
- methodName,
- (failAfterRename ?
"failAfterRename" : "regularFail"));
-
- assertEquals(100, process.waitFor());
- }
-
- // Inner classes -------------------------------------------------
-
- public void remoteJournalSendNonTransactional()
- {
-
- try
- {
- startServer(failAfterRename, true);
-
- ClientSessionFactory factory = createInVMFactory();
- ClientSession session = factory.createSession(true, true);
-
- try
- {
- session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
- }
- catch (Throwable ignored)
- {
- }
-
- ClientProducer prod = session.createProducer(QUEUE_NAME);
-
- prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
-
- }
-
- public void remoteJournalSendTransactional()
- {
- try
- {
- startServer(failAfterRename, true);
-
- ClientSessionFactory factory = createInVMFactory();
- ClientSession session = factory.createSession(false, false);
-
- try
- {
- session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
- }
- catch (Throwable ignored)
- {
- }
-
- ClientProducer prod = session.createProducer(QUEUE_NAME);
-
- prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
-
- }
-
- public void remotePreparedTransaction()
- {
- try
- {
- startServer(failAfterRename, false);
-
- ClientSessionFactory factory = createInVMFactory();
- ClientSession session = factory.createSession(true, false, false);
-
- try
- {
- session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
- }
- catch (Throwable ignored)
- {
- }
-
- ClientProducer prod = session.createProducer(QUEUE_NAME);
-
- Xid xid = newXID();
- session.start(xid, XAResource.TMNOFLAGS);
-
- prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-
- session.end(xid, XAResource.TMSUCCESS);
- session.prepare(xid);
-
- Runtime.getRuntime().halt(100);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
-
- }
-
- public void remotePaging()
- {
- try
- {
- startServer(failAfterRename, true);
-
- ClientSessionFactory factory = createInVMFactory();
- ClientSession session = factory.createSession(false, false, false);
-
- try
- {
- session.createQueue(QUEUE_NAME, QUEUE_NAME, true);
- }
- catch (Throwable ignored)
- {
- }
-
- ClientProducer prod = session.createProducer(QUEUE_NAME);
-
- byte body[] = new byte[PAGED_MESSAGE_SIZE];
- for (int i = 0; i < body.length; i++)
- {
- body[i] = getSamplebyte(i);
- }
-
- ClientMessage msg = session.createClientMessage(true);
-
- msg.setBody(ChannelBuffers.wrappedBuffer(body));
-
- for (int i = 0; i < NUMBER_OF_PAGES_MESSAGES; i++)
- {
- prod.send(msg);
- }
-
- session.commit();
-
- session.close();
-
- session = factory.createSession(false, true, true);
- prod = session.createProducer(QUEUE_NAME);
-
- prod.send(createLargeClientMessage(session, LARGE_MESSAGE_SIZE, true));
-
- Runtime.getRuntime().halt(100);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- System.exit(-1);
- }
-
- }
-
- protected ClientMessage createLargeClientMessage(final ClientSession session,
- final long numberOfBytes,
- final boolean persistent) throws
Exception
- {
-
- ClientMessage clientMessage = session.createClientMessage(persistent);
-
- clientMessage.setBodyInputStream(createFakeLargeStream(numberOfBytes));
-
- return clientMessage;
- }
-
- protected void startServer(boolean failAfterRename, boolean fail)
- {
- this.failAfterRename = failAfterRename;
- try
- {
- HornetQServer server = newServer(fail);
- server.start();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
-
- private HornetQServer newServer(boolean failing)
- {
- Configuration configuration = createDefaultConfig(false);
- HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
-
- HornetQServer server;
-
- if (failing)
- {
- server = new FailingHornetQServer(configuration, securityManager);
- }
- else
- {
- server = new HornetQServerImpl(configuration, securityManager);
- }
-
- AddressSettings defaultSetting = new AddressSettings();
- defaultSetting.setPageSizeBytes(10 * 1024);
- defaultSetting.setMaxSizeBytes(100 * 1024);
-
- server.getAddressSettingsRepository().addMatch("#", defaultSetting);
-
- return server;
- }
-
- /** This is hacking HornetQServerImpl,
- * to make sure the server will fail right
- * before the page-file was removed */
- private class FailingHornetQServer extends HornetQServerImpl
- {
- FailingHornetQServer(final Configuration config, final HornetQSecurityManager
securityManager)
- {
- super(config, ManagementFactory.getPlatformMBeanServer(), securityManager);
- }
-
- @Override
- protected StorageManager createStorageManager()
- {
- return new FailingStorageManager(getConfiguration(), getExecutor());
- }
-
- }
-
- private class FailingStorageManager extends JournalStorageManager
- {
-
- public FailingStorageManager(final Configuration config, final Executor executor)
- {
- super(config, executor);
- }
-
- @Override
- public LargeServerMessage createLargeMessage()
- {
- return new FailinJournalLargeServerMessage(this);
- }
-
- }
-
- private class FailinJournalLargeServerMessage extends JournalLargeServerMessage
- {
- /**
- * @param storageManager
- */
- public FailinJournalLargeServerMessage(final JournalStorageManager storageManager)
- {
- super(storageManager);
- }
-
- @Override
- public void setStored() throws Exception
- {
- if (failAfterRename)
- {
- super.setStored();
- }
- Runtime.getRuntime().halt(100);
- }
-
- }
-
-}
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-02 23:23:19 UTC (rev 8188)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-03 08:45:42 UTC (rev 8189)
@@ -248,7 +248,7 @@
manager.largeMessageWrite(500, new byte[1024]);
- manager.largeMessageEnd(500);
+ manager.largeMessageDelete(500);
blockOnReplication(manager);