[hornetq-commits] JBoss hornetq SVN: r11610 - in trunk: hornetq-core/src/main/java/org/hornetq/core/replication and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 27 07:40:08 EDT 2011


Author: borges
Date: 2011-10-27 07:40:08 -0400 (Thu, 27 Oct 2011)
New Revision: 11610

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
HORNETQ-720 Fix incorrect transaction parameters.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationCommitMessage.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -37,6 +37,8 @@
 
    private long txId;
 
+   private boolean sync;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -46,12 +48,13 @@
       super(PacketImpl.REPLICATION_COMMIT_ROLLBACK);
    }
 
-   public ReplicationCommitMessage(final byte journalID, final boolean rollback, final long txId)
+   public ReplicationCommitMessage(final byte journalID, final boolean rollback, final long txId, boolean sync)
    {
       this();
       this.journalID = journalID;
       this.rollback = rollback;
       this.txId = txId;
+      this.sync = sync;
    }
 
    // Public --------------------------------------------------------
@@ -62,6 +65,7 @@
       buffer.writeByte(journalID);
       buffer.writeBoolean(rollback);
       buffer.writeLong(txId);
+      buffer.writeBoolean(sync);
    }
 
    @Override
@@ -70,6 +74,7 @@
       journalID = buffer.readByte();
       rollback = buffer.readBoolean();
       txId = buffer.readLong();
+      sync = buffer.readBoolean();
    }
 
    public boolean isRollback()
@@ -82,6 +87,11 @@
       return txId;
    }
 
+   public boolean getSync()
+   {
+      return sync;
+   }
+
    /**
     * @return the journalID
     */
@@ -89,13 +99,4 @@
    {
       return journalID;
    }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -48,11 +48,11 @@
 
    void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception;
 
-   void appendCommitRecord(byte journalID, long txID, boolean lineUp) throws Exception;
+   void appendCommitRecord(byte journalID, long txID, boolean sync, boolean lineUp) throws Exception;
 
    void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception;
 
-   void appendRollbackRecord(byte journalID, long txID) throws Exception;
+   void appendRollbackRecord(byte journalID, long txID, boolean sync) throws Exception;
 
    /** A list of tokens that are still waiting for replications to be completed */
    Set<OperationContext> getActiveTokens();

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -38,7 +38,6 @@
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
  * @see JournalStorageManager
- *
  */
 public class ReplicatedJournal implements Journal
 {
@@ -177,7 +176,7 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, true);
+      replicationManager.appendCommitRecord(journalID, txID, sync, true);
       localJournal.appendCommitRecord(txID, sync);
    }
 
@@ -190,7 +189,7 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, true);
+      replicationManager.appendCommitRecord(journalID, txID, sync, true);
       localJournal.appendCommitRecord(txID, sync, callback);
    }
 
@@ -203,9 +202,8 @@
       {
          ReplicatedJournal.trace("AppendCommit " + txID);
       }
-      replicationManager.appendCommitRecord(journalID, txID, lineUpContext);
+      replicationManager.appendCommitRecord(journalID, txID, sync, lineUpContext);
       localJournal.appendCommitRecord(txID, sync, callback, lineUpContext);
-
    }
 
 
@@ -225,9 +223,6 @@
       localJournal.appendDeleteRecord(id, sync);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean, org.hornetq.core.journal.IOCompletion)
-    */
    public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion completionCallback) throws Exception
    {
       if (ReplicatedJournal.trace)
@@ -351,7 +346,7 @@
       {
          ReplicatedJournal.trace("AppendRollback " + txID);
       }
-      replicationManager.appendRollbackRecord(journalID, txID);
+      replicationManager.appendRollbackRecord(journalID, txID, sync);
       localJournal.appendRollbackRecord(txID, sync);
    }
 
@@ -364,7 +359,7 @@
       {
          ReplicatedJournal.trace("AppendRollback " + txID);
       }
-      replicationManager.appendRollbackRecord(journalID, txID);
+      replicationManager.appendRollbackRecord(journalID, txID, sync);
       localJournal.appendRollbackRecord(txID, sync, callback);
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -683,11 +683,11 @@
 
       if (packet.isRollback())
       {
-         journalToUse.appendRollbackRecord(packet.getTxId(), false);
+         journalToUse.appendRollbackRecord(packet.getTxId(), packet.getSync());
       }
       else
       {
-         journalToUse.appendCommitRecord(packet.getTxId(), false);
+         journalToUse.appendCommitRecord(packet.getTxId(), packet.getSync());
       }
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -167,11 +167,13 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
-   public void appendCommitRecord(final byte journalID, final long txID, final boolean lineUp) throws Exception
+   public
+            void
+            appendCommitRecord(final byte journalID, final long txID, boolean sync, final boolean lineUp) throws Exception
    {
       if (enabled)
       {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID, sync), lineUp);
       }
    }
 
@@ -214,11 +216,11 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
     */
-   public void appendRollbackRecord(final byte journalID, final long txID) throws Exception
+   public void appendRollbackRecord(final byte journalID, final long txID, boolean sync) throws Exception
    {
       if (enabled)
       {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, true, txID, sync));
       }
    }
 

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-10-27 11:39:36 UTC (rev 11609)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-10-27 11:40:08 UTC (rev 11610)
@@ -299,7 +299,7 @@
       {
          ClientMessage message = consumer.receive(1000);
 
-         assertNotNull(message);
+         assertNotNull("Expecting message #" + i, message);
 
          message.acknowledge();
       }



More information about the hornetq-commits mailing list