[hornetq-commits] JBoss hornetq SVN: r10993 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jul 18 12:32:02 EDT 2011


Author: borges
Date: 2011-07-18 12:32:01 -0400 (Mon, 18 Jul 2011)
New Revision: 10993

Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Rewrite testSendPacketsWithFailure()

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-07-18 16:31:27 UTC (rev 10992)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-07-18 16:32:01 UTC (rev 10993)
@@ -34,6 +34,10 @@
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.Configuration;
@@ -93,6 +97,7 @@
    private ServerLocator locator;
 
    private ReplicationManager manager;
+   private static final SimpleString ADDRESS = new SimpleString("foobar123");
 
    // Static --------------------------------------------------------
 
@@ -102,7 +107,6 @@
 
    private void setupServer(boolean backup, boolean netty, String... interceptors) throws Exception
    {
-      assert backup; // XXX
 
       Configuration backupConfig = createDefaultConfig(netty);
       Configuration liveConfig = createDefaultConfig(netty);
@@ -129,10 +133,23 @@
       backupServer = new HornetQServerImpl(backupConfig);
       locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
       backupServer.start();
-      Thread.sleep(200); // XXX improve this
       waitForComponent(backupServer);
+      int count = 0;
+      waitForReplication(count);
    }
 
+   private void waitForReplication(int count) throws InterruptedException
+   {
+      if (liveServer == null)
+         return;
+
+      while (liveServer.getReplicationManager() == null && count < 10)
+      {
+         Thread.sleep(50);
+         count++;
+      }
+   }
+
    private static void waitForComponent(HornetQComponent component) throws Exception
    {
       waitForComponent(component, 3);
@@ -204,8 +221,8 @@
       blockOnReplication(storage, manager);
 
       PagingManager pagingManager =
-               createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(), backupServer.getExecutorFactory(),
-                                 backupServer.getAddressSettingsRepository());
+               createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(),
+                                 backupServer.getExecutorFactory(), backupServer.getAddressSettingsRepository());
 
       PagingStore store = pagingManager.getPageStore(dummy);
       store.start();
@@ -243,39 +260,59 @@
 
    public void testSendPacketsWithFailure() throws Exception
    {
-
+      int nMsg = 100;
       setupServer(true, false, TestInterceptor.class.getName());
 
-      StorageManager storage = getStorage();
       manager = liveServer.getReplicationManager();
+      waitForComponent(manager);
+      final ClientSession session = locator.createSessionFactory().createSession();
+      session.createQueue(ADDRESS, ADDRESS, null, true);
 
-      Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+      final ClientProducer producer = session.createProducer(ADDRESS);
+      sendMessages(session, producer, nMsg);
 
+      // Now we start intercepting the communication with the backup
       TestInterceptor.value.set(false);
+      sendMessages(session, producer, nMsg);
 
-      for (int i = 0; i < 500; i++)
+      session.start();
+      try
       {
-         replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
-      }
-
-      final CountDownLatch latch = new CountDownLatch(1);
-      storage.afterCompleteOperations(new IOAsyncTask()
-      {
-
-         public void onError(final int errorCode, final String errorMessage)
+         final ClientConsumer consumer = session.createConsumer(ADDRESS);
+         for (int i = 0; i < 2 * nMsg; i++)
          {
+            ClientMessage message = consumer.receive(1000);
+            Assert.assertNotNull("Message should exist!", message);
+            if (i < nMsg)
+            {
+               assertMessageBody(i, message);
+               Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+            }
+            message.acknowledge();
          }
 
-         public void done()
+         final CountDownLatch latch = new CountDownLatch(1);
+         liveServer.getStorageManager().afterCompleteOperations(new IOAsyncTask()
          {
-            latch.countDown();
-         }
-      });
 
-      backupServer.stop();
+            public void onError(final int errorCode, final String errorMessage)
+            {
+            }
 
-      Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
+            public void done()
+            {
+               latch.countDown();
+            }
+         });
 
+         Assert.assertTrue(latch.await(20, TimeUnit.SECONDS));
+      }
+      finally
+      {
+         TestInterceptor.value.set(false);
+         if (!session.isClosed())
+            session.commit();
+      }
    }
 
    public void testExceptionSettingActionBefore() throws Exception
@@ -532,7 +569,6 @@
 
    }
 
-
    protected
             PagingManager
             createPageManager(final StorageManager storageManager,



More information about the hornetq-commits mailing list