JBoss hornetq SVN: r8536 - trunk/src/main/org/hornetq/core/management/impl.
Author: clebert.suconic@jboss.com
Date: 2009-12-03 13:41:20 -0500 (Thu, 03 Dec 2009)
New Revision: 8536
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
Log:
Adding a few more waitOnCompletion calls on the QueueControl
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 17:13:58 UTC (rev 8535)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 18:41:20 UTC (rev 8536)
@@ -280,7 +280,7 @@
int retValue = queue.deleteMatchingReferences(filter);
- // Waiting on IO otherwise the JMX operation would return before the operation completed
+ // Waiting on IO otherwise the operation would return before the operation completed
storageManager.waitOnOperations();
return retValue;
@@ -288,7 +288,12 @@
public boolean expireMessage(final long messageID) throws Exception
{
- return queue.expireReference(messageID);
+ boolean retValue =queue.expireReference(messageID);
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
}
public int expireMessages(final String filterStr) throws Exception
@@ -296,7 +301,12 @@
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- return queue.expireReferences(filter);
+ int retValue = queue.expireReferences(filter);
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
}
catch (HornetQException e)
{
@@ -313,7 +323,12 @@
throw new IllegalArgumentException("No queue found for " + otherQueueName);
}
- return queue.moveReference(messageID, binding.getAddress());
+ boolean retValue = queue.moveReference(messageID, binding.getAddress());
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
}
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
@@ -327,7 +342,13 @@
throw new IllegalArgumentException("No queue found for " + otherQueueName);
}
- return queue.moveReferences(filter, binding.getAddress());
+ int retValue = queue.moveReferences(filter, binding.getAddress());
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
+
}
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
@@ -340,13 +361,21 @@
{
sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
}
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
return refs.size();
}
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- return queue.sendMessageToDeadLetterAddress(messageID);
+ boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
+
+ // Waiting on IO otherwise the operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
}
public int changeMessagesPriority(String filterStr, int newPriority) throws Exception
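For context, the pattern this revision applies throughout QueueControlImpl can be sketched in isolation: perform the queue operation, then block on the storage manager before returning, so the management call does not complete ahead of the journal. The interfaces below are simplified stand-ins, not the real HornetQ types.
// Minimal sketch of the "wait on IO before returning" pattern from r8536.
// SketchQueue and SketchStorageManager are simplified stand-ins, not the real
// org.hornetq interfaces.
interface SketchQueue
{
   boolean expireReference(long messageID) throws Exception;
}
interface SketchStorageManager
{
   // Blocks until every storage operation issued so far has completed.
   void waitOnOperations() throws Exception;
}
class SketchQueueControl
{
   private final SketchQueue queue;
   private final SketchStorageManager storageManager;
   SketchQueueControl(final SketchQueue queue, final SketchStorageManager storageManager)
   {
      this.queue = queue;
      this.storageManager = storageManager;
   }
   public boolean expireMessage(final long messageID) throws Exception
   {
      boolean retValue = queue.expireReference(messageID);
      // Without this wait the management call could return before the journal
      // has persisted the change.
      storageManager.waitOnOperations();
      return retValue;
   }
}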
JBoss hornetq SVN: r8535 - trunk/docs/user-manual/en.
Author: jmesnil
Date: 2009-12-03 12:13:58 -0500 (Thu, 03 Dec 2009)
New Revision: 8535
Modified:
trunk/docs/user-manual/en/client-classpath.xml
Log:
documentation update
* updated client classpath with netty in the core dependencies
Modified: trunk/docs/user-manual/en/client-classpath.xml
===================================================================
--- trunk/docs/user-manual/en/client-classpath.xml 2009-12-03 16:46:12 UTC (rev 8534)
+++ trunk/docs/user-manual/en/client-classpath.xml 2009-12-03 17:13:58 UTC (rev 8535)
@@ -18,9 +18,8 @@
<!-- ============================================================================= -->
<chapter id="client-classpath">
<title>The Client Classpath</title>
- <para>In this chapter we explain which jars you need on the Java classpath of a HornetQ client
- application. This depends on various factors including whether you're using just core, JMS,
- JNDI or Netty. We explain which jars are needed in each case.</para>
+ <para>HornetQ requires several jars on the <emphasis>Client Classpath</emphasis> depending on
+ whether the client uses HornetQ Core API, JMS, and JNDI.</para>
<note>
<para>All the jars mentioned here can be found in the <literal>lib</literal> directory of
the HornetQ distribution. Be sure you only use the jars from the correct version of the
@@ -29,27 +28,26 @@
failures to occur.</para>
</note>
<section>
- <title>Pure Core Client</title>
- <para>If you're using just a pure HornetQ core client (i.e. no JMS) then you need <literal
- >hornetq-core-client.jar</literal> on your client classpath.</para>
- <para>If you're using a Netty transport then you will also netty <literal
- >netty.jar</literal> and <literal>hornetq-transports.jar</literal>.</para>
+ <title>HornetQ Core Client</title>
+ <para>If you are using just a pure HornetQ Core client (i.e. no JMS) then you need <literal
+ >hornetq-core-client.jar</literal>, <literal>hornetq-transports.jar</literal>
+ and <literal>netty.jar</literal> on your client classpath.</para>
</section>
<section>
<title>JMS Client</title>
- <para>If you're using JMS on the client side, then you will need <literal
- >hornetq-core-client.jar</literal>, <literal>hornetq-jms-client.jar</literal> and
- <literal>jboss-jms-api.jar</literal>. Note that <literal>jboss-jms-api.jar</literal>
+ <para>If you are using JMS on the client side, then you will also need to include
+ <literal>hornetq-jms-client.jar</literal> and <literal>jboss-jms-api.jar</literal>.</para>
+ <note>
+ <para><literal>jboss-jms-api.jar</literal>
just contains Java EE API interface classes needed for the <literal
- >javax.jms.*</literal> classes, so if you already have a jar with these interface
- classes on your classpath you won't need it.</para>
- <para>If you're using a Netty transport then you will also netty <literal
- >netty.jar</literal> and <literal>hornetq-transports.jar</literal>.</para>
+ >javax.jms.*</literal> classes. If you already have a jar with these interface
+ classes on your classpath, you will not need it.</para>
+ </note>
</section>
<section>
- <title>JNDI</title>
- <para>If you're looking up JNDI objects from the JNDI server co-located with the HornetQ
- standalone server you'll also need the jar <literal>jnp-client.jar</literal> jar on your
+ <title>JMS Client with JNDI</title>
+ <para>If you are looking up JMS resources from the JNDI server co-located with the HornetQ
+ standalone server, you wil also need the jar <literal>jnp-client.jar</literal> jar on your
client classpath as well as any other jars mentioned previously.</para>
</section>
</chapter>
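As a rough illustration of what those jars contribute, the sketch below creates a pure core client over the Netty transport: hornetq-core-client.jar supplies the ClientSession classes, while hornetq-transports.jar and netty.jar back the connector. The package names and constructor are assumed from the 2.0-era API and may differ in other versions.
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.integration.transports.netty.NettyConnectorFactory;
// Assumed sketch of a pure core client. Class and constructor names follow the
// 2.0-era API referenced elsewhere in these commits and may differ in other
// HornetQ versions.
public class CoreClientClasspathSketch
{
   public static void main(final String[] args) throws Exception
   {
      TransportConfiguration connector =
         new TransportConfiguration(NettyConnectorFactory.class.getName());
      ClientSessionFactory factory = new ClientSessionFactoryImpl(connector);
      ClientSession session = factory.createSession();
      // ... produce and consume messages with the core API ...
      session.close();
   }
}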
JBoss hornetq SVN: r8534 - trunk/docs/user-manual/en.
Author: jmesnil
Date: 2009-12-03 11:46:12 -0500 (Thu, 03 Dec 2009)
New Revision: 8534
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/perf-tuning.xml
Log:
documentation update
* updated doc for memory-measure-interval
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-12-03 16:42:03 UTC (rev 8533)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-12-03 16:46:12 UTC (rev 8534)
@@ -398,7 +398,7 @@
>memory-measure-interval</link></entry>
<entry>Long</entry>
<entry>frequency to sample JVM memory in ms (or -1 to disable memory sampling)</entry>
- <entry>30000</entry>
+ <entry>-1</entry>
</row>
<row>
<entry><link linkend="perf-tuning.memory"
Modified: trunk/docs/user-manual/en/perf-tuning.xml
===================================================================
--- trunk/docs/user-manual/en/perf-tuning.xml 2009-12-03 16:42:03 UTC (rev 8533)
+++ trunk/docs/user-manual/en/perf-tuning.xml 2009-12-03 16:46:12 UTC (rev 8534)
@@ -203,10 +203,10 @@
them to the same high value.</para>
<para>HornetQ will regularly sample JVM memory and reports if the available memory is below
a configurable threshold. Use this information to properly set JVM memory and paging.
- The sample frequency can be configured by setting <literal>memory-measure-interval</literal>
- in <literal>hornetq-configuration.xml</literal> (default is 30000ms, set it to -1 to disable
- memory sampling). When the available memory goes below the configured threshold, a warning is logged.
- The threshold can be configured by setting <literal>memory-warning-threshold</literal> in
+ The sample is disabled by default. To enabled it, configure the sample frequency by setting <literal>memory-measure-interval</literal>
+ in <literal>hornetq-configuration.xml</literal> (in milliseconds).
+ When the available memory goes below the configured threshold, a warning is logged.
+ The threshold can be also configured by setting <literal>memory-warning-threshold</literal> in
<literal>hornetq-configuration.xml</literal> (default is 25%).</para>
</listitem>
<listitem>
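For illustration, the same settings can be applied programmatically when embedding a server; this assumes the Configuration setters mirror the memory-measure-interval and memory-warning-threshold XML elements, so check the exact method names against your version.
import org.hornetq.core.config.impl.ConfigurationImpl;
// Assumed sketch: enable memory sampling (now disabled by default, i.e. -1) and
// keep the default 25% warning threshold. Setter names are assumed to mirror
// the XML element names.
public class MemorySamplingConfigSketch
{
   public static void main(final String[] args)
   {
      ConfigurationImpl config = new ConfigurationImpl();
      // Sample JVM memory every 30 seconds; -1 disables sampling.
      config.setMemoryMeasureInterval(30000);
      // Warn when available memory drops below 25%.
      config.setMemoryWarningThreshold(25);
   }
}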
JBoss hornetq SVN: r8533 - in trunk: tests/src/org/hornetq/tests/integration/cluster/reattach and 1 other directory.
Author: clebert.suconic@jboss.com
Date: 2009-12-03 11:42:03 -0500 (Thu, 03 Dec 2009)
New Revision: 8533
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
no code changes.. just clean up
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:42:03 UTC (rev 8533)
@@ -20,8 +20,6 @@
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -177,7 +175,7 @@
{
this.executorFactory = executorFactory;
- this.executor = executorFactory.getExecutor();
+ executor = executorFactory.getExecutor();
this.replicator = replicator;
@@ -210,11 +208,11 @@
if (replicator != null)
{
- this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
+ bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
}
else
{
- this.bindingsJournal = localBindings;
+ bindingsJournal = localBindings;
}
if (journalDir == null)
@@ -264,11 +262,11 @@
if (config.isBackup())
{
- this.idGenerator = null;
+ idGenerator = null;
}
else
{
- this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
+ idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
@@ -283,11 +281,11 @@
if (replicator != null)
{
- this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
}
else
{
- this.messageJournal = localMessage;
+ messageJournal = localMessage;
}
largeMessagesDirectory = config.getLargeMessagesDirectory();
@@ -338,14 +336,12 @@
}
}
- // TODO: shouldn't those page methods be on the PageManager?
-
/*
*
* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
*/
- public void pageClosed(SimpleString storeName, int pageNumber)
+ public void pageClosed(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
{
@@ -356,7 +352,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
*/
- public void pageDeleted(SimpleString storeName, int pageNumber)
+ public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
if (isReplicated())
{
@@ -367,7 +363,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.core.buffers.ChannelBuffer)
*/
- public void pageWrite(PagedMessage message, int pageNumber)
+ public void pageWrite(final PagedMessage message, final int pageNumber)
{
if (isReplicated())
{
@@ -375,8 +371,6 @@
}
}
- // TODO: shouldn't those page methods be on the PageManager? ^^^^
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
*/
@@ -385,7 +379,7 @@
return OperationContextImpl.getContext(executorFactory);
}
- public void setContext(OperationContext context)
+ public void setContext(final OperationContext context)
{
OperationContextImpl.setContext(context);
}
@@ -393,12 +387,12 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#newContext()
*/
- public OperationContext newContext(Executor executor)
+ public OperationContext newContext(final Executor executor)
{
return new OperationContextImpl(executor);
}
- public void afterCompleteOperations(IOAsyncTask run)
+ public void afterCompleteOperations(final IOAsyncTask run)
{
getContext().executeOnCompletion(run);
}
@@ -408,7 +402,7 @@
return persistentID;
}
- public void setPersistentID(UUID id) throws Exception
+ public void setPersistentID(final UUID id) throws Exception
{
long recordID = generateUniqueID();
@@ -417,7 +411,7 @@
bindingsJournal.appendAddRecord(recordID, PERSISTENT_ID_RECORD, new PersistentIDEncoding(id), true);
}
- this.persistentID = id;
+ persistentID = id;
}
public long generateUniqueID()
@@ -437,7 +431,7 @@
return new LargeServerMessageImpl(this);
}
- public void addBytesToLargeMessage(SequentialFile file, long messageId, final byte[] bytes) throws Exception
+ public void addBytesToLargeMessage(final SequentialFile file, final long messageId, final byte[] bytes) throws Exception
{
file.position(file.size());
@@ -445,11 +439,11 @@
if (isReplicated())
{
- this.replicator.largeMessageWrite(messageId, bytes);
+ replicator.largeMessageWrite(messageId, bytes);
}
}
- public LargeServerMessage createLargeMessage(long id, byte[] header)
+ public LargeServerMessage createLargeMessage(final long id, final byte[] header)
{
if (isReplicated())
{
@@ -471,9 +465,9 @@
public void storeMessage(final ServerMessage message) throws Exception
{
- // TODO - how can this be less than zero?
if (message.getMessageID() <= 0)
{
+ // Sanity check only... this shouldn't happen unless there is a bug
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
@@ -540,7 +534,7 @@
getContext(syncNonTransactional));
}
- public void deleteDuplicateID(long recordID) throws Exception
+ public void deleteDuplicateID(final long recordID) throws Exception
{
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
}
@@ -595,7 +589,7 @@
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
}
- public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
long id = generateUniqueID();
@@ -607,7 +601,7 @@
return id;
}
- public void deleteHeuristicCompletion(long id) throws Exception
+ public void deleteHeuristicCompletion(final long id) throws Exception
{
messageJournal.appendDeleteRecord(id, true, getContext(true));
}
@@ -668,7 +662,7 @@
messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
}
- public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
+ public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
@@ -704,13 +698,15 @@
{
private final Map<Long, ServerMessage> messages;
- public LargeMessageTXFailureCallback(Map<Long, ServerMessage> messages)
+ public LargeMessageTXFailureCallback(final Map<Long, ServerMessage> messages)
{
super();
this.messages = messages;
}
- public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ public void failedTransaction(final long transactionID,
+ final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
{
for (RecordInfo record : records)
{
@@ -986,240 +982,8 @@
return info;
}
-
- /**
- * @param messages
- * @param buff
- * @return
- * @throws Exception
- */
- private LargeServerMessage parseLargeMessage(Map<Long, ServerMessage> messages, HornetQBuffer buff) throws Exception
- {
- LargeServerMessage largeMessage = createLargeMessage();
-
- LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
- messageEncoding.decode(buff);
-
- if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
- {
- long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
-
- LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
- if (originalMessage == null)
- {
- // this could happen if the message was deleted but the file still exists as the file still being used
- originalMessage = createLargeMessage();
- originalMessage.setDurable(true);
- originalMessage.setMessageID(originalMessageID);
- messages.put(originalMessageID, originalMessage);
- }
-
- originalMessage.incrementDelayDeletionCount();
-
- largeMessage.setLinkedMessage(originalMessage);
- }
- return largeMessage;
- }
-
- private void loadPreparedTransactions(final PostOffice postOffice,
- final PagingManager pagingManager,
- final ResourceManager resourceManager,
- final Map<Long, Queue> queues,
- final List<PreparedTransactionInfo> preparedTransactions,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
- {
- // recover prepared transactions
- for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
- {
- XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
-
- Xid xid = encodingXid.xid;
-
- Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
-
- List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
-
- Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-
- // Use same method as load message journal to prune out acks, so they don't get added.
- // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-
- // first get any sent messages for this tx and recreate
- for (RecordInfo record : preparedTransaction.records)
- {
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- byte recordType = record.getUserRecordType();
-
- switch (recordType)
- {
- case ADD_LARGE_MESSAGE:
- {
- messages.put(record.id, parseLargeMessage(messages, buff));
-
- break;
- }
- case ADD_MESSAGE:
- {
- ServerMessage message = new ServerMessageImpl(record.id, 50);
-
- message.decode(buff);
-
- messages.put(record.id, message);
-
- break;
- }
- case ADD_REF:
- {
-
- long messageID = record.id;
-
- RefEncoding encoding = new RefEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
- }
-
- ServerMessage message = messages.get(messageID);
-
- if (message == null)
- {
- throw new IllegalStateException("Cannot find message with id " + messageID);
- }
-
- postOffice.reroute(message, queue, tx);
-
- break;
- }
- case ACKNOWLEDGE_REF:
- {
- long messageID = record.id;
-
- RefEncoding encoding = new RefEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
- }
-
- // TODO - this involves a scan - we should find a quicker qay of doing it
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- referencesToAck.add(removed);
-
- if (removed == null)
- {
- throw new IllegalStateException("Failed to remove reference for " + messageID);
- }
-
- break;
- }
- case PAGE_TRANSACTION:
- {
- PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
-
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.markIncomplete();
-
- tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
-
- pagingManager.addTransaction(pageTransactionInfo);
-
- tx.addOperation(new FinishPageMessageOperation());
-
- break;
- }
- case SET_SCHEDULED_DELIVERY_TIME:
- {
- // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
- // case the message will already have the header for the scheduled delivery time, so no need to do
- // anything.
-
- break;
- }
- case DUPLICATE_ID:
- {
- // We need load the duplicate ids at prepare time too
- DuplicateIDEncoding encoding = new DuplicateIDEncoding();
-
- encoding.decode(buff);
-
- List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
-
- if (ids == null)
- {
- ids = new ArrayList<Pair<byte[], Long>>();
-
- duplicateIDMap.put(encoding.address, ids);
- }
-
- ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
-
- break;
- }
- default:
- {
- log.warn("InternalError: Record type " + recordType +
- " not recognized. Maybe you're using journal files created on a different version");
- }
- }
- }
-
- for (RecordInfo record : preparedTransaction.recordsToDelete)
- {
- byte[] data = record.data;
-
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
-
- long messageID = record.id;
-
- DeleteEncoding encoding = new DeleteEncoding();
-
- encoding.decode(buff);
-
- Queue queue = queues.get(encoding.queueID);
-
- if (queue == null)
- {
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
- }
-
- MessageReference removed = queue.removeReferenceWithID(messageID);
-
- if (removed != null)
- {
- referencesToAck.add(removed);
- }
-
- }
-
- for (MessageReference ack : referencesToAck)
- {
- ack.getQueue().reacknowledge(tx, ack);
- }
-
- tx.setState(Transaction.State.PREPARED);
-
- resourceManager.putTransaction(xid, tx);
- }
- }
-
// grouping handler operations
- public void addGrouping(GroupBinding groupBinding) throws Exception
+ public void addGrouping(final GroupBinding groupBinding) throws Exception
{
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(),
@@ -1227,7 +991,7 @@
bindingsJournal.appendAddRecord(groupBinding.getId(), GROUP_RECORD, groupingEncoding, true);
}
- public void deleteGrouping(GroupBinding groupBinding) throws Exception
+ public void deleteGrouping(final GroupBinding groupBinding) throws Exception
{
bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
}
@@ -1453,15 +1217,249 @@
}
}
}
+
/**
+ * @param messages
+ * @param buff
+ * @return
* @throws Exception
*/
+ private LargeServerMessage parseLargeMessage(final Map<Long, ServerMessage> messages, final HornetQBuffer buff) throws Exception
+ {
+ LargeServerMessage largeMessage = createLargeMessage();
+
+ LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
+
+ messageEncoding.decode(buff);
+
+ if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+ {
+ long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+ LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+
+ if (originalMessage == null)
+ {
+ // this could happen if the message was deleted but the file still exists as the file still being used
+ originalMessage = createLargeMessage();
+ originalMessage.setDurable(true);
+ originalMessage.setMessageID(originalMessageID);
+ messages.put(originalMessageID, originalMessage);
+ }
+
+ originalMessage.incrementDelayDeletionCount();
+
+ largeMessage.setLinkedMessage(originalMessage);
+ }
+ return largeMessage;
+ }
+
+ private void loadPreparedTransactions(final PostOffice postOffice,
+ final PagingManager pagingManager,
+ final ResourceManager resourceManager,
+ final Map<Long, Queue> queues,
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+ {
+ // recover prepared transactions
+ for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
+ {
+ XidEncoding encodingXid = new XidEncoding(preparedTransaction.extraData);
+
+ Xid xid = encodingXid.xid;
+
+ Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this);
+
+ List<MessageReference> referencesToAck = new ArrayList<MessageReference>();
+
+ Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+
+ // Use same method as load message journal to prune out acks, so they don't get added.
+ // Then have reacknowledge(tx) methods on queue, which needs to add the page size
+
+ // first get any sent messages for this tx and recreate
+ for (RecordInfo record : preparedTransaction.records)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+ byte recordType = record.getUserRecordType();
+
+ switch (recordType)
+ {
+ case ADD_LARGE_MESSAGE:
+ {
+ messages.put(record.id, parseLargeMessage(messages, buff));
+
+ break;
+ }
+ case ADD_MESSAGE:
+ {
+ ServerMessage message = new ServerMessageImpl(record.id, 50);
+
+ message.decode(buff);
+
+ messages.put(record.id, message);
+
+ break;
+ }
+ case ADD_REF:
+ {
+
+ long messageID = record.id;
+
+ RefEncoding encoding = new RefEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+
+ ServerMessage message = messages.get(messageID);
+
+ if (message == null)
+ {
+ throw new IllegalStateException("Cannot find message with id " + messageID);
+ }
+
+ postOffice.reroute(message, queue, tx);
+
+ break;
+ }
+ case ACKNOWLEDGE_REF:
+ {
+ long messageID = record.id;
+
+ RefEncoding encoding = new RefEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+
+ // TODO - this involves a scan - we should find a quicker qay of doing it
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ referencesToAck.add(removed);
+
+ if (removed == null)
+ {
+ throw new IllegalStateException("Failed to remove reference for " + messageID);
+ }
+
+ break;
+ }
+ case PAGE_TRANSACTION:
+ {
+ PageTransactionInfo pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.markIncomplete();
+
+ tx.putProperty(TransactionPropertyIndexes.PAGE_TRANSACTION, pageTransactionInfo);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ tx.addOperation(new FinishPageMessageOperation());
+
+ break;
+ }
+ case SET_SCHEDULED_DELIVERY_TIME:
+ {
+ // Do nothing - for prepared txs, the set scheduled delivery time will only occur in a send in which
+ // case the message will already have the header for the scheduled delivery time, so no need to do
+ // anything.
+
+ break;
+ }
+ case DUPLICATE_ID:
+ {
+ // We need load the duplicate ids at prepare time too
+ DuplicateIDEncoding encoding = new DuplicateIDEncoding();
+
+ encoding.decode(buff);
+
+ List<Pair<byte[], Long>> ids = duplicateIDMap.get(encoding.address);
+
+ if (ids == null)
+ {
+ ids = new ArrayList<Pair<byte[], Long>>();
+
+ duplicateIDMap.put(encoding.address, ids);
+ }
+
+ ids.add(new Pair<byte[], Long>(encoding.duplID, record.id));
+
+ break;
+ }
+ default:
+ {
+ log.warn("InternalError: Record type " + recordType +
+ " not recognized. Maybe you're using journal files created on a different version");
+ }
+ }
+ }
+
+ for (RecordInfo record : preparedTransaction.recordsToDelete)
+ {
+ byte[] data = record.data;
+
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+
+ long messageID = record.id;
+
+ DeleteEncoding encoding = new DeleteEncoding();
+
+ encoding.decode(buff);
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ if (removed != null)
+ {
+ referencesToAck.add(removed);
+ }
+
+ }
+
+ for (MessageReference ack : referencesToAck)
+ {
+ ack.getQueue().reacknowledge(tx, ack);
+ }
+
+ tx.setState(Transaction.State.PREPARED);
+
+ resourceManager.putTransaction(xid, tx);
+ }
+ }
+
+
+
+ /**
+ * @throws Exception
+ */
private void cleanupIncompleteFiles() throws Exception
{
if (largeMessagesFactory != null)
{
- List<String> tmpFiles = this.largeMessagesFactory.listFiles("tmp");
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
for (String tmpFile : tmpFiles)
{
SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile, -1);
@@ -1505,7 +1503,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
*/
- public void executeOnCompletion(IOAsyncTask runnable)
+ public void executeOnCompletion(final IOAsyncTask runnable)
{
}
@@ -1540,7 +1538,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
*/
- public void onError(int errorCode, String errorMessage)
+ public void onError(final int errorCode, final String errorMessage)
{
}
@@ -1618,7 +1616,7 @@
SimpleString clusterName;
- public GroupingEncoding(long id, SimpleString groupId, SimpleString clusterName)
+ public GroupingEncoding(final long id, final SimpleString groupId, final SimpleString clusterName)
{
this.id = id;
this.groupId = groupId;
@@ -1634,13 +1632,13 @@
return SimpleString.sizeofString(groupId) + SimpleString.sizeofString(clusterName);
}
- public void encode(HornetQBuffer buffer)
+ public void encode(final HornetQBuffer buffer)
{
buffer.writeSimpleString(groupId);
buffer.writeSimpleString(clusterName);
}
- public void decode(HornetQBuffer buffer)
+ public void decode(final HornetQBuffer buffer)
{
groupId = buffer.readSimpleString();
clusterName = buffer.readSimpleString();
@@ -1651,7 +1649,7 @@
return id;
}
- public void setId(long id)
+ public void setId(final long id)
{
this.id = id;
}
@@ -1780,7 +1778,7 @@
{
private final LargeServerMessage message;
- public LargeMessageEncoding(LargeServerMessage message)
+ public LargeMessageEncoding(final LargeServerMessage message)
{
this.message = message;
}
@@ -1908,7 +1906,7 @@
{
long scheduledDeliveryTime;
- private ScheduledDeliveryEncoding(long scheduledDeliveryTime, long queueID)
+ private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long queueID)
{
super(queueID);
this.scheduledDeliveryTime = scheduledDeliveryTime;
@@ -1918,18 +1916,21 @@
{
}
+ @Override
public int getEncodeSize()
{
return super.getEncodeSize() + 8;
}
- public void encode(HornetQBuffer buffer)
+ @Override
+ public void encode(final HornetQBuffer buffer)
{
super.encode(buffer);
buffer.writeLong(scheduledDeliveryTime);
}
- public void decode(HornetQBuffer buffer)
+ @Override
+ public void decode(final HornetQBuffer buffer)
{
super.decode(buffer);
scheduledDeliveryTime = buffer.readLong();
@@ -2022,14 +2023,6 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public Collection<Queue> getDistinctQueues()
- {
- return Collections.emptySet();
- }
-
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 16:26:48 UTC (rev 8532)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 16:42:03 UTC (rev 8533)
@@ -186,6 +186,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -256,12 +257,12 @@
{
volatile boolean failed;
- public void connectionFailed(HornetQException me)
+ public void connectionFailed(final HornetQException me)
{
failed = true;
}
- public void beforeReconnect(HornetQException exception)
+ public void beforeReconnect(final HornetQException exception)
{
}
}
@@ -299,6 +300,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -382,7 +384,7 @@
producer.send(message);
}
- ClientConsumer consumer = session.createConsumer(ADDRESS);
+ session.createConsumer(ADDRESS);
InVMConnector.failOnCreateConnection = true;
@@ -392,6 +394,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -435,7 +438,7 @@
Timer timer = new Timer();
ClientSession session = null;
-
+
try
{
@@ -446,21 +449,16 @@
final int reconnectAttempts = -1;
final ClientSessionFactoryInternal sf = createFactory(false);
-
-
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setConfirmationWindowSize(1024 * 1024);
-
-
+
session = sf.createSession();
final RemotingConnection connFailure = ((ClientSessionInternal)session).getConnection();
-
-
int numberOfThreads = 100;
final int numberOfSessionsToCreate = 10;
@@ -471,24 +469,25 @@
{
Throwable failure;
+ @Override
public void run()
{
try
{
alignLatch.countDown();
startFlag.await();
- for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+ for (int i = 0; i < numberOfSessionsToCreate; i++)
{
Thread.yield();
ClientSession session = sf.createSession(false, true, true);
-
+
session.close();
}
}
catch (Throwable e)
{
e.printStackTrace();
- this.failure = e;
+ failure = e;
}
}
}
@@ -500,8 +499,6 @@
threads[i].start();
}
- // Sleep 3 times retryInterval, so it should at least have 3 retries
-
alignLatch.await();
timer.schedule(new TimerTask()
@@ -518,9 +515,9 @@
log.warn("Error on the timer " + e);
}
}
-
+
}, 10, 10);
-
+
startFlag.countDown();
Throwable failure = null;
@@ -546,7 +543,7 @@
finally
{
timer.cancel();
-
+
if (session != null)
{
session.close();
@@ -580,6 +577,7 @@
{
Throwable failure;
+ @Override
public void run()
{
try
@@ -593,7 +591,7 @@
catch (Throwable e)
{
e.printStackTrace();
- this.failure = e;
+ failure = e;
}
}
}
@@ -609,6 +607,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -678,6 +677,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -836,6 +836,7 @@
Thread t = new Thread()
{
+ @Override
public void run()
{
try
@@ -874,7 +875,7 @@
long end = System.currentTimeMillis();
- assertTrue((end - start) >= retryInterval);
+ assertTrue(end - start >= retryInterval);
session.close();
@@ -952,7 +953,7 @@
double wait = retryInterval + retryMultiplier * retryInterval + retryMultiplier * retryMultiplier * retryInterval;
- assertTrue((end - start) >= wait);
+ assertTrue(end - start >= wait);
session.close();
@@ -1031,9 +1032,9 @@
double wait = retryInterval + retryMultiplier * 2 * retryInterval + retryMultiplier;
- assertTrue((end - start) >= wait);
+ assertTrue(end - start >= wait);
- assertTrue((end - start) < wait + 500);
+ assertTrue(end - start < wait + 500);
session.close();
JBoss hornetq SVN: r8532 - in trunk/src/main/org/hornetq/core: persistence/impl/journal and 1 other directory.
Author: clebert.suconic@jboss.com
Date: 2009-12-03 11:26:48 -0500 (Thu, 03 Dec 2009)
New Revision: 8532
Modified:
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Removing waitOnCompletion for deleteMatchQueue
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-03 16:16:23 UTC (rev 8531)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-03 16:26:48 UTC (rev 8532)
@@ -442,7 +442,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.commit(false);
- server.getStorageManager().waitOnOperations(-1);
+ server.getStorageManager().waitOnOperations();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
@@ -461,7 +461,7 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.rollback();
- server.getStorageManager().completeOperations();
+ server.getStorageManager().waitOnOperations();
long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
@@ -577,6 +577,8 @@
public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
{
postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
+ // blocking on IO. Otherwise the method would return before the operation was finished
+ server.getStorageManager().waitOnOperations();
}
// NotificationEmitter implementation ----------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-03 16:16:23 UTC (rev 8531)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-03 16:26:48 UTC (rev 8532)
@@ -259,6 +259,7 @@
QueueControlImpl queueControl = new QueueControlImpl(queue,
address.toString(),
postOffice,
+ storageManager,
addressSettingsRepository);
if (messageCounterManager != null)
{
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 16:16:23 UTC (rev 8531)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 16:26:48 UTC (rev 8532)
@@ -27,6 +27,7 @@
import org.hornetq.core.message.Message;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.impl.MessageCounterHelper;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
@@ -58,6 +59,8 @@
private final PostOffice postOffice;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ private final StorageManager storageManager;
private MessageCounter counter;
@@ -79,6 +82,7 @@
public QueueControlImpl(final Queue queue,
final String address,
final PostOffice postOffice,
+ final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
{
super(QueueControl.class);
@@ -86,6 +90,7 @@
this.address = address;
this.postOffice = postOffice;
this.addressSettingsRepository = addressSettingsRepository;
+ this.storageManager = storageManager;
}
// Public --------------------------------------------------------
@@ -272,7 +277,13 @@
public int removeMessages(final String filterStr) throws Exception
{
Filter filter = FilterImpl.createFilter(filterStr);
- return queue.deleteMatchingReferences(filter);
+
+ int retValue = queue.deleteMatchingReferences(filter);
+
+ // Waiting on IO otherwise the JMX operation would return before the operation completed
+ storageManager.waitOnOperations();
+
+ return retValue;
}
public boolean expireMessage(final long messageID) throws Exception
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:16:23 UTC (rev 8531)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 16:26:48 UTC (rev 8532)
@@ -317,7 +317,7 @@
public void waitOnOperations() throws Exception
{
- waitOnOperations(-1);
+ waitOnOperations(0);
}
/* (non-Javadoc)
@@ -328,7 +328,7 @@
SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
afterCompleteOperations(waitCallback);
completeOperations();
- if (timeout <= 0)
+ if (timeout == 0)
{
waitCallback.waitCompletion();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-03 16:16:23 UTC (rev 8531)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-03 16:26:48 UTC (rev 8532)
@@ -666,8 +666,6 @@
tx.commit();
}
-
- storageManager.waitOnOperations(-1);
return count;
}
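The waitOnOperations() change above (a timeout of 0 now means wait indefinitely) relies on appending a callback after all pending operations and blocking until it fires. A rough, self-contained sketch of that idea using plain java.util.concurrent types rather than the real SimpleWaitIOCallback:
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
// Simplified stand-in for the callback used by waitOnOperations(): the storage
// layer calls done() once every operation queued before the callback has
// completed, and waitCompletion() blocks the caller until then.
class WaitCallbackSketch
{
   private final CountDownLatch latch = new CountDownLatch(1);
   public void done()
   {
      latch.countDown();
   }
   // A timeout of 0 means "wait forever", mirroring the r8532 change.
   public boolean waitCompletion(final long timeoutMillis) throws InterruptedException
   {
      if (timeoutMillis == 0)
      {
         latch.await();
         return true;
      }
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
   }
}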
JBoss hornetq SVN: r8531 - trunk/src/main/org/hornetq/core/client/impl.
Author: timfox
Date: 2009-12-03 11:16:23 -0500 (Thu, 03 Dec 2009)
New Revision: 8531
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
fix to clientsessionfactoryimpl
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 16:12:38 UTC (rev 8530)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 16:16:23 UTC (rev 8531)
@@ -267,8 +267,7 @@
private synchronized void initialise() throws Exception
{
if (!readOnly)
- {
- readOnly = true;
+ {
setThreadPools();
instantiateLoadBalancingPolicy();
@@ -315,6 +314,7 @@
{
throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
}
+ readOnly = true;
}
}
@@ -1093,19 +1093,16 @@
{
throw new IllegalStateException("Cannot create session, factory is closed (maybe it has been garbage collected)");
}
-
- if (!readOnly)
+
+ try
{
- try
- {
- initialise();
- }
- catch (Exception e)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
- }
+ initialise();
}
-
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
+ }
+
if (discoveryGroup != null && !receivedBroadcast)
{
boolean ok = discoveryGroup.waitForBroadcast(discoveryInitialWaitTimeout);
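The fix only marks the factory as initialised after the whole of initialise() has succeeded, and createSession() now always funnels through initialise(), so a failed first attempt can be retried. A generic sketch of that pattern with a hypothetical class, not the real factory:
// Hypothetical illustration of the initialise-on-first-use pattern from r8531:
// the flag is only set after setup succeeds, so a partial failure leaves the
// object ready to retry on the next call.
class LazyResource
{
   private boolean initialised;
   private synchronized void initialise() throws Exception
   {
      if (!initialised)
      {
         setUpThreadPools();
         connect();
         // Only reached if nothing above threw.
         initialised = true;
      }
   }
   public synchronized void use() throws Exception
   {
      // Always call initialise(); it is a no-op once the flag is set.
      initialise();
      // ... actual work ...
   }
   private void setUpThreadPools()
   {
   }
   private void connect() throws Exception
   {
   }
}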
JBoss hornetq SVN: r8530 - trunk/src/main/org/hornetq/core/replication/impl.
Author: clebert.suconic@jboss.com
Date: 2009-12-03 11:12:38 -0500 (Thu, 03 Dec 2009)
New Revision: 8530
Modified:
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
tweak: comment change
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 15:54:24 UTC (rev 8529)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 16:12:38 UTC (rev 8530)
@@ -375,8 +375,8 @@
enabled = false;
- // The same context will be replicated on the pending tokens...
- // as the multiple operations will be replicated on the same context
+ // Complete any pending operations...
+ // Case the backup crashed, this should clean up any pending requests
while (!pendingTokens.isEmpty())
{
OperationContext ctx = pendingTokens.poll();
JBoss hornetq SVN: r8529 - trunk/tests/src/org/hornetq/tests/integration/cluster/bridge.
Author: jmesnil
Date: 2009-12-03 10:54:24 -0500 (Thu, 03 Dec 2009)
New Revision: 8529
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
Log:
set InVMConnector.failOnCreateConnection to false in BridgeTestBase.tearDown()
* static fields are the bane of unit tests...
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-12-03 15:30:02 UTC (rev 8528)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTestBase.java 2009-12-03 15:54:24 UTC (rev 8529)
@@ -19,6 +19,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -65,6 +66,8 @@
servers = null;
+ InVMConnector.failOnCreateConnection = false;
+
super.tearDown();
}
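The log's point about static fields is the general habit of restoring shared static state in tearDown() so one test's configuration cannot leak into the next. A minimal JUnit 3 style sketch with a hypothetical connector class, not BridgeTestBase itself:
import junit.framework.TestCase;
// Hypothetical example of resetting static test state, as r8529 does for
// InVMConnector.failOnCreateConnection.
public class StaticStateResetTest extends TestCase
{
   @Override
   protected void tearDown() throws Exception
   {
      // Runs even when the test fails, so later tests start from a known state.
      SomeConnector.failOnCreateConnection = false;
      super.tearDown();
   }
   public void testFailurePath() throws Exception
   {
      SomeConnector.failOnCreateConnection = true;
      // ... exercise the connection failure handling ...
   }
   // Stand-in for a class with mutable static configuration, like InVMConnector.
   static class SomeConnector
   {
      static volatile boolean failOnCreateConnection = false;
   }
}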
JBoss hornetq SVN: r8528 - trunk/src/main/org/hornetq/core/replication/impl.
Author: jmesnil
Date: 2009-12-03 10:30:02 -0500 (Thu, 03 Dec 2009)
New Revision: 8528
Modified:
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
reverted r8527
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 15:15:26 UTC (rev 8527)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 15:30:02 UTC (rev 8528)
@@ -374,6 +374,21 @@
}
enabled = false;
+
+ // The same context will be replicated on the pending tokens...
+ // as the multiple operations will be replicated on the same context
+ while (!pendingTokens.isEmpty())
+ {
+ OperationContext ctx = pendingTokens.poll();
+ try
+ {
+ ctx.replicationDone();
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error completing callback on replication manager", e);
+ }
+ }
if (replicatingChannel != null)
{
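The loop restored here drains every operation context still queued when the manager stops and completes its callback locally, so callers blocked on replication are released even if the backup has crashed. A simplified sketch of that drain-on-shutdown idea using plain Java types, not the real OperationContext or ReplicationManagerImpl:
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
// Simplified stand-in for the pending-token drain restored in r8528.
class ReplicationDrainSketch
{
   interface Context
   {
      void replicationDone();
   }
   private final Queue<Context> pendingTokens = new ConcurrentLinkedQueue<Context>();
   public void register(final Context ctx)
   {
      pendingTokens.add(ctx);
   }
   public void stop()
   {
      // If the backup crashed these contexts would otherwise never complete,
      // leaving callers blocked forever.
      Context ctx;
      while ((ctx = pendingTokens.poll()) != null)
      {
         try
         {
            ctx.replicationDone();
         }
         catch (Throwable e)
         {
            System.err.println("Error completing callback on replication manager: " + e);
         }
      }
   }
}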
JBoss hornetq SVN: r8527 - trunk/src/main/org/hornetq/core/replication/impl.
Author: jmesnil
Date: 2009-12-03 10:15:26 -0500 (Thu, 03 Dec 2009)
New Revision: 8527
Modified:
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
server shutdown
* do not wait for pending completions when stopping the replication manager
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 15:13:37 UTC (rev 8526)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 15:15:26 UTC (rev 8527)
@@ -374,21 +374,6 @@
}
enabled = false;
-
- // The same context will be replicated on the pending tokens...
- // as the multiple operations will be replicated on the same context
- while (!pendingTokens.isEmpty())
- {
- OperationContext ctx = pendingTokens.poll();
- try
- {
- ctx.replicationDone();
- }
- catch (Throwable e)
- {
- log.warn("Error completing callback on replication manager", e);
- }
- }
if (replicatingChannel != null)
{