[hornetq-commits] JBoss hornetq SVN: r11271 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 1 12:12:51 EDT 2011


Author: borges
Date: 2011-09-01 12:12:50 -0400 (Thu, 01 Sep 2011)
New Revision: 11271

Added:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Removed:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
Log:
HORNETQ-720 tests for LargeMessage sync'ing

Deleted: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-09-01 16:12:50 UTC (rev 11271)
@@ -1,208 +0,0 @@
-package org.hornetq.tests.integration.cluster.failover;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientConsumer;
-import org.hornetq.api.core.client.ClientProducer;
-import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
-import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-import org.hornetq.tests.util.TransportConfigurationUtils;
-
-public class BackupJournalSyncTest extends FailoverTestBase
-{
-
-   private ServerLocatorInternal locator;
-   private ClientSessionFactoryInternal sessionFactory;
-   private ClientSession session;
-   private ClientProducer producer;
-   private BackupSyncDelay syncDelay;
-   private static final int N_MSGS = 100;
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      startBackupServer = false;
-      super.setUp();
-      locator = getServerLocator();
-      locator.setBlockOnNonDurableSend(true);
-      locator.setBlockOnDurableSend(true);
-      locator.setReconnectAttempts(-1);
-      sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
-      syncDelay = new BackupSyncDelay(backupServer, liveServer);
-   }
-
-   public void testNodeID() throws Exception
-   {
-      backupServer.start();
-      waitForComponent(backupServer, 5);
-      assertTrue("must be running", backupServer.isStarted());
-      assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
-                   backupServer.getServer().getNodeID());
-   }
-
-   public void testReserveFileIdValuesOnBackup() throws Exception
-   {
-      createProducerSendSomeMessages();
-      JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
-      for (int i = 0; i < 5; i++)
-      {
-         messageJournal.forceMoveNextFile();
-         sendMessages(session, producer, N_MSGS);
-      }
-
-      backupServer.start();
-
-      waitForBackup(sessionFactory, 10, false);
-
-      // SEND more messages, now with the backup replicating
-      sendMessages(session, producer, N_MSGS);
-      Set<Long> liveIds = getFileIds(messageJournal);
-
-      finishSyncAndFailover();
-
-      JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
-      Set<Long> backupIds = getFileIds(backupMsgJournal);
-      assertEquals("File IDs must match!", liveIds, backupIds);
-   }
-
-   public void testReplicationDuringSync() throws Exception
-   {
-      createProducerSendSomeMessages();
-      backupServer.start();
-      waitForBackup(sessionFactory, 10, false);
-
-      sendMessages(session, producer, N_MSGS);
-      session.commit();
-      receiveMsgs(0, N_MSGS);
-      finishSyncAndFailover();
-   }
-
-   private void finishSyncAndFailover() throws Exception
-   {
-      syncDelay.deliverUpToDateMsg();
-      waitForBackup(sessionFactory, 10, true);
-      assertFalse("should not be initialized", backupServer.getServer().isInitialised());
-      crash(session);
-      waitForServerInitialization(backupServer, 5);
-   }
-
-   public void testMessageSyncSimple() throws Exception
-   {
-      createProducerSendSomeMessages();
-      startBackupCrashLive();
-      receiveMsgs(0, N_MSGS);
-   }
-
-   public void testMessageSync() throws Exception
-   {
-      createProducerSendSomeMessages();
-      receiveMsgs(0, N_MSGS / 2);
-      startBackupCrashLive();
-      receiveMsgs(N_MSGS / 2, N_MSGS);
-   }
-
-   private void startBackupCrashLive() throws Exception
-   {
-      assertFalse("backup is started?", backupServer.isStarted());
-      liveServer.removeInterceptor(syncDelay);
-      backupServer.start();
-      waitForBackup(sessionFactory, 5);
-      crash(session);
-      waitForServerInitialization(backupServer, 5);
-   }
-
-   private void createProducerSendSomeMessages() throws HornetQException, Exception
-   {
-      session = sessionFactory.createSession(true, true);
-      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-      producer = session.createProducer(FailoverTestBase.ADDRESS);
-      sendMessages(session, producer, N_MSGS);
-      session.commit();
-   }
-
-   private void receiveMsgs(int start, int end) throws HornetQException
-   {
-      session.start();
-      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-      receiveMessagesAndAck(consumer, start, end);
-      consumer.close();
-      session.commit();
-   }
-
-   private static void waitForServerInitialization(TestableServer server, int seconds)
-   {
-      long time = System.currentTimeMillis();
-      long toWait = seconds * 1000;
-      while (!server.isInitialised())
-      {
-         try
-         {
-            Thread.sleep(50);
-         }
-         catch (InterruptedException e)
-         {
-            // ignore
-         }
-         if (System.currentTimeMillis() > (time + toWait))
-         {
-            fail("component did not start within timeout of " + seconds);
-         }
-      }
-   }
-   private Set<Long> getFileIds(JournalImpl journal)
-   {
-      Set<Long> results = new HashSet<Long>();
-      for (JournalFile jf : journal.getDataFiles())
-      {
-         results.add(Long.valueOf(jf.getFileID()));
-      }
-      return results;
-   }
-
-   static JournalImpl getMessageJournalFromServer(TestableServer server)
-   {
-      JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
-      return (JournalImpl)sm.getMessageJournal();
-   }
-
-   @Override
-   protected void tearDown() throws Exception
-   {
-      if (sessionFactory != null)
-         sessionFactory.close();
-      if (session != null)
-         session.close();
-      closeServerLocator(locator);
-
-      super.tearDown();
-   }
-
-   @Override
-   protected void createConfigs() throws Exception
-   {
-      createReplicatedConfigs();
-   }
-
-   @Override
-   protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
-   {
-      return TransportConfigurationUtils.getInVMAcceptor(live);
-   }
-
-   @Override
-   protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
-   {
-      return TransportConfigurationUtils.getInVMConnector(live);
-   }
-
-
-}

Copied: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java (from rev 11243, branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java)
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-09-01 16:12:50 UTC (rev 11271)
@@ -0,0 +1,209 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.BackupSyncDelay;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupSyncJournalTest extends FailoverTestBase
+{
+
+   private ServerLocatorInternal locator;
+   private ClientSessionFactoryInternal sessionFactory;
+   private ClientSession session;
+   private ClientProducer producer;
+   private BackupSyncDelay syncDelay;
+   private static final int N_MSGS = 100;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      startBackupServer = false;
+      super.setUp();
+      locator = getServerLocator();
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setReconnectAttempts(-1);
+      sessionFactory = createSessionFactoryAndWaitForTopology(locator, 1);
+      syncDelay = new BackupSyncDelay(backupServer, liveServer);
+   }
+
+   public void testNodeID() throws Exception
+   {
+      backupServer.start();
+      waitForComponent(backupServer, 5);
+      assertTrue("must be running", backupServer.isStarted());
+      assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
+                   backupServer.getServer().getNodeID());
+   }
+
+   public void testReserveFileIdValuesOnBackup() throws Exception
+   {
+      createProducerSendSomeMessages();
+      JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+      for (int i = 0; i < 5; i++)
+      {
+         messageJournal.forceMoveNextFile();
+         sendMessages(session, producer, N_MSGS);
+      }
+
+      backupServer.start();
+
+      waitForBackup(sessionFactory, 10, false);
+
+      // SEND more messages, now with the backup replicating
+      sendMessages(session, producer, N_MSGS);
+      Set<Long> liveIds = getFileIds(messageJournal);
+
+      finishSyncAndFailover();
+
+      JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+      Set<Long> backupIds = getFileIds(backupMsgJournal);
+      assertEquals("File IDs must match!", liveIds, backupIds);
+   }
+
+   public void testReplicationDuringSync() throws Exception
+   {
+      createProducerSendSomeMessages();
+      backupServer.start();
+      waitForBackup(sessionFactory, 10, false);
+
+      sendMessages(session, producer, N_MSGS);
+      session.commit();
+      receiveMsgs(0, N_MSGS);
+      finishSyncAndFailover();
+   }
+
+   private void finishSyncAndFailover() throws Exception
+   {
+      syncDelay.deliverUpToDateMsg();
+      waitForBackup(sessionFactory, 10, true);
+      assertFalse("should not be initialized", backupServer.getServer().isInitialised());
+      crash(session);
+      waitForServerInitialization(backupServer, 5);
+   }
+
+   public void testMessageSyncSimple() throws Exception
+   {
+      createProducerSendSomeMessages();
+      startBackupCrashLive();
+      receiveMsgs(0, N_MSGS);
+   }
+
+   public void testMessageSync() throws Exception
+   {
+      createProducerSendSomeMessages();
+      receiveMsgs(0, N_MSGS / 2);
+      startBackupCrashLive();
+      receiveMsgs(N_MSGS / 2, N_MSGS);
+   }
+
+   private void startBackupCrashLive() throws Exception
+   {
+      assertFalse("backup is started?", backupServer.isStarted());
+      liveServer.removeInterceptor(syncDelay);
+      backupServer.start();
+      waitForBackup(sessionFactory, 5);
+      crash(session);
+      waitForServerInitialization(backupServer, 5);
+   }
+
+   private void createProducerSendSomeMessages() throws HornetQException, Exception
+   {
+      session = sessionFactory.createSession(true, true);
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      producer = session.createProducer(FailoverTestBase.ADDRESS);
+      sendMessages(session, producer, N_MSGS);
+      session.commit();
+   }
+
+   private void receiveMsgs(int start, int end) throws HornetQException
+   {
+      session.start();
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      receiveMessagesAndAck(consumer, start, end);
+      consumer.close();
+      session.commit();
+   }
+
+   private static void waitForServerInitialization(TestableServer server, int seconds)
+   {
+      long time = System.currentTimeMillis();
+      long toWait = seconds * 1000;
+      while (!server.isInitialised())
+      {
+         try
+         {
+            Thread.sleep(50);
+         }
+         catch (InterruptedException e)
+         {
+            // ignore
+         }
+         if (System.currentTimeMillis() > (time + toWait))
+         {
+            fail("component did not start within timeout of " + seconds);
+         }
+      }
+   }
+
+   private Set<Long> getFileIds(JournalImpl journal)
+   {
+      Set<Long> results = new HashSet<Long>();
+      for (JournalFile jf : journal.getDataFiles())
+      {
+         results.add(Long.valueOf(jf.getFileID()));
+      }
+      return results;
+   }
+
+   static JournalImpl getMessageJournalFromServer(TestableServer server)
+   {
+      JournalStorageManager sm = (JournalStorageManager)server.getServer().getStorageManager();
+      return (JournalImpl)sm.getMessageJournal();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      if (sessionFactory != null)
+         sessionFactory.close();
+      if (session != null)
+         session.close();
+      closeServerLocator(locator);
+
+      super.tearDown();
+   }
+
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      createReplicatedConfigs();
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+
+}

Added: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2011-09-01 16:12:50 UTC (rev 11271)
@@ -0,0 +1,37 @@
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
+
+public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
+{
+
+   /**
+    * @param i index of the message being checked
+    * @param message received message, verified via assertLargeMessageBody
+    */
+   @Override
+   protected void assertMessageBody(final int i, final ClientMessage message)
+   {
+      assertLargeMessageBody(i, message);
+   }
+
+   @Override
+   protected ServerLocatorInternal getServerLocator() throws Exception
+   {
+      ServerLocator locator = super.getServerLocator();
+      locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+      return (ServerLocatorInternal)locator;
+   }
+
+   /**
+    * @param i index of the message being built
+    * @param message message whose body is filled via setLargeMessageBody
+    */
+   @Override
+   protected void setBody(final int i, final ClientMessage message) throws Exception
+   {
+      setLargeMessageBody(i, message);
+   }
+}

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-09-01 16:12:50 UTC (rev 11271)
@@ -24,10 +24,12 @@
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.ClusterTopologyListener;
 import org.hornetq.api.core.client.HornetQClient;
@@ -47,6 +49,7 @@
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.ReplicatedBackupUtils;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.tests.util.UnitTestCase;
 
 /**
  * A FailoverTestBase
@@ -59,6 +62,11 @@
 
    protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
 
+   /*
+    * Used only by tests of large messages.
+    */
+   protected static final int MIN_LARGE_MESSAGE = 1024;
+   private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
 
    // Attributes ----------------------------------------------------
 
@@ -121,6 +129,40 @@
       return new SameProcessHornetQServer(createInVMFailoverServer(true, backupConfig, nodeManager));
    }
 
+   /**
+    * Large message version of {@link #setBody(int, ClientMessage)}.
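+    * @param i index of the message; not used by this method
+    * @param message message whose body stream is set from UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE)
+    * @throws RuntimeException wrapping any exception thrown while setting the body stream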
+    */
+   protected static void setLargeMessageBody(final int i, final ClientMessage message)
+   {
+      try
+      {
+         message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);
+      }
+   }
+
+   /**
+    * Large message version of {@link #assertMessageBody(int, ClientMessage)}.
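+    * @param i index of the message; not used by this method
+    * @param message message whose first LARGE_MESSAGE_SIZE body bytes must equal UnitTestCase.getSamplebyte(j)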
+    */
+   protected static void assertLargeMessageBody(final int i, final ClientMessage message)
+   {
+      HornetQBuffer buffer = message.getBodyBuffer();
+
+      for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
+      {
+         Assert.assertTrue("expecting " + LARGE_MESSAGE_SIZE + " bytes, got " + j, buffer.readable());
+         Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
+      }
+   }
+
    protected void createConfigs() throws Exception
    {
       nodeManager = new InVMNodeManager();

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2011-09-01 13:36:39 UTC (rev 11270)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/LargeMessageFailoverTest.java	2011-09-01 16:12:50 UTC (rev 11271)
@@ -13,14 +13,10 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ServerLocatorInternal;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.tests.util.UnitTestCase;
 
 /**
  * A LargeMessageFailoverTest
@@ -36,11 +32,6 @@
 
    private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
 
-
-   private static final int MIN_LARGE_MESSAGE = 1024;
-
-   private static final int LARGE_MESSAGE_SIZE = MIN_LARGE_MESSAGE * 3;
-
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -53,17 +44,11 @@
 
    // Protected -----------------------------------------------------
 
-   /**
-    * @param name
-    */
    public LargeMessageFailoverTest(final String name)
    {
       super(name);
    }
 
-   /**
-    *
-    */
    public LargeMessageFailoverTest()
    {
       super();
@@ -90,13 +75,7 @@
    @Override
    protected void assertMessageBody(final int i, final ClientMessage message)
    {
-      HornetQBuffer buffer = message.getBodyBuffer();
-
-      for (int j = 0; j < LARGE_MESSAGE_SIZE; j++)
-      {
-         Assert.assertTrue("expecting more bytes", buffer.readable());
-         Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
-      }
+      assertLargeMessageBody(i, message);
    }
 
 
@@ -104,12 +83,10 @@
    protected ServerLocatorInternal getServerLocator() throws Exception
    {
       ServerLocator locator = super.getServerLocator();
-      locator.setMinLargeMessageSize(LARGE_MESSAGE_SIZE);
-      return (ServerLocatorInternal) locator;
+      locator.setMinLargeMessageSize(MIN_LARGE_MESSAGE);
+      return (ServerLocatorInternal)locator;
    }
 
-
-
    /**
     * @param i
     * @param message
@@ -117,11 +94,6 @@
    @Override
    protected void setBody(final int i, final ClientMessage message) throws Exception
    {
-      message.setBodyInputStream(UnitTestCase.createFakeLargeStream(LARGE_MESSAGE_SIZE));
+      setLargeMessageBody(i, message);
    }
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
 }



More information about the hornetq-commits mailing list