[hornetq-commits] JBoss hornetq SVN: r10993 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Jul 18 12:32:02 EDT 2011
Author: borges
Date: 2011-07-18 12:32:01 -0400 (Mon, 18 Jul 2011)
New Revision: 10993
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Rewrite testSendPacketsWithFailure()
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-18 16:31:27 UTC (rev 10992)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-07-18 16:32:01 UTC (rev 10993)
@@ -34,6 +34,10 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
@@ -93,6 +97,7 @@
private ServerLocator locator;
private ReplicationManager manager;
+ private static final SimpleString ADDRESS = new SimpleString("foobar123");
// Static --------------------------------------------------------
@@ -102,7 +107,6 @@
private void setupServer(boolean backup, boolean netty, String... interceptors) throws Exception
{
- assert backup; // XXX
Configuration backupConfig = createDefaultConfig(netty);
Configuration liveConfig = createDefaultConfig(netty);
@@ -129,10 +133,23 @@
backupServer = new HornetQServerImpl(backupConfig);
locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
backupServer.start();
- Thread.sleep(200); // XXX improve this
waitForComponent(backupServer);
+ int count = 0;
+ waitForReplication(count);
}
+ private void waitForReplication(int count) throws InterruptedException
+ {
+ if (liveServer == null)
+ return;
+
+ while (liveServer.getReplicationManager() == null && count < 10)
+ {
+ Thread.sleep(50);
+ count++;
+ }
+ }
+
private static void waitForComponent(HornetQComponent component) throws Exception
{
waitForComponent(component, 3);
@@ -204,8 +221,8 @@
blockOnReplication(storage, manager);
PagingManager pagingManager =
- createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(), backupServer.getExecutorFactory(),
- backupServer.getAddressSettingsRepository());
+ createPageManager(backupServer.getStorageManager(), backupServer.getConfiguration(),
+ backupServer.getExecutorFactory(), backupServer.getAddressSettingsRepository());
PagingStore store = pagingManager.getPageStore(dummy);
store.start();
@@ -243,39 +260,59 @@
public void testSendPacketsWithFailure() throws Exception
{
-
+ int nMsg = 100;
setupServer(true, false, TestInterceptor.class.getName());
- StorageManager storage = getStorage();
manager = liveServer.getReplicationManager();
+ waitForComponent(manager);
+ final ClientSession session = locator.createSessionFactory().createSession();
+ session.createQueue(ADDRESS, ADDRESS, null, true);
- Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
+ final ClientProducer producer = session.createProducer(ADDRESS);
+ sendMessages(session, producer, nMsg);
+ // Now we start intercepting the communication with the backup
TestInterceptor.value.set(false);
+ sendMessages(session, producer, nMsg);
- for (int i = 0; i < 500; i++)
+ session.start();
+ try
{
- replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
- }
-
- final CountDownLatch latch = new CountDownLatch(1);
- storage.afterCompleteOperations(new IOAsyncTask()
- {
-
- public void onError(final int errorCode, final String errorMessage)
+ final ClientConsumer consumer = session.createConsumer(ADDRESS);
+ for (int i = 0; i < 2 * nMsg; i++)
{
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull("Message should exist!", message);
+ if (i < nMsg)
+ {
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ }
+ message.acknowledge();
}
- public void done()
+ final CountDownLatch latch = new CountDownLatch(1);
+ liveServer.getStorageManager().afterCompleteOperations(new IOAsyncTask()
{
- latch.countDown();
- }
- });
- backupServer.stop();
+ public void onError(final int errorCode, final String errorMessage)
+ {
+ }
- Assert.assertTrue(latch.await(50, TimeUnit.SECONDS));
+ public void done()
+ {
+ latch.countDown();
+ }
+ });
+ Assert.assertTrue(latch.await(20, TimeUnit.SECONDS));
+ }
+ finally
+ {
+ TestInterceptor.value.set(false);
+ if (!session.isClosed())
+ session.commit();
+ }
}
public void testExceptionSettingActionBefore() throws Exception
@@ -532,7 +569,6 @@
}
-
protected
PagingManager
createPageManager(final StorageManager storageManager,
More information about the hornetq-commits
mailing list