[hornetq-commits] JBoss hornetq SVN: r11179 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 10 07:43:39 EDT 2011


Author: borges
Date: 2011-08-10 07:43:38 -0400 (Wed, 10 Aug 2011)
New Revision: 11179

Modified:
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-720 Add support for appendDeleteRecord in FileWrapperJournal & more tests

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-08-10 11:43:38 UTC (rev 11179)
@@ -13,6 +13,7 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
 import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
 
 /**
@@ -111,9 +112,10 @@
    }
 
    @Override
-   public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+   public void appendDeleteRecord(long id, boolean sync, IOCompletion callback) throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
+      writeRecord(deleteRecord, sync, callback);
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 11:43:38 UTC (rev 11179)
@@ -24,7 +24,6 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -49,6 +48,8 @@
       locator.setBlockOnDurableSend(true);
       locator.setReconnectAttempts(-1);
       sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+      handler = new ReplicationChannelHandler();
+      liveServer.addInterceptor(new BackupSyncDelay(handler));
    }
 
    public void testNodeID() throws Exception
@@ -62,8 +63,6 @@
 
    public void testReserveFileIdValuesOnBackup() throws Exception
    {
-      handler = new ReplicationChannelHandler();
-      liveServer.addInterceptor(new BackupSyncDelay(handler));
       createProducerSendSomeMessages();
       JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
       for (int i = 0; i < 5; i++)
@@ -71,28 +70,90 @@
          messageJournal.forceMoveNextFile();
          sendMessages(session, producer, N_MSGS);
       }
+
       backupServer.start();
+
       waitForBackup(sessionFactory, 10, false);
 
       // SEND more messages, now with the backup replicating
       sendMessages(session, producer, N_MSGS);
+      Set<Long> liveIds = getFileIds(messageJournal);
+
+      finishSyncAndFailover();
+
+      JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+      Set<Long> backupIds = getFileIds(backupMsgJournal);
+      assertEquals("File IDs must match!", liveIds, backupIds);
+   }
+
+   public void testReplicationDuringSync() throws Exception
+   {
+      createProducerSendSomeMessages();
+      backupServer.start();
+      waitForBackup(sessionFactory, 10, false);
+
+      sendMessages(session, producer, N_MSGS);
+      session.commit();
+      receiveMsgs(0, N_MSGS);
+      finishSyncAndFailover();
+   }
+
+   private void finishSyncAndFailover() throws Exception
+   {
       handler.deliver = true;
+      // must send one more message to have the "SYNC is DONE" msg delivered.
       sendMessages(session, producer, 1);
-
       waitForBackup(sessionFactory, 10, true);
-
-      Set<Long> liveIds = getFileIds(messageJournal);
       assertFalse("should not be initialized", backupServer.getServer().isInitialised());
       crash(session);
-      waitForServerInitialization(backupServer.getServer(), 5);
+      waitForServerInitialization(backupServer, 5);
+   }
 
-      JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
-      Set<Long> backupIds = getFileIds(backupMsgJournal);
-      assertEquals("File IDs must match!", liveIds, backupIds);
+   public void testMessageSyncSimple() throws Exception
+   {
+      createProducerSendSomeMessages();
+      startBackupCrashLive();
+      receiveMsgs(0, N_MSGS);
    }
 
-   private static void waitForServerInitialization(HornetQServer server, int seconds)
+   public void testMessageSync() throws Exception
    {
+      createProducerSendSomeMessages();
+      receiveMsgs(0, N_MSGS / 2);
+      startBackupCrashLive();
+      receiveMsgs(N_MSGS / 2, N_MSGS);
+   }
+
+   private void startBackupCrashLive() throws Exception
+   {
+      assertFalse("backup is started?", backupServer.isStarted());
+      handler.setHold(false);
+      backupServer.start();
+      waitForBackup(sessionFactory, 5);
+      crash(session);
+      waitForServerInitialization(backupServer, 5);
+   }
+
+   private void createProducerSendSomeMessages() throws HornetQException, Exception
+   {
+      session = sessionFactory.createSession(true, true);
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      producer = session.createProducer(FailoverTestBase.ADDRESS);
+      sendMessages(session, producer, N_MSGS);
+      session.commit();
+   }
+
+   private void receiveMsgs(int start, int end) throws HornetQException
+   {
+      session.start();
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      receiveMessagesAndAck(consumer, start, end);
+      consumer.close();
+      session.commit();
+   }
+
+   private static void waitForServerInitialization(TestableServer server, int seconds)
+   {
       long time = System.currentTimeMillis();
       long toWait = seconds * 1000;
       while (!server.isInitialised())
@@ -111,7 +172,6 @@
          }
       }
    }
-
    private Set<Long> getFileIds(JournalImpl journal)
    {
       Set<Long> results = new HashSet<Long>();
@@ -128,40 +188,6 @@
       return (JournalImpl)sm.getMessageJournal();
    }
 
-   public void testMessageSync() throws Exception
-   {
-      createProducerSendSomeMessages();
-
-      receiveMsgs(0, N_MSGS / 2);
-      assertFalse("backup is not started!", backupServer.isStarted());
-
-      // BLOCK ON journals
-      backupServer.start();
-
-      waitForBackup(sessionFactory, 5);
-      crash(session);
-
-      // consume N/2 from 'new' live (the old backup)
-      receiveMsgs(N_MSGS / 2, N_MSGS);
-   }
-
-   private void createProducerSendSomeMessages() throws HornetQException, Exception
-   {
-      session = sessionFactory.createSession(true, true);
-      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-      producer = session.createProducer(FailoverTestBase.ADDRESS);
-
-      sendMessages(session, producer, N_MSGS);
-      session.start();
-   }
-
-   private void receiveMsgs(int start, int end) throws HornetQException
-   {
-      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-      receiveMessagesAndAck(consumer, start, end);
-      session.commit();
-   }
-
    @Override
    protected void tearDown() throws Exception
    {
@@ -233,6 +259,7 @@
       private Packet onHold;
       private Channel channel;
       public volatile boolean deliver;
+      private boolean mustHold = true;
 
       public void addSubHandler(ReplicationEndpoint handler)
       {
@@ -244,6 +271,11 @@
          this.channel = channel;
       }
 
+      public void setHold(boolean hold)
+      {
+         mustHold = hold;
+      }
+
       @Override
       public void handlePacket(Packet packet)
       {
@@ -264,7 +296,7 @@
             }
          }
 
-         if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+         if (packet.getType() == PacketImpl.REPLICATION_SYNC && mustHold)
          {
             ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
             if (syncMsg.isUpToDate())

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-08-10 10:30:16 UTC (rev 11178)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-08-10 11:43:38 UTC (rev 11179)
@@ -513,7 +513,7 @@
    }
 
    /**
-    * Send messages with pre-specified body.
+    * Send durable messages with pre-specified body.
     * @param session
     * @param producer
     * @param numMessages
@@ -531,13 +531,13 @@
    }
 
 
-   protected final
-            void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount) throws HornetQException
+   protected final void receiveMessagesAndAck(ClientConsumer consumer, final int start, int msgCount)
+            throws HornetQException
    {
       for (int i = start; i < msgCount; i++)
       {
          ClientMessage message = consumer.receive(1000);
-         Assert.assertNotNull(message);
+         Assert.assertNotNull("Expecting a message " + i, message);
          assertMessageBody(i, message);
          Assert.assertEquals(i, message.getIntProperty("counter").intValue());
          message.acknowledge();



More information about the hornetq-commits mailing list