Author: clebert.suconic@jboss.com
Date: 2009-12-03 11:42:03 -0500 (Thu, 03 Dec 2009)
New Revision: 8533
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
No functional code changes; just clean-up: removed unused imports and dead code, dropped redundant "this." qualifiers, made method parameters final, and added missing @Override annotations.
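
For readers skimming the diff: a minimal sketch of the conventions applied below. The class is hypothetical, written for illustration only; it is not part of the HornetQ sources.

// Hypothetical example illustrating the clean-up rules; not HornetQ code.
public class CleanupExample
{
   private final String name;

   private int counter;

   public CleanupExample(final String name) // parameters declared final
   {
      // "this." is required here because the parameter shadows the field...
      this.name = name;

      // ...but is dropped where nothing shadows the field, as throughout this commit.
      counter = 0;
   }

   @Override // @Override added wherever a method overrides a supertype method
   public String toString()
   {
      return name + ":" + counter;
   }
}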
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:42:03 UTC (rev 8533)
@@ -20,8 +20,6 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -177,7 +175,7 @@
{
this.executorFactory = executorFactory;
- this.executor = executorFactory.getExecutor();
+ executor = executorFactory.getExecutor();
this.replicator = replicator;
@@ -210,11 +208,11 @@
if (replicator != null)
{
- this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+ bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
}
else
{
- this.bindingsJournal = localBindings;
+ bindingsJournal = localBindings;
}
if (journalDir == null)
@@ -264,11 +262,11 @@
if (config.isBackup())
{
- this.idGenerator = null;
+ idGenerator = null;
}
else
{
- this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+ idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -283,11 +281,11 @@
if (replicator != null)
{
- this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
}
else
{
- this.messageJournal = localMessage;
+ messageJournal = localMessage;
}
largeMessagesDirectory = config.getLargeMessagesDirectory();
@@ -338,14 +336,12 @@
}
}
- // TODO: shouldn't those page methods be on the PageManager?
-
/*
*
* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
*/
- public void pageClosed(SimpleString storeName, int pageNumber)
+ public void pageClosed(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
{
@@ -356,7 +352,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
*/
- public void pageDeleted(SimpleString storeName, int pageNumber)
+ public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
{
@@ -367,7 +363,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.core.buffers.ChannelBuffer)
*/
- public void pageWrite(PagedMessage message, int pageNumber)
+ public void pageWrite(final PagedMessage message, final int pageNumber)
{
if (isReplicated())
{
@@ -375,8 +371,6 @@
}
}
- // TODO: shouldn't those page methods be on the PageManager? ^^^^
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
*/
@@ -385,7 +379,7 @@
return OperationContextImpl.getContext(executorFactory);
}
- public void setContext(OperationContext context)
+ public void setContext(final OperationContext context)
{
OperationContextImpl.setContext(context);
}
@@ -393,12 +387,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#newContext()
*/
- public OperationContext newContext(Executor executor)
+ public OperationContext newContext(final Executor executor)
{
return new OperationContextImpl(executor);
}
- public void afterCompleteOperations(IOAsyncTask run)
+ public void afterCompleteOperations(final IOAsyncTask run)
{
getContext().executeOnCompletion(run);
}
@@ -408,7 +402,7 @@
return persistentID;
}
- public void setPersistentID(UUID id) throws Exception
+ public void setPersistentID(final UUID id) throws Exception
{
long recordID = generateUniqueID();
@@ -417,7 +411,7 @@
bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
}
- this.persistentID = id;
+ persistentID = id;
}
public long generateUniqueID()
@@ -437,7 +431,7 @@
return new LargeServerMessageImpl(this);
}
- public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+ public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
{
file.position(file.size());
@@ -445,11 +439,11 @@
if (isReplicated())
{
- this.replicator.largeMessageWrite(messageId, bytes);
+ replicator.largeMessageWrite(messageId, bytes);
}
}
- public LargeServerMessage createLargeMessage(long id, byte[] header)
+ public LargeServerMessage createLargeMessage(final long id, final byte[] header)
{
if (isReplicated())
{
@@ -471,9 +465,9 @@
public void storeMessage(final ServerMessage message) throws Exception
{
- // TODO - how can this be less than zero?
if (message.getMessageID() <= 0)
{
+ // Sanity check only... this shouldn't happen unless there is a bug
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
@@ -540,7 +534,7 @@
getContext(syncNonTransactional));
}
- public void deleteDuplicateID(long recordID) throws Exception
+ public void deleteDuplicateID(final long recordID) throws Exception
{
messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -595,7 +589,7 @@
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF,
new RefEncoding(queueID));
}
- public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
@@ -607,7 +601,7 @@
return id;
}
- public void deleteHeuristicCompletion(long id) throws Exception
+ public void deleteHeuristicCompletion(final long id) throws Exception
{
messageJournal.appendDeleteRecord(id, true, getContext(true));
}
@@ -668,7 +662,7 @@
messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID,
encoding);
}
- public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
+ public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
@@ -704,13 +698,15 @@
{
private final Map<Long, ServerMessage> messages;
- public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
+ public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
{
super();
this.messages = messages;
}
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ public void failedTransaction(final long transactionID,
+ final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
{
for (RecordInfo record : records)
{
@@ -986,240 +982,8 @@
return info;
}
-
- /**
- * @param messages
- * @param buff
- * @return
- * @throws Exception
- */
- private LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, HornetQBuffer buff) throws Exception
- {
- LargeServerMessage largeMessage = createLargeMessage();
-
- LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
- messageEncoding.decode(buff);
-
- if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
- {
- long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
- {
- // this could happen if the message was deleted but the file still exists as the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
- }
-
- originalMessage.incrementDelayDeletionCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- }
- return largeMessage;
- }
-
- private void loadPreparedTransactions(final PostOffice postOffice,
- final PagingManager pagingManager,
- final ResourceManager resourceManager,
- final Map<Long, Queue> queues,
- final List<PreparedTransactionInfo> preparedTransactions,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
- {
- // recover prepared transactions
- for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
- {
- XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
-
- Xid xid = encodingXid.xid;
-
- Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
-
- List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
-
- Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-
- // Use same method as load message journal to prune out acks, so they don't get added.
- // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
- // first get any sent messages for this tx and recreate
- for (RecordInfo record : preparedTransaction.records)
- {
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- byte recordType = record.getUserRecordType();
-
- switch (recordType)
- {
- case ADD_LARGE_MESSAGE:
- {
- messages.put(record.id, parseLargeMessage(messages, buff));
-
- break;
- }
- case ADD_MESSAGE:
- {
- ServerMessage message = new ServerMessageImpl(record.id, 50);
-
- message.decode(buff);
-
- messages.put(record.id, message);
-
- break;
- }
- case ADD_REF:
- {
-
- long messageID = record.id;
-
- RefEncoding encoding = new RefEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
- }
-
- ServerMessage message = messages.get(messageID);
-
- if (message == null)
- {
- throw new IllegalStateException("Cannot find message with id
" + messageID);
- }
-
- postOffice.reroute(message, queue, tx);
-
- break;
- }
- case ACKNOWLEDGE_REF:
- {
- long messageID = record.id;
-
- RefEncoding encoding = new RefEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
- }
-
- // TODO - this involves a scan - we should find a quicker way of doing it
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- referencesToAck.add(removed);
-
- if (removed == null)
- {
- throw new IllegalStateException("Failed to remove reference for
" + messageID);
- }
-
- break;
- }
- case PAGE_TRANSACTION:
- {
- PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
-
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.markIncomplete();
-
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
-
- pagingManager.addTransaction(pageTransactionInfo);
-
- tx.addOperation(new FinishPageMessageOperation());
-
- break;
- }
- case SET_SCHEDULED_DELIVERY_TIME:
- {
- // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
- // case the message will already have the header for the scheduled delivery time, so no need to do
- // anything.
-
- break;
- }
- case DUPLICATE_ID:
- {
- // We need load the duplicate ids at prepare time too
- DuplicateIDEncoding encoding = new DuplicateIDEncoding();
-
- encoding.decode(buff);
-
- List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
-
- if (ids == null)
- {
- ids = new ArrayList<Pair<byte[], Long>>();
-
- duplicateIDMap.put(encoding.address, ids);
- }
-
- ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
-
- break;
- }
- default:
- {
- log.warn("InternalError: Record type " + recordType +
- " not recognized. Maybe you're using journal files
created on a different version");
- }
- }
- }
-
- for (RecordInfo record : preparedTransaction.recordsToDelete)
- {
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- long messageID = record.id;
-
- DeleteEncoding encoding = new DeleteEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " +
encoding.queueID);
- }
-
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- if (removed != null)
- {
- referencesToAck.add(removed);
- }
-
- }
-
- for (MessageReference ack : referencesToAck)
- {
- ack.getQueue().reacknowledge(tx, ack);
- }
-
- tx.setState(Transaction.State.PREPARED);
-
- resourceManager.putTransaction(xid, tx);
- }
- }
-
// grouping handler operations
- public void addGrouping(GroupBinding groupBinding) throws Exception
+ public void addGrouping(final GroupBinding groupBinding) throws Exception
{
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(),
@@ -1227,7 +991,7 @@
bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD,
groupingEncoding, true);
}
- public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ public void deleteGrouping(final GroupBinding groupBinding) throws Exception
{
bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
}
@@ -1453,15 +1217,249 @@
}
}
}
+
/**
+ * @param messages
+ * @param buff
+ * @return
* @throws Exception
*/
+ private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, final HornetQBuffer buff) throws Exception
+ {
+ LargeServerMessage largeMessage = createLargeMessage();
+
+ LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+ messageEncoding.decode(buff);
+
+ if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+ {
+ long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+ LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+
+ if (originalMessage == null)
+ {
+ // this could happen if the message was deleted but the file still exists as the file still being used
+ originalMessage = createLargeMessage();
+ originalMessage.setDurable(true);
+ originalMessage.setMessageID(originalMessageID);
+ messages.put(originalMessageID, originalMessage);
+ }
+
+ originalMessage.incrementDelayDeletionCount();
+
+ largeMessage.setLinkedMessage(originalMessage);
+ }
+ return largeMessage;
+ }
+
+ private void loadPreparedTransactions(final PostOffice postOffice,
+ final PagingManager pagingManager,
+ final ResourceManager resourceManager,
+ final Map<Long, Queue> queues,
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ {
+ // recover prepared transactions
+ for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
+ {
+ XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+
+ Xid xid = encodingXid.xid;
+
+ Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
+
+ List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
+
+ Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+ // Use same method as load message journal to prune out acks, so they don't get added.
+ // Then have reacknowledge(tx) methods on queue, which needs to add the page size
+
+ // first get any sent messages for this tx and recreate
+ for (RecordInfo record : preparedTransaction.records)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+ byte recordType = record.getUserRecordType();
+
+ switch (recordType)
+ {
+ case ADD_LARGE_MESSAGE:
+ {
+ messages.put(record.id, parseLargeMessage(messages, buff));
+
+ break;
+ }
+ case ADD_MESSAGE:
+ {
+ ServerMessage message = new ServerMessageImpl(record.id, 50);
+
+ message.decode(buff);
+
+ messages.put(record.id, message);
+
+ break;
+ }
+ case ADD_REF:
+ {
+
+ long messageID = record.id;
+
+ RefEncoding encoding = new RefEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
+ }
+
+ ServerMessage message = messages.get(messageID);
+
+ if (message == null)
+ {
+ throw new IllegalStateException("Cannot find message with id
" + messageID);
+ }
+
+ postOffice.reroute(message, queue, tx);
+
+ break;
+ }
+ case ACKNOWLEDGE_REF:
+ {
+ long messageID = record.id;
+
+ RefEncoding encoding = new RefEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
+ }
+
+ // TODO - this involves a scan - we should find a quicker way of doing it
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ referencesToAck.add(removed);
+
+ if (removed == null)
+ {
+ throw new IllegalStateException("Failed to remove reference for
" + messageID);
+ }
+
+ break;
+ }
+ case PAGE_TRANSACTION:
+ {
+ PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.markIncomplete();
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
+
+ break;
+ }
+ case SET_SCHEDULED_DELIVERY_TIME:
+ {
+ // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
+ // case the message will already have the header for the scheduled delivery time, so no need to do
+ // anything.
+
+ break;
+ }
+ case DUPLICATE_ID:
+ {
+ // We need load the duplicate ids at prepare time too
+ DuplicateIDEncoding encoding = new DuplicateIDEncoding();
+
+ encoding.decode(buff);
+
+ List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
+
+ if (ids == null)
+ {
+ ids = new ArrayList<Pair<byte[], Long>>();
+
+ duplicateIDMap.put(encoding.address, ids);
+ }
+
+ ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
+
+ break;
+ }
+ default:
+ {
+ log.warn("InternalError: Record type " + recordType +
+ " not recognized. Maybe you're using journal files
created on a different version");
+ }
+ }
+ }
+
+ for (RecordInfo record : preparedTransaction.recordsToDelete)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+ long messageID = record.id;
+
+ DeleteEncoding encoding = new DeleteEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " +
encoding.queueID);
+ }
+
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ if (removed != null)
+ {
+ referencesToAck.add(removed);
+ }
+
+ }
+
+ for (MessageReference ack : referencesToAck)
+ {
+ ack.getQueue().reacknowledge(tx, ack);
+ }
+
+ tx.setState(Transaction.State.PREPARED);
+
+ resourceManager.putTransaction(xid, tx);
+ }
+ }
+
+
+
+ /**
+ * @throws Exception
+ */
private void cleanupIncompleteFiles() throws Exception
{
if (largeMessagesFactory != null)
{
- List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
for (String tmpFile : tmpFiles)
{
SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
@@ -1505,7 +1503,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
*/
- public void executeOnCompletion(IOAsyncTask runnable)
+ public void executeOnCompletion(final IOAsyncTask runnable)
{
}
@@ -1540,7 +1538,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
*/
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
}
@@ -1618,7 +1616,7 @@
SimpleString clusterName;
- public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+ public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
{
this.id = id;
this.groupId = groupId;
@@ -1634,13 +1632,13 @@
return SimpleString.sizeofString(groupId) +
SimpleString.sizeofString(clusterName);
}
- public void encode(HornetQBuffer buffer)
+ public void encode(final HornetQBuffer buffer)
{
buffer.writeSimpleString(groupId);
buffer.writeSimpleString(clusterName);
}
- public void decode(HornetQBuffer buffer)
+ public void decode(final HornetQBuffer buffer)
{
groupId = buffer.readSimpleString();
clusterName = buffer.readSimpleString();
@@ -1651,7 +1649,7 @@
return id;
}
- public void setId(long id)
+ public void setId(final long id)
{
this.id = id;
}
@@ -1780,7 +1778,7 @@
{
private final LargeServerMessage message;
- public LargeMessageEncoding(LargeServerMessage message)
+ public LargeMessageEncoding(final LargeServerMessage message)
{
this.message = message;
}
@@ -1908,7 +1906,7 @@
{
long scheduledDeliveryTime;
- private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
+ private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID)
{
super(queueID);
this.scheduledDeliveryTime = scheduledDeliveryTime;
@@ -1918,18 +1916,21 @@
{
}
+ @Override
public int getEncodeSize()
{
return super.getEncodeSize() + 8;
}
- public void encode(HornetQBuffer buffer)
+ @Override
+ public void encode(final HornetQBuffer buffer)
{
super.encode(buffer);
buffer.writeLong(scheduledDeliveryTime);
}
- public void decode(HornetQBuffer buffer)
+ @Override
+ public void decode(final HornetQBuffer buffer)
{
super.decode(buffer);
scheduledDeliveryTime = buffer.readLong();
@@ -2022,14 +2023,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public Collection<Queue> getDistinctQueues()
- {
- return Collections.emptySet();
- }
-
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 16:42:03 UTC (rev 8533)
@@ -186,6 +186,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -256,12 +257,12 @@
{
volatile boolean failed;
- public void connectionFailed(HornetQException me)
+ public void connectionFailed(final HornetQException me)
{
failed = true;
}
- public void beforeReconnect(HornetQException exception)
+ public void beforeReconnect(final HornetQException exception)
{
}
}
@@ -299,6 +300,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -382,7 +384,7 @@
producer.send(message);
}
- ClientConsumer consumer = session.createConsumer(ADDRESS);
+ session.createConsumer(ADDRESS);
InVMConnector.failOnCreateConnection = true;
@@ -392,6 +394,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -435,7 +438,7 @@
Timer timer = new Timer();
ClientSession session = null;
-
+
try
{
@@ -446,21 +449,16 @@
final int reconnectAttempts = -1;
final ClientSessionFactoryInternal sf = createFactory(false);
-
-
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setConfirmationWindowSize(1024 * 1024);
-
-
+
session = sf.createSession();
final RemotingConnection connFailure = ((ClientSessionInternal)session).getConnection();
-
-
int numberOfThreads = 100;
final int numberOfSessionsToCreate = 10;
@@ -471,24 +469,25 @@
{
Throwable failure;
+ @Override
public void run()
{
try
{
alignLatch.countDown();
startFlag.await();
- for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+ for (int i = 0; i < numberOfSessionsToCreate; i++)
{
Thread.yield();
ClientSession session = sf.createSession(false, true, true);
-
+
session.close();
}
}
catch (Throwable e)
{
e.printStackTrace();
- this.failure = e;
+ failure = e;
}
}
}
@@ -500,8 +499,6 @@
threads[i].start();
}
- // Sleep 3 times retryInterval, so it should at least have 3 retries
-
alignLatch.await();
timer.schedule(new TimerTask()
@@ -518,9 +515,9 @@
log.warn("Error on the timer " + e);
}
}
-
+
}, 10, 10);
-
+
startFlag.countDown();
Throwable failure = null;
@@ -546,7 +543,7 @@
finally
{
timer.cancel();
-
+
if (session != null)
{
session.close();
@@ -580,6 +577,7 @@
{
Throwable failure;
+ @Override
public void run()
{
try
@@ -593,7 +591,7 @@
catch (Throwable e)
{
e.printStackTrace();
- this.failure = e;
+ failure = e;
}
}
}
@@ -609,6 +607,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -678,6 +677,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -836,6 +836,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -874,7 +875,7 @@
long end = System.currentTimeMillis();
- assertTrue((end - start) >= retryInterval);
+ assertTrue(end - start >= retryInterval);
session.close();
@@ -952,7 +953,7 @@
double wait = retryInterval + retryMultiplier * retryInterval + retryMultiplier * retryMultiplier * retryInterval;
- assertTrue((end - start) >= wait);
+ assertTrue(end - start >= wait);
session.close();
@@ -1031,9 +1032,9 @@
double wait = retryInterval + retryMultiplier * 2 * retryInterval + retryMultiplier;
- assertTrue((end - start) >= wait);
+ assertTrue(end - start >= wait);
- assertTrue((end - start) < wait + 500);
+ assertTrue(end - start < wait + 500);
session.close();