Author: borges
Date: 2011-10-27 07:40:08 -0400 (Thu, 27 Oct 2011)
New Revision: 11610
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
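HORNETQ-720 Fix incorrect transaction parameters.
Replicated commit/rollback records now carry the journal's "sync" flag to the backup (which previously always appended with sync=false), and replicated rollbacks are now sent with rollback=true (they were previously sent with rollback=false).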
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -37,6 +37,8 @@
private long txId;
+ private boolean sync;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -46,12 +48,13 @@
super(PacketImpl.REPLICATION_COMMIT_ROLLBACK);
}
- public ReplicationCommitMessage(final byte journalID, final boolean rollback, final
long txId)
+ public ReplicationCommitMessage(final byte journalID, final boolean rollback, final
long txId, boolean sync)
{
this();
this.journalID = journalID;
this.rollback = rollback;
this.txId = txId;
+ this.sync = sync;
}
// Public --------------------------------------------------------
@@ -62,6 +65,7 @@
buffer.writeByte(journalID);
buffer.writeBoolean(rollback);
buffer.writeLong(txId);
+ buffer.writeBoolean(sync);
}
@Override
@@ -70,6 +74,7 @@
journalID = buffer.readByte();
rollback = buffer.readBoolean();
txId = buffer.readLong();
+ sync = buffer.readBoolean();
}
public boolean isRollback()
@@ -82,6 +87,11 @@
return txId;
}
+ public boolean getSync()
+ {
+ return sync;
+ }
+
/**
* @return the journalID
*/
@@ -89,13 +99,4 @@
{
return journalID;
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -48,11 +48,11 @@
void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws
Exception;
- void appendCommitRecord(byte journalID, long txID, boolean lineUp) throws Exception;
+ void appendCommitRecord(byte journalID, long txID, boolean sync, boolean lineUp)
throws Exception;
void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData)
throws Exception;
- void appendRollbackRecord(byte journalID, long txID) throws Exception;
+ void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
/** A list of tokens that are still waiting for replications to be completed */
Set<OperationContext> getActiveTokens();
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -38,7 +38,6 @@
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
* @see JournalStorageManager
- *
*/
public class ReplicatedJournal implements Journal
{
@@ -177,7 +176,7 @@
{
ReplicatedJournal.trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID, true);
+ replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync);
}
@@ -190,7 +189,7 @@
{
ReplicatedJournal.trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID, true);
+ replicationManager.appendCommitRecord(journalID, txID, sync, true);
localJournal.appendCommitRecord(txID, sync, callback);
}
@@ -203,9 +202,8 @@
{
ReplicatedJournal.trace("AppendCommit " + txID);
}
- replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
+ replicationManager.appendCommitRecord(journalID, txID, sync, lineUpContext);
localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
-
}
@@ -225,9 +223,6 @@
localJournal.appendDeleteRecord(id, sync);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean,
org.hornetq.core.journal.IOCompletion)
- */
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion
completionCallback) throws Exception
{
if (ReplicatedJournal.trace)
@@ -351,7 +346,7 @@
{
ReplicatedJournal.trace("AppendRollback " + txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
+ replicationManager.appendRollbackRecord(journalID, txID, sync);
localJournal.appendRollbackRecord(txID, sync);
}
@@ -364,7 +359,7 @@
{
ReplicatedJournal.trace("AppendRollback " + txID);
}
- replicationManager.appendRollbackRecord(journalID, txID);
+ replicationManager.appendRollbackRecord(journalID, txID, sync);
localJournal.appendRollbackRecord(txID, sync, callback);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -683,11 +683,11 @@
if (packet.isRollback())
{
- journalToUse.appendRollbackRecord(packet.getTxId(), false);
+ journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
}
else
{
- journalToUse.appendCommitRecord(packet.getTxId(), false);
+ journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -167,11 +167,13 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long,
boolean)
*/
- public void appendCommitRecord(final byte journalID, final long txID, final boolean
lineUp) throws Exception
+ public
+ void
+ appendCommitRecord(final byte journalID, final long txID, boolean sync, final
boolean lineUp) throws Exception
{
if (enabled)
{
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID),
lineUp);
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID, sync),
lineUp);
}
}
@@ -214,11 +216,11 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte,
long, boolean)
*/
- public void appendRollbackRecord(final byte journalID, final long txID) throws
Exception
+ public void appendRollbackRecord(final byte journalID, final long txID, boolean sync)
throws Exception
{
if (enabled)
{
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID, sync));
}
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-10-27 11:40:08 UTC (rev 11610)
@@ -299,7 +299,7 @@
{
ClientMessage message = consumer.receive(1000);
- assertNotNull(message);
+ assertNotNull("Expecting message #" + i, message);
message.acknowledge();
}