[hornetq-commits] JBoss hornetq SVN: r11291 - in branches/HORNETQ-720_Replication: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 2 07:52:18 EDT 2011


Author: borges
Date: 2011-09-02 07:52:16 -0400 (Fri, 02 Sep 2011)
New Revision: 11291

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Add tests for large-message delete, and improve other tests

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-02 11:52:16 UTC (rev 11291)
@@ -443,7 +443,10 @@
    }
 
    /**
-    * Assumes the
+    * Collects a list of existing large messages and their current size.
+    * <p>
+    * This tells us how much of each message to sync with the backup. Data appended to the
+    * messages afterwards will be replicated normally.
     * @return
     * @throws Exception
     */

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-02 11:52:16 UTC (rev 11291)
@@ -6,6 +6,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
@@ -20,12 +21,13 @@
 public class BackupSyncJournalTest extends FailoverTestBase
 {
 
+   private static final int BACKUP_WAIT_TIME = 20;
    private ServerLocatorInternal locator;
    private ClientSessionFactoryInternal sessionFactory;
    private ClientSession session;
    private ClientProducer producer;
    private BackupSyncDelay syncDelay;
-   private static final int N_MSGS = 10;
+   protected static final int N_MSGS = 10;
 
    @Override
    protected void setUp() throws Exception
@@ -51,9 +53,10 @@
 
    public void testReserveFileIdValuesOnBackup() throws Exception
    {
+      final int totalRounds = 5;
       createProducerSendSomeMessages();
       JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
-      for (int i = 0; i < 5; i++)
+      for (int i = 0; i < totalRounds; i++)
       {
          messageJournal.forceMoveNextFile();
          sendMessages(session, producer, N_MSGS);
@@ -61,7 +64,7 @@
 
       backupServer.start();
 
-      waitForBackup(sessionFactory, 10, false);
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
 
       // SEND more messages, now with the backup replicating
       sendMessages(session, producer, N_MSGS);
@@ -72,24 +75,53 @@
       JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
       Set<Long> backupIds = getFileIds(backupMsgJournal);
       assertEquals("File IDs must match!", liveIds, backupIds);
+
+      // "+ 2": there are two other calls that send N_MSGS.
+      for (int i = 0; i < totalRounds + 2; i++)
+      {
+         receiveMsgsInRange(0, N_MSGS);
+      }
+      assertNoMoreMessages();
    }
 
+   private void assertNoMoreMessages() throws HornetQException
+   {
+      session.start();
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      ClientMessage msg = consumer.receive(200);
+      assertNull("there should be no more messages to receive! " + msg, msg);
+      consumer.close();
+      session.commit();
+
+   }
+
+   protected void startBackupFinishSyncing() throws Exception
+   {
+      backupServer.start();
+      syncDelay.deliverUpToDateMsg();
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
+   }
+
    public void testReplicationDuringSync() throws Exception
    {
       createProducerSendSomeMessages();
       backupServer.start();
-      waitForBackup(sessionFactory, 10, false);
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
 
       sendMessages(session, producer, N_MSGS);
       session.commit();
-      receiveMsgs(0, N_MSGS);
+      receiveMsgsInRange(0, N_MSGS);
+
       finishSyncAndFailover();
+
+      receiveMsgsInRange(0, N_MSGS);
+      assertNoMoreMessages();
    }
 
    private void finishSyncAndFailover() throws Exception
    {
       syncDelay.deliverUpToDateMsg();
-      waitForBackup(sessionFactory, 10, true);
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
       assertFalse("should not be initialized", backupServer.getServer().isInitialised());
       crash(session);
       waitForServerInitialization(backupServer, 5);
@@ -99,15 +131,17 @@
    {
       createProducerSendSomeMessages();
       startBackupCrashLive();
-      receiveMsgs(0, N_MSGS);
+      receiveMsgsInRange(0, N_MSGS);
+      assertNoMoreMessages();
    }
 
    public void testMessageSync() throws Exception
    {
       createProducerSendSomeMessages();
-      receiveMsgs(0, N_MSGS / 2);
+      receiveMsgsInRange(0, N_MSGS / 2);
       startBackupCrashLive();
-      receiveMsgs(N_MSGS / 2, N_MSGS);
+      receiveMsgsInRange(N_MSGS / 2, N_MSGS);
+      assertNoMoreMessages();
    }
 
    private void startBackupCrashLive() throws Exception
@@ -115,12 +149,12 @@
       assertFalse("backup is started?", backupServer.isStarted());
       liveServer.removeInterceptor(syncDelay);
       backupServer.start();
-      waitForBackup(sessionFactory, 20);
+      waitForBackup(sessionFactory, BACKUP_WAIT_TIME);
       crash(session);
       waitForServerInitialization(backupServer, 5);
    }
 
-   private void createProducerSendSomeMessages() throws HornetQException, Exception
+   protected void createProducerSendSomeMessages() throws HornetQException, Exception
    {
       session = sessionFactory.createSession(true, true);
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
@@ -129,7 +163,7 @@
       session.commit();
    }
 
-   private void receiveMsgs(int start, int end) throws HornetQException
+   protected void receiveMsgsInRange(int start, int end) throws HornetQException
    {
       session.start();
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2011-09-02 11:52:16 UTC (rev 11291)
@@ -1,5 +1,9 @@
 package org.hornetq.tests.integration.cluster.failover;
 
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -7,10 +11,6 @@
 public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
 {
 
-   /**
-    * @param i
-    * @param message
-    */
    @Override
    protected void assertMessageBody(final int i, final ClientMessage message)
    {
@@ -25,13 +25,36 @@
       return (ServerLocatorInternal)locator;
    }
 
-   /**
-    * @param i
-    * @param message
-    */
    @Override
    protected void setBody(final int i, final ClientMessage message) throws Exception
    {
       setLargeMessageBody(i, message);
    }
+
+   // ------------------------
+
+   public void testDeleteLargeMessages() throws Exception
+   {
+      createProducerSendSomeMessages();
+      startBackupFinishSyncing();
+      File dir = new File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+      System.out.println("Dir " + dir.getAbsolutePath() + " " + dir.exists());
+      // Set<Long> idsOnBkp = getAllMessageFileIds(dir);
+      receiveMsgsInRange(0, N_MSGS / 2);
+      assertEquals("we really ought to delete these after delivery", N_MSGS / 2, getAllMessageFileIds(dir).size());
+   }
+
+   private Set<Long> getAllMessageFileIds(File dir)
+   {
+      Set<Long> idsOnBkp = new HashSet<Long>();
+      for (String filename : dir.list())
+      {
+         if (filename.endsWith(".msg"))
+         {
+            idsOnBkp.add(Long.valueOf(filename.split("\\.")[0]));
+         }
+      }
+      return idsOnBkp;
+   }
+
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-02 00:30:14 UTC (rev 11290)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-02 11:52:16 UTC (rev 11291)
@@ -43,6 +43,7 @@
    {
       if (backup.isStarted())
          handler.deliver();
+      live.removeInterceptor(this);
    }
 
    public BackupSyncDelay(TestableServer backup, TestableServer live)



More information about the hornetq-commits mailing list