[hornetq-commits] JBoss hornetq SVN: r11094 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Aug 2 06:18:08 EDT 2011


Author: borges
Date: 2011-08-02 06:18:08 -0400 (Tue, 02 Aug 2011)
New Revision: 11094

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 FIX sending of replicated packets & document mechanism.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	2011-08-02 10:18:08 UTC (rev 11094)
@@ -6,7 +6,11 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Send all fileIDs used in the live server to the backup.
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
  */
 public class ReplicationFileIdMessage extends PacketImpl
 {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-02 10:18:08 UTC (rev 11094)
@@ -410,7 +410,6 @@
       {
          throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
       }
-
       final Journal journalIf = journals[packet.getJournalContentType().typeByte];
 
       JournalImpl journal = assertJournalImpl(journalIf);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-02 10:18:08 UTC (rev 11094)
@@ -69,10 +69,6 @@
    // Attributes ----------------------------------------------------
 
    private final ResponseHandler responseHandler = new ResponseHandler();
-
-//   private final ClientSessionFactoryInternal sessionFactory;
-//   private CoreRemotingConnection replicatingConnection;
-
    private final Channel replicatingChannel;
 
    private boolean started;
@@ -424,6 +420,11 @@
       }
    }
 
+   /**
+    * @throws IllegalStateException By default, all replicated packets generate a replicated
+    *            response. If your packets are triggering this exception, it may be because the
+    *            packets were not sent with {@link #sendReplicatePacket(Packet)}.
+    */
    private void replicated()
    {
       OperationContext ctx = pendingTokens.poll();
@@ -523,7 +524,7 @@
          buffer.rewind();
 
          // sending -1 or 0 bytes will close the file at the backup
-         replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
+         sendReplicatePacket(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
          if (bytesRead == -1 || bytesRead == 0)
             break;
       }
@@ -532,13 +533,13 @@
    @Override
    public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
    {
-      replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
+      sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
    }
 
    @Override
    public void sendSynchronizationDone()
    {
-      replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+      sendReplicatePacket(new ReplicationJournalFileMessage(-1, null, null, -1));
    }
 
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-02 10:18:08 UTC (rev 11094)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -3276,6 +3277,7 @@
       writeLock();
       try
       {
+         log.info("Reserving fileIDs before synchronization: " + Arrays.toString(fileIds));
          long maxID = -1;
          for (long id : fileIds)
          {
@@ -3321,7 +3323,7 @@
          {
             filesRepository.addDataFileOnTop(file);
          }
-         // XXX HORNETQ-720 still missing a "reload" call
+         // XXX HORNETQ-720 still missing a "reload" call?
       }
       finally
       {

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-02 03:27:37 UTC (rev 11093)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-02 10:18:08 UTC (rev 11094)
@@ -58,6 +58,10 @@
       }
       backupServer.start();
       waitForBackup(sessionFactory, 10);
+
+      // SEND more messages, now with the backup replicating
+      sendMessages(session, producer, N_MSGS);
+
       Set<Long> liveIds = getFileIds(messageJournal);
       assertFalse("should not be initialized", backupServer.getServer().isInitialised());
       crash(session);
@@ -65,7 +69,7 @@
 
       JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
       Set<Long> backupIds = getFileIds(backupMsgJournal);
-      assertEquals("sets must match! " + liveIds, liveIds, backupIds);
+      assertEquals("File IDs must match!", liveIds, backupIds);
    }
 
    private static void waitForServerInitialization(HornetQServer server, int seconds)



More information about the hornetq-commits mailing list