[jboss-cvs] JBoss Messaging SVN: r5648 - branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 16 00:34:40 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-16 00:34:40 -0500 (Fri, 16 Jan 2009)
New Revision: 5648
Modified:
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
Adding failover test on Page
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-15 15:29:32 UTC (rev 5647)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-16 05:34:40 UTC (rev 5648)
@@ -23,6 +23,8 @@
package org.jboss.messaging.tests.integration.cluster.failover;
import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
@@ -31,10 +33,13 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.util.SimpleString;
/**
@@ -50,6 +55,7 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
// Attributes ----------------------------------------------------
@@ -60,21 +66,21 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
-
+
public void testFailoverOnPaging() throws Exception
{
testPaging(true);
}
-
-
+
public void testReplicationOnPaging() throws Exception
{
testPaging(false);
}
-
+
private void testPaging(boolean fail) throws Exception
{
+ setUpFileBased(100 * 1024);
+
ClientSession session = null;
try
{
@@ -156,7 +162,7 @@
assertEquals(i, message.getBody().getInt());
}
-
+
session.close();
session = null;
@@ -186,6 +192,354 @@
}
+ public void testMultithreadFailoverReplicationOnly() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnProducing() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(true);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(true, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnConsume() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, true);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
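+ /**
+ * @throws Exception if a consumer session cannot be created or forcing the connection failure fails
+ * @throws InterruptedException if interrupted while waiting on the latches, sleeping, or joining the consumer threads
+ * @throws Throwable the error recorded by a failed consumer thread, rethrown after all threads have been joined
+ */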
+ private int multiThreadConsumer(boolean connectedOnBackup, boolean fail) throws Exception,
+ InterruptedException,
+ Throwable
+ {
+ ClientSession session = null;
+ try
+ {
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ final int RECEIVE_TIMEOUT = 10000;
+
+ final ClientSessionFactory factory;
+ final PagingStore store;
+
+ if (connectedOnBackup)
+ {
+ factory = createBackupFactory();
+ store = this.backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+ else
+ {
+ factory = createFailoverFactory();
+ store = this.liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+
+ session = factory.createSession(false, true, true, false);
+
+ final int initialNumberOfPages = store.getNumberOfPages();
+
+ System.out.println("It has initially " + initialNumberOfPages);
+
+ final int THREAD_CONSUMERS = 20;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+ class Consumer extends Thread
+ {
+ volatile Throwable e;
+
+ ClientSession session;
+
+ public Consumer() throws Exception
+ {
+ session = factory.createSession(false, true, true);
+ }
+
+ public void run()
+ {
+ boolean started = false;
+
+ try
+ {
+
+ try
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ alignSemaphore.countDown();
+
+ started = true;
+
+ startFlag.await();
+
+ while (true)
+ {
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (numberOfMessages.incrementAndGet() % 1000 == 0)
+ {
+ System.out.println(numberOfMessages + " messages read");
+ }
+
+ msg.acknowledge();
+
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ this.e = e;
+ }
+ }
+ }
+
+ Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i] = new Consumer();
+ }
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ startFlag.countDown();
+
+ if (fail)
+ {
+ Thread.sleep(2000);
+ while (store.getNumberOfPages() == initialNumberOfPages)
+ {
+ Thread.sleep(500);
+ }
+
+ System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+ ", failing now");
+
+ fail(session);
+ }
+
+ for (Thread t : consumers)
+ {
+ t.join();
+ }
+
+ for (Consumer p : consumers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+
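+ /**
+ * @throws Exception if the session or queue cannot be created, or forcing the connection failure fails
+ * @throws MessagingException presumably from the session and queue operations; nothing in this method throws it directly
+ * @throws InterruptedException if interrupted while waiting on the latches, sleeping, or joining the producer threads
+ * @throws Throwable the error recorded by a failed producer thread, rethrown after all threads have been joined
+ */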
+ private int multiThreadProducer(boolean failover) throws Exception,
+ MessagingException,
+ InterruptedException,
+ Throwable
+ {
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final PagingStore store = this.liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true, false);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ // Block on everything, just because we want the first send loop to stop as soon as the destination goes into page mode
+ factory.setBlockOnAcknowledge(true);
+ factory.setBlockOnNonPersistentSend(true);
+ factory.setBlockOnPersistentSend(true);
+
+ final int THREAD_PRODUCERS = 20;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+ final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+ class Producer extends Thread
+ {
+ volatile Throwable e;
+
+ public void run()
+ {
+ boolean started = false;
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ try
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ alignSemaphore.countDown();
+
+ started = true;
+ startFlag.await();
+
+ while (!store.isPaging())
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+ }
+
+ flagPaging.countDown();
+
+ for (int i = 0; i < 10000; i++)
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ flagPaging.countDown();
+ this.e = e;
+ }
+ }
+ }
+
+ Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+ for (int i = 0; i < THREAD_PRODUCERS; i++)
+ {
+ producers[i] = new Producer();
+ producers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ // Start producing only when all the sessions are opened
+ startFlag.countDown();
+
+ if (failover)
+ {
+ flagPaging.await(); // for this test I want everybody on the paging part
+
+ Thread.sleep(1500);
+
+ fail(session);
+
+ }
+
+ for (Thread t : producers)
+ {
+ t.join();
+ }
+
+ for (Producer p : producers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+
+ }
+ finally
+ {
+ session.close();
+ InVMConnector.resetFailures();
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -195,11 +549,20 @@
super.tearDown();
}
+ protected void fail(final ClientSession session) throws Exception
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ System.out.println("Forcing a failure");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+ }
+
protected void setUp() throws Exception
{
super.setUp();
- setUpFileBased(100 * 1024);
-
}
// Private -------------------------------------------------------
More information about the jboss-cvs-commits
mailing list