Author: borges
Date: 2012-02-03 11:05:31 -0500 (Fri, 03 Feb 2012)
New Revision: 12084
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Log:
HORNETQ-720 Add test for sync start while the live is receiving largeMessage data.
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-03
13:28:27 UTC (rev 12083)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-03
16:05:31 UTC (rev 12084)
@@ -3,10 +3,17 @@
import java.io.File;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.tests.util.UnitTestCase;
public class BackupSyncLargeMessageTest extends BackupSyncJournalTest
{
@@ -41,6 +48,7 @@
createProducerSendSomeMessages();
startBackupFinishSyncing();
receiveMsgsInRange(0, n_msgs / 2);
+ backupServer.stop();
assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
}
@@ -61,6 +69,54 @@
assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
}
+ /**
+ * LargeMessages are passed from the client to the server in chunks. Here we test the
backup
+ * starting the data synchronization with the live in the middle of a multiple chunks
large
+ * message upload from the client to the live server.
+ * @throws Exception
+ */
+ public void testBackupStartsWhenLiveIsReceivingLargeMessage() throws Exception
+ {
+ final ClientSession session = addClientSession(sessionFactory.createSession(true,
true));
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+ final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ final ClientMessage message = session.createMessage(true);
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1000 *
MIN_LARGE_MESSAGE));
+
+ final AtomicBoolean caughtException = new AtomicBoolean(false);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch latch2 = new CountDownLatch(1);
+
+ Runnable r = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ latch.countDown();
+ producer.send(message);
+ session.commit();
+ }
+ catch (HornetQException e)
+ {
+ e.printStackTrace();
+ caughtException.set(true);
+ }
+ finally
+ {
+ latch2.countDown();
+ }
+ }
+ };
+ Executors.defaultThreadFactory().newThread(r).start();
+ latch.await();
+ startBackupFinishSyncing();
+ latch2.await();
+ crash(session);
+ assertFalse("no exceptions while sending message",
caughtException.get());
+ }
+
private Set<Long> getAllMessageFileIds(File dir)
{
Set<Long> idsOnBkp = new HashSet<Long>();