[hornetq-commits] JBoss hornetq SVN: r9407 - in trunk: src/main/org/hornetq/core/server/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jul 19 05:06:53 EDT 2010
Author: jmesnil
Date: 2010-07-19 05:06:52 -0400 (Mon, 19 Jul 2010)
New Revision: 9407
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-442: Out of Order delivery with depaging during a transaction
* ensure that, during tx completion, messages routed to an address which is depaging keep the transactional context
and remain in order
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -14,8 +14,6 @@
package org.hornetq.core.postoffice.impl;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -60,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -1104,20 +1102,14 @@
private class PageMessageOperation implements TransactionOperation
{
private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
-
+
+ private Transaction subTX = null;
+
void addMessageToPage(final ServerMessage message)
{
messagesToPage.add(message);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public Collection<Queue> getDistinctQueues()
- {
- return Collections.emptySet();
- }
-
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1130,10 +1122,19 @@
{
pageTransaction.commit();
}
+
+ if (subTX != null)
+ {
+ subTX.afterCommit();
+ }
}
public void afterPrepare(final Transaction tx)
{
+ if (subTX != null)
+ {
+ subTX.afterPrepare();
+ }
}
public void afterRollback(final Transaction tx)
@@ -1144,6 +1145,11 @@
{
pageTransaction.rollback();
}
+
+ if (subTX != null)
+ {
+ subTX.afterRollback();
+ }
}
public void beforeCommit(final Transaction tx) throws Exception
@@ -1152,15 +1158,30 @@
{
pageMessages(tx);
}
+
+ if (subTX != null)
+ {
+ subTX.beforeCommit();
+ }
+
}
public void beforePrepare(final Transaction tx) throws Exception
{
pageMessages(tx);
+
+ if (subTX != null)
+ {
+ subTX.beforePrepare();
+ }
}
public void beforeRollback(final Transaction tx) throws Exception
{
+ if (subTX != null)
+ {
+ subTX.beforeRollback();
+ }
}
private void pageMessages(final Transaction tx) throws Exception
@@ -1201,9 +1222,13 @@
else
{
// This could happen when the PageStore left the pageState
-
- // TODO is this correct - don't we lose transactionality here???
- route(message, false);
+ // We create a copy of the transaction so that messages are routed with the same tx ID,
+ // but we cannot use the tx directly because it already has its own set of TransactionOperations.
+ if (subTX == null)
+ {
+ subTX = tx.copy();
+ }
+ route(message, subTX, false);
}
first = false;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -1385,7 +1385,7 @@
return status;
}
- private void postAcknowledge(final MessageReference ref) throws Exception
+ private void postAcknowledge(final MessageReference ref)
{
final ServerMessage message = ref.getMessage();
@@ -1423,7 +1423,14 @@
queue.deliveringCount.decrementAndGet();
- message.decrementRefCount();
+ try
+ {
+ message.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
+ }
}
void postRollback(final LinkedList<MessageReference> refs)
@@ -1500,7 +1507,7 @@
}
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
for (MessageReference ref : refsToAck)
{
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -22,7 +22,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
- */
+ */
public interface Transaction
{
void prepare() throws Exception;
@@ -32,6 +32,10 @@
void commit(boolean onePhase) throws Exception;
void rollback() throws Exception;
+
+ /** Used for pages during commit.
+ * When paging messages we need to guarantee that they are in the same transaction (but not with the same set of TransactionOperation). */
+ Transaction copy();
int getOperationsCount();
@@ -64,9 +68,24 @@
void setContainsPersistent();
void setTimeout(int timeout);
+
+ // To be used by sub-contexts. Mainly on paging
+
+ void beforeCommit() throws Exception;
+
+ void beforeRollback() throws Exception;
+
+ void beforePrepare() throws Exception;;
+
+ void afterPrepare();
+
+ void afterCommit();
+
+ void afterRollback();
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
}
+
}
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -24,13 +24,16 @@
{
void beforePrepare(Transaction tx) throws Exception;
+ /** After prepare shouldn't throw any exception. Any verification has to be done on before prepare */
+ void afterPrepare(Transaction tx);
+
void beforeCommit(Transaction tx) throws Exception;
+ /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
+ void afterCommit(Transaction tx);
+
void beforeRollback(Transaction tx) throws Exception;
- void afterPrepare(Transaction tx) throws Exception;
-
- void afterCommit(Transaction tx) throws Exception;
-
- void afterRollback(Transaction tx) throws Exception;
+ /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
+ void afterRollback(Transaction tx);
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -73,6 +73,20 @@
this.timeoutSeconds = timeoutSeconds;
}
+ /** Used for copying */
+ private TransactionImpl(final TransactionImpl other)
+ {
+ this.storageManager = other.storageManager;
+
+ this.xid = other.xid;
+
+ this.id = other.id;
+
+ this.createTime = other.createTime;
+
+ this.timeoutSeconds = other.timeoutSeconds;
+ }
+
public TransactionImpl(final StorageManager storageManager)
{
this.storageManager = storageManager;
@@ -130,7 +144,7 @@
{
return createTime;
}
-
+
public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
{
if(timeoutSeconds == - 1)
@@ -169,13 +183,7 @@
throw new IllegalStateException("Cannot prepare non XA transaction");
}
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforePrepare(this);
- }
- }
+ beforePrepare();
storageManager.prepare(id, xid);
@@ -195,22 +203,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterPrepare(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterPrepare();
}
});
}
@@ -252,15 +245,9 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
}
+
+ beforeCommit();
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforeCommit(this);
- }
- }
-
if (containsPersistent || xid != null && state == State.PREPARED)
{
storageManager.commit(id);
@@ -285,22 +272,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterCommit(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterCommit();
}
});
@@ -326,13 +298,7 @@
}
}
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforeRollback(this);
- }
- }
+ beforeRollback();
doRollback();
@@ -353,22 +319,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterRollback(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterRollback();
}
});
}
@@ -471,4 +422,75 @@
}
}
+ public Transaction copy()
+ {
+ return new TransactionImpl(this);
+ }
+
+ public void afterCommit()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterCommit(this);
+ }
+ }
+ }
+
+ public void afterRollback()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterRollback(this);
+ }
+ }
+ }
+
+ public void beforeCommit() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforeCommit(this);
+ }
+ }
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforePrepare(this);
+ }
+ }
+ }
+
+ public void beforeRollback() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforeRollback(this);
+ }
+ }
+ }
+
+ public void afterPrepare()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterPrepare(this);
+ }
+ }
+ }
+
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -247,7 +247,7 @@
server.start();
- final int numberOfIntegers = 256;
+ final int messageSize = 1024; // 1k
try
{
@@ -263,7 +263,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
- byte[] body = new byte[DataConstants.SIZE_INT * numberOfIntegers];
+ byte[] body = new byte[messageSize];
// HornetQBuffer bodyLocal = HornetQChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
ClientMessage message = null;
@@ -372,7 +372,7 @@
* Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
*
*/
- public void disabled_testDepageDuringTransaction2() throws Exception
+ public void testDepageDuringTransaction2() throws Exception
{
boolean IS_DURABLE_MESSAGE = true;
clearData();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -292,6 +292,35 @@
}
+ public Transaction copy()
+ {
+ return null;
+ }
+
+ public void afterCommit()
+ {
+ }
+
+ public void afterPrepare()
+ {
+ }
+
+ public void afterRollback()
+ {
+ }
+
+ public void beforeCommit() throws Exception
+ {
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ }
+
+ public void beforeRollback() throws Exception
+ {
+ }
+
}
class FakeMessage implements ServerMessage
More information about the hornetq-commits
mailing list