[hornetq-commits] JBoss hornetq SVN: r11141 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 5 11:00:17 EDT 2011


Author: borges
Date: 2011-08-05 11:00:17 -0400 (Fri, 05 Aug 2011)
New Revision: 11141

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Control when backup reaches "up-to-date" status during tests

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-05 15:00:17 UTC (rev 11141)
@@ -21,7 +21,7 @@
    private JournalContent journalType;
    /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
    private long fileId;
-   private boolean backupIsUpToDate = false;
+   private boolean backupIsUpToDate;
    private byte[] byteArray;
 
    public ReplicationJournalFileMessage()
@@ -33,6 +33,7 @@
    {
       this();
       this.fileId = id;
+      this.backupIsUpToDate = id == -1;
       this.dataSize = size;
       this.data = buffer;
       this.journalType = content;

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java	2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java	2011-08-05 15:00:17 UTC (rev 11141)
@@ -23,4 +23,4 @@
 public interface IOCompletion extends IOAsyncTask
 {
    void storeLineUp();
-}
+}
\ No newline at end of file

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-05 15:00:17 UTC (rev 11141)
@@ -4,6 +4,7 @@
 import java.util.Set;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientProducer;
@@ -13,7 +14,14 @@
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.TransportConfigurationUtils;
 
@@ -24,6 +32,7 @@
    private ClientSessionFactoryInternal sessionFactory;
    private ClientSession session;
    private ClientProducer producer;
+   private ReplicationChannelHandler handler;
    private static final int N_MSGS = 100;
 
    @Override
@@ -49,6 +58,8 @@
 
    public void testReserveFileIdValuesOnBackup() throws Exception
    {
+      handler = new ReplicationChannelHandler();
+      liveServer.addInterceptor(new BackupSyncDelay(handler));
       createProducerSendSomeMessages();
       JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
       for (int i = 0; i < 5; i++)
@@ -57,11 +68,14 @@
          sendMessages(session, producer, N_MSGS);
       }
       backupServer.start();
-      waitForBackup(sessionFactory, 10);
+      waitForBackup(sessionFactory, 10, false);
 
       // SEND more messages, now with the backup replicating
       sendMessages(session, producer, N_MSGS);
 
+      handler.notifyAll();
+      waitForBackup(sessionFactory, 10, true);
+
       Set<Long> liveIds = getFileIds(messageJournal);
       assertFalse("should not be initialized", backupServer.getServer().isInitialised());
       crash(session);
@@ -146,6 +160,10 @@
    @Override
    protected void tearDown() throws Exception
    {
+      if (handler != null)
+      {
+         handler.notifyAll();
+      }
       if (sessionFactory != null)
          sessionFactory.close();
       if (session != null)
@@ -173,4 +191,71 @@
       return TransportConfigurationUtils.getInVMConnector(live);
    }
 
+   private class BackupSyncDelay implements Interceptor
+   {
+
+      private final ReplicationChannelHandler handler;
+
+      public BackupSyncDelay(ReplicationChannelHandler handler)
+      {
+         this.handler = handler;
+         // TODO Auto-generated constructor stub
+      }
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+      {
+         if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+         {
+            try
+            {
+               ReplicationEndpoint repEnd = backupServer.getServer().getReplicationEndpoint();
+               handler.addSubHandler(repEnd);
+               Channel repChannel = repEnd.getChannel();
+               repChannel.setHandler(handler);
+            }
+            catch (Exception e)
+            {
+               throw new RuntimeException(e);
+            }
+         }
+         return true;
+      }
+
+   }
+
+   private static class ReplicationChannelHandler implements ChannelHandler
+   {
+
+      private ChannelHandler handler;
+
+      public void addSubHandler(ChannelHandler handler)
+      {
+         this.handler = handler;
+      }
+
+      @Override
+      public void handlePacket(Packet packet)
+      {
+         System.out.println(packet);
+         if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+         {
+            ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
+            if (syncMsg.isUpToDate())
+            {
+               // Hold the message that notifies the backup that sync is done.
+               try
+               {
+                  wait();
+               }
+               catch (InterruptedException e)
+               {
+                  // no-op
+               }
+            }
+         }
+         handler.handlePacket(packet);
+      }
+
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-08-05 14:59:02 UTC (rev 11140)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-08-05 15:00:17 UTC (rev 11141)
@@ -240,12 +240,22 @@
     */
    protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
    {
+      waitForBackup(sessionFactory, seconds, true);
+   }
+
+   /**
+    * @param sessionFactory
+    * @param seconds
+    * @param waitForSync whether to wait for sync'ing data with the live to finish or not
+    */
+   protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds, boolean waitForSync)
+   {
       final long toWait = seconds * 1000;
       final long time = System.currentTimeMillis();
       final HornetQServerImpl actualServer = (HornetQServerImpl)backupServer.getServer();
       while (true)
       {
-         if (sessionFactory.getBackupConnector() != null && actualServer.isRemoteBackupUpToDate())
+         if (sessionFactory.getBackupConnector() != null && (actualServer.isRemoteBackupUpToDate() || !waitForSync))
          {
             break;
          }
@@ -263,9 +273,6 @@
             //ignore
          }
       }
-
-      System.out.println("Backup server state: [started=" + actualServer.isStarted() + ", upToDate=" +
-               actualServer.isRemoteBackupUpToDate() + "]");
    }
 
    protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)



More information about the hornetq-commits mailing list