[jboss-cvs] JBoss Messaging SVN: r5696 - in trunk/tests/src/org/jboss/messaging/tests: stress/failover and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 22 18:17:22 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-22 18:17:22 -0500 (Thu, 22 Jan 2009)
New Revision: 5696
Added:
trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
Moving PagingFailoverTest to stress testsuite
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-22 21:47:21 UTC (rev 5695)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-22 23:17:22 UTC (rev 5696)
@@ -1,560 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.integration.cluster.failover;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.client.impl.ClientSessionImpl;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.paging.PagingManager;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.remoting.RemotingConnection;
-import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
-import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
-import org.jboss.messaging.util.SimpleString;
-
-/**
- * A PagingFailoverTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- * Created Dec 8, 2008 10:53:16 AM
- *
- *
- */
-public class PagingFailoverTest extends FailoverTestBase
-{
-
- // Constants -----------------------------------------------------
- private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testFailoverOnPaging() throws Exception
- {
- testPaging(true);
- }
-
- public void testReplicationOnPaging() throws Exception
- {
- testPaging(false);
- }
-
- private void testPaging(final boolean fail) throws Exception
- {
- setUpFileBased(100 * 1024);
-
- ClientSession session = null;
- try
- {
- ClientSessionFactory sf1 = createFailoverFactory();
-
- session = sf1.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- ClientProducer producer = session.createProducer(ADDRESS);
-
- final int numMessages = 50000;
-
- PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
- PagingStore storeLive = pmLive.getPageStore(ADDRESS);
-
- PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
- PagingStore storeBackup = pmBackup.getPageStore(ADDRESS);
-
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = session.createClientMessage(true);
- ByteBuffer buffer = ByteBuffer.allocate(1000);
-
- buffer.putInt(i);
-
- buffer.rewind();
-
- message.setBody(new ByteBufferWrapper(buffer));
-
- producer.send(message);
-
- if (storeLive.isPaging())
- {
- assertTrue(storeBackup.isPaging());
- }
- }
-
- session.close();
- session = sf1.createSession(null, null, false, true, true, false, 0);
- session.start();
-
- final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
-
- assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
-
- assertEquals("PageSizeLive", storeLive.getAddressSize(), pmLive.getGlobalSize());
-
- assertEquals("PageSizeBackup", storeBackup.getAddressSize(), pmBackup.getGlobalSize());
-
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- for (int i = 0; i < numMessages; i++)
- {
-
- if (fail && i == numMessages / 2)
- {
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
- }
-
- ClientMessage message = consumer.receive(10000);
-
-
- assertNotNull(message);
-
- message.acknowledge();
-
- message.getBody().rewind();
-
- assertEquals(i, message.getBody().getInt());
-
- }
-
- session.close();
- session = null;
-
- if (!fail)
- {
- assertEquals(0, pmLive.getGlobalSize());
- assertEquals(0, storeLive.getAddressSize());
- }
- assertEquals(0, pmBackup.getGlobalSize());
- assertEquals(0, storeBackup.getAddressSize());
-
- }
- finally
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception ignored)
- {
- // eat it
- }
- }
- }
-
- }
-
- public void testMultithreadFailoverReplicationOnly() throws Throwable
- {
- setUpFileBased(100 * 1024, 20 * 1024);
-
- int numberOfProducedMessages = multiThreadProducer(false);
-
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(false, false);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
- }
-
- public void testMultithreadFailoverOnProducing() throws Throwable
- {
- setUpFileBased(100 * 1024, 20 * 1024);
-
- int numberOfProducedMessages = multiThreadProducer(true);
-
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(true, false);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
- }
-
- public void testMultithreadFailoverOnConsume() throws Throwable
- {
- setUpFileBased(100 * 1024, 20 * 1024);
-
- int numberOfProducedMessages = multiThreadProducer(false);
-
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(false, true);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
- }
-
- /**
- * @throws Exception
- * @throws InterruptedException
- * @throws Throwable
- */
- private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
- InterruptedException,
- Throwable
- {
- ClientSession session = null;
- try
- {
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
-
- final int RECEIVE_TIMEOUT = 10000;
-
- final ClientSessionFactory factory;
- final PagingStore store;
-
- if (connectedOnBackup)
- {
- factory = createBackupFactory();
- store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
- }
- else
- {
- factory = createFailoverFactory();
- store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
- }
-
- session = factory.createSession(false, true, true, false);
-
- final int initialNumberOfPages = store.getNumberOfPages();
-
- System.out.println("It has initially " + initialNumberOfPages);
-
- final int THREAD_CONSUMERS = 20;
-
- final CountDownLatch startFlag = new CountDownLatch(1);
- final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
-
- class Consumer extends Thread
- {
- volatile Throwable e;
-
- ClientSession session;
-
- public Consumer() throws Exception
- {
- session = factory.createSession(null, null, false, true, true, false, 0);
- }
-
- @Override
- public void run()
- {
- boolean started = false;
-
- try
- {
-
- try
- {
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- alignSemaphore.countDown();
-
- started = true;
-
- startFlag.await();
-
- while (true)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
- if (msg == null)
- {
- break;
- }
-
- if (numberOfMessages.incrementAndGet() % 1000 == 0)
- {
- System.out.println(numberOfMessages + " messages read");
- }
-
- msg.acknowledge();
- }
-
- }
- finally
- {
- session.close();
- }
- }
- catch (Throwable e)
- {
- log.error(e.getMessage(), e);
- if (!started)
- {
- alignSemaphore.countDown();
- }
- this.e = e;
- }
- }
- }
-
- Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
-
- for (int i = 0; i < THREAD_CONSUMERS; i++)
- {
- consumers[i] = new Consumer();
- }
-
- for (int i = 0; i < THREAD_CONSUMERS; i++)
- {
- consumers[i].start();
- }
-
- alignSemaphore.await();
-
- startFlag.countDown();
-
- if (fail)
- {
- Thread.sleep(1000);
- while (store.getNumberOfPages() == initialNumberOfPages)
- {
- Thread.sleep(100);
- }
-
- System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
- ", failing now");
-
- fail(session);
- }
-
- for (Thread t : consumers)
- {
- t.join();
- }
-
- for (Consumer p : consumers)
- {
- if (p.e != null)
- {
- throw p.e;
- }
- }
-
- return numberOfMessages.intValue();
- }
- finally
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception ignored)
- {
- }
- }
- }
- }
-
- /**
- * @throws Exception
- * @throws MessagingException
- * @throws InterruptedException
- * @throws Throwable
- */
- private int multiThreadProducer(final boolean failover) throws Exception,
- MessagingException,
- InterruptedException,
- Throwable
- {
-
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
- final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
-
- final ClientSessionFactory factory = createFailoverFactory();
-
- ClientSession session = factory.createSession(false, true, true, false);
- try
- {
- session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- final int THREAD_PRODUCERS = 30;
-
- final CountDownLatch startFlag = new CountDownLatch(1);
- final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
- final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
-
- class Producer extends Thread
- {
- volatile Throwable e;
-
- @Override
- public void run()
- {
- boolean started = false;
- try
- {
- ClientSession session = factory.createSession(false, true, true);
- try
- {
- ClientProducer producer = session.createProducer(ADDRESS);
-
- alignSemaphore.countDown();
-
- started = true;
- startFlag.await();
-
- while (!store.isPaging())
- {
-
- ClientMessage msg = session.createClientMessage(true);
-
- producer.send(msg);
- numberOfMessages.incrementAndGet();
- }
-
- flagPaging.countDown();
-
- for (int i = 0; i < 1000; i++)
- {
-
- ClientMessage msg = session.createClientMessage(true);
-
- producer.send(msg);
- numberOfMessages.incrementAndGet();
-
- }
-
- }
- finally
- {
- session.close();
- }
- }
- catch (Throwable e)
- {
- log.error(e.getMessage(), e);
- if (!started)
- {
- alignSemaphore.countDown();
- }
- flagPaging.countDown();
- this.e = e;
- }
- }
- }
-
- Producer[] producers = new Producer[THREAD_PRODUCERS];
-
- for (int i = 0; i < THREAD_PRODUCERS; i++)
- {
- producers[i] = new Producer();
- producers[i].start();
- }
-
- alignSemaphore.await();
-
- // Start producing only when all the sessions are opened
- startFlag.countDown();
-
- if (failover)
- {
- flagPaging.await(); // for this test I want everybody on the paging part
-
- Thread.sleep(1500);
-
- fail(session);
-
- }
-
- for (Thread t : producers)
- {
- t.join();
- }
-
- for (Producer p : producers)
- {
- if (p.e != null)
- {
- throw p.e;
- }
- }
-
- return numberOfMessages.intValue();
-
- }
- finally
- {
- session.close();
- InVMConnector.resetFailures();
- }
-
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- @Override
- protected void tearDown() throws Exception
- {
- super.tearDown();
- }
-
- protected void fail(final ClientSession session) throws Exception
- {
- RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
-
- InVMConnector.numberOfFailures = 1;
- InVMConnector.failOnCreateConnection = true;
- System.out.println("Forcing a failure");
- conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
-
- }
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- }
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverTest.java (from rev 5690, trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverTest.java 2009-01-22 23:17:22 UTC (rev 5696)
@@ -0,0 +1,561 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.stress.failover;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.tests.integration.cluster.failover.FailoverTestBase;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 10:53:16 AM
+ *
+ *
+ */
+public class PagingFailoverTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testFailoverOnPaging() throws Exception
+ {
+ testPaging(true);
+ }
+
+ public void testReplicationOnPaging() throws Exception
+ {
+ testPaging(false);
+ }
+
+ private void testPaging(final boolean fail) throws Exception
+ {
+ setUpFileBased(100 * 1024);
+
+ ClientSession session = null;
+ try
+ {
+ ClientSessionFactory sf1 = createFailoverFactory();
+
+ session = sf1.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 50000;
+
+ PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeLive = pmLive.getPageStore(ADDRESS);
+
+ PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeBackup = pmBackup.getPageStore(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+ ByteBuffer buffer = ByteBuffer.allocate(1000);
+
+ buffer.putInt(i);
+
+ buffer.rewind();
+
+ message.setBody(new ByteBufferWrapper(buffer));
+
+ producer.send(message);
+
+ if (storeLive.isPaging())
+ {
+ assertTrue(storeBackup.isPaging());
+ }
+ }
+
+ session.close();
+ session = sf1.createSession(null, null, false, true, true, false, 0);
+ session.start();
+
+ final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+
+ assertEquals("PageSizeLive", storeLive.getAddressSize(), pmLive.getGlobalSize());
+
+ assertEquals("PageSizeBackup", storeBackup.getAddressSize(), pmBackup.getGlobalSize());
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+
+ if (fail && i == numMessages / 2)
+ {
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+ }
+
+ ClientMessage message = consumer.receive(10000);
+
+
+ assertNotNull(message);
+
+ message.acknowledge();
+
+ message.getBody().rewind();
+
+ assertEquals(i, message.getBody().getInt());
+
+ }
+
+ session.close();
+ session = null;
+
+ if (!fail)
+ {
+ assertEquals(0, pmLive.getGlobalSize());
+ assertEquals(0, storeLive.getAddressSize());
+ }
+ assertEquals(0, pmBackup.getGlobalSize());
+ assertEquals(0, storeBackup.getAddressSize());
+
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ // eat it
+ }
+ }
+ }
+
+ }
+
+ public void testMultithreadFailoverReplicationOnly() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnProducing() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(true);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(true, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnConsume() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, true);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
+ InterruptedException,
+ Throwable
+ {
+ ClientSession session = null;
+ try
+ {
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ final int RECEIVE_TIMEOUT = 10000;
+
+ final ClientSessionFactory factory;
+ final PagingStore store;
+
+ if (connectedOnBackup)
+ {
+ factory = createBackupFactory();
+ store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+ else
+ {
+ factory = createFailoverFactory();
+ store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+
+ session = factory.createSession(false, true, true, false);
+
+ final int initialNumberOfPages = store.getNumberOfPages();
+
+ System.out.println("It has initially " + initialNumberOfPages);
+
+ final int THREAD_CONSUMERS = 20;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+ class Consumer extends Thread
+ {
+ volatile Throwable e;
+
+ ClientSession session;
+
+ public Consumer() throws Exception
+ {
+ session = factory.createSession(null, null, false, true, true, false, 0);
+ }
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+
+ try
+ {
+
+ try
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ alignSemaphore.countDown();
+
+ started = true;
+
+ startFlag.await();
+
+ while (true)
+ {
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (numberOfMessages.incrementAndGet() % 1000 == 0)
+ {
+ System.out.println(numberOfMessages + " messages read");
+ }
+
+ msg.acknowledge();
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ this.e = e;
+ }
+ }
+ }
+
+ Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i] = new Consumer();
+ }
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ startFlag.countDown();
+
+ if (fail)
+ {
+ Thread.sleep(1000);
+ while (store.getNumberOfPages() == initialNumberOfPages)
+ {
+ Thread.sleep(100);
+ }
+
+ System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+ ", failing now");
+
+ fail(session);
+ }
+
+ for (Thread t : consumers)
+ {
+ t.join();
+ }
+
+ for (Consumer p : consumers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadProducer(final boolean failover) throws Exception,
+ MessagingException,
+ InterruptedException,
+ Throwable
+ {
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true, false);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ final int THREAD_PRODUCERS = 30;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+ final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+ class Producer extends Thread
+ {
+ volatile Throwable e;
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ try
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ alignSemaphore.countDown();
+
+ started = true;
+ startFlag.await();
+
+ while (!store.isPaging())
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+ }
+
+ flagPaging.countDown();
+
+ for (int i = 0; i < 1000; i++)
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ flagPaging.countDown();
+ this.e = e;
+ }
+ }
+ }
+
+ Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+ for (int i = 0; i < THREAD_PRODUCERS; i++)
+ {
+ producers[i] = new Producer();
+ producers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ // Start producing only when all the sessions are opened
+ startFlag.countDown();
+
+ if (failover)
+ {
+ flagPaging.await(); // for this test I want everybody on the paging part
+
+ Thread.sleep(1500);
+
+ fail(session);
+
+ }
+
+ for (Thread t : producers)
+ {
+ t.join();
+ }
+
+ for (Producer p : producers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+
+ }
+ finally
+ {
+ session.close();
+ InVMConnector.resetFailures();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected void fail(final ClientSession session) throws Exception
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ System.out.println("Forcing a failure");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Property changes on: trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverTest.java
___________________________________________________________________
Name: svn:mergeinfo
+
More information about the jboss-cvs-commits
mailing list