[jboss-cvs] JBoss Messaging SVN: r5982 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster/failover and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 3 21:48:50 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-03 21:48:49 -0500 (Tue, 03 Mar 2009)
New Revision: 5982
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
Log:
Fixing PagingFailoverTest and PagingFailoverStressTest, and
PingTest (verifying if this would fix it on Hudson)
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -413,7 +413,7 @@
public void deliverReplicated(final long messageID) throws Exception
{
- MessageReference ref = messageQueue.removeFirstReference(messageID);
+ MessageReference ref = removeReferenceOnBackup(messageID);
//log.info("handling replicated delivery on backup " + messageID + " session " + session.getName());
@@ -462,7 +462,44 @@
// Public ---------------------------------------------------------------------------------------
// Private --------------------------------------------------------------------------------------
+
+ private MessageReference removeReferenceOnBackup(final long id) throws Exception
+ {
+ // most of the time the remove succeeds on the first attempt, so we try it first without taking any lock
+ MessageReference ref = messageQueue.removeFirstReference(id);
+ if (ref == null)
+ {
+ PagingStore store = pagingManager.getPageStore(binding.getAddress());
+
+ while (true)
+ {
+ // Can't have the same store being depaged in more than one thread
+ synchronized (store)
+ {
+ // once we hold the lock, re-check the queue first: another thread may have depaged the reference in the meantime
+ ref = messageQueue.removeFirstReference(id);
+ if (ref == null)
+ {
+ // force a depage
+ if (!store.readPage()) // This returns false if there are no pages
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+
+ return ref;
+
+ }
+
+
private void promptDelivery()
{
lock.lock();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -2424,7 +2424,7 @@
{
LargeServerMessage largeMessage = storageManager.createLargeMessage();
- MessagingBuffer headerBuffer = ChannelBuffers.dynamicBuffer(header);
+ MessagingBuffer headerBuffer = ChannelBuffers.wrappedBuffer(header);
largeMessage.decodeProperties(headerBuffer);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.core.server.Messaging;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.settings.impl.AddressSettings;
@@ -97,6 +98,8 @@
backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setJournalType(JournalType.NIO);
backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
backupConf.setPagingGlobalWatermarkSize(pageSize);
@@ -123,6 +126,8 @@
liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
liveConf.setPagingGlobalWatermarkSize(pageSize);
liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setJournalType(JournalType.NIO);
liveConf.setSecurityEnabled(false);
liveConf.getAcceptorConfigurations()
@@ -145,7 +150,7 @@
backupService.getServer().getAddressSettingsRepository().addMatch("#", settings);
clearData(getTestDir() + "/live");
-
+
liveService.start();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -23,7 +23,8 @@
package org.jboss.messaging.tests.integration.cluster.failover;
-import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
@@ -63,6 +64,50 @@
// Public --------------------------------------------------------
+
+ public void testMultithreadFailoverReplicationOnly() throws Throwable
+ {
+ setUpFileBased(getMaxGlobal(), getPageSize());
+
+ int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnProducing() throws Throwable
+ {
+ setUpFileBased(getMaxGlobal(), getPageSize());
+
+ int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), true);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), true, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnConsume() throws Throwable
+ {
+ setUpFileBased(getMaxGlobal(), getPageSize());
+
+ int numberOfProducedMessages = multiThreadProducer(getNumberOfThreads(), false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(getNumberOfThreads(), false, true);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+
public void testFailoverOnPaging() throws Exception
{
testPaging(true);
@@ -181,12 +226,27 @@
return 500;
}
+ protected int getNumberOfThreads()
+ {
+ return 5;
+ }
+
+ protected int getMaxGlobal()
+ {
+ return 1024;
+ }
+
+ protected int getPageSize()
+ {
+ return 512;
+ }
+
protected void fail(final ClientSession session) throws Exception
{
RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
- InVMConnector.numberOfFailures = 1;
- InVMConnector.failOnCreateConnection = true;
+// InVMConnector.numberOfFailures = 1;
+// InVMConnector.failOnCreateConnection = true;
System.out.println("Forcing a failure");
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
@@ -194,7 +254,307 @@
// Private -------------------------------------------------------
+
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ protected int multiThreadConsumer(int numberOfThreads, final boolean connectedOnBackup, final boolean fail) throws Exception,
+ InterruptedException,
+ Throwable
+ {
+ ClientSession session = null;
+ try
+ {
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final int RECEIVE_TIMEOUT = 2000;
+
+ final ClientSessionFactory factory;
+ final PagingStore store;
+
+ if (connectedOnBackup)
+ {
+ factory = createBackupFactory();
+ store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+ else
+ {
+ factory = createFailoverFactory();
+ store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+
+ session = factory.createSession(false, true, true, false);
+
+ final int initialNumberOfPages = store.getNumberOfPages();
+
+ System.out.println("It has initially " + initialNumberOfPages);
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(numberOfThreads);
+
+ class Consumer extends Thread
+ {
+ volatile Throwable e;
+
+ ClientSession session;
+
+ public Consumer() throws Exception
+ {
+ session = factory.createSession(null, null, false, true, true, false, 0);
+ }
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+
+ try
+ {
+
+ try
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ alignSemaphore.countDown();
+
+ started = true;
+
+ startFlag.await();
+
+ while (true)
+ {
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (numberOfMessages.incrementAndGet() % 1000 == 0)
+ {
+ System.out.println(numberOfMessages + " messages read");
+ }
+
+ msg.acknowledge();
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ // Printing the stack trace (to System.err) so the failure shows up in the test output
+ e.printStackTrace();
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ this.e = e;
+ }
+ }
+ }
+
+ Consumer[] consumers = new Consumer[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ consumers[i] = new Consumer();
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ consumers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ startFlag.countDown();
+
+ if (fail)
+ {
+ Thread.sleep(1000);
+ while (store.getNumberOfPages() == initialNumberOfPages)
+ {
+ Thread.sleep(100);
+ }
+
+ System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+ ", failing now");
+
+ fail(session);
+ }
+
+ for (Thread t : consumers)
+ {
+ t.join();
+ }
+
+ for (Consumer p : consumers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ protected int multiThreadProducer(final int numberOfThreads, final boolean failover) throws Exception,
+ MessagingException,
+ InterruptedException,
+ Throwable
+ {
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true, false);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(numberOfThreads);
+ final CountDownLatch flagPaging = new CountDownLatch(numberOfThreads);
+
+ class Producer extends Thread
+ {
+ volatile Throwable e;
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ try
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ alignSemaphore.countDown();
+
+ started = true;
+ startFlag.await();
+
+ while (!store.isPaging())
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+ }
+
+ flagPaging.countDown();
+
+ for (int i = 0; i < 100; i++)
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ // Printing the stack trace (to System.err) so the failure shows up in the test output
+ e.printStackTrace();
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ flagPaging.countDown();
+ this.e = e;
+ }
+ }
+ }
+
+ Producer[] producers = new Producer[numberOfThreads];
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ producers[i] = new Producer();
+ producers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ // Start producing only when all the sessions are opened
+ startFlag.countDown();
+
+ if (failover)
+ {
+ flagPaging.await(); // for this test I want everybody on the paging part
+
+ Thread.sleep(1500);
+
+ fail(session);
+
+ }
+
+ for (Thread t : producers)
+ {
+ t.join();
+ }
+
+ for (Producer p : producers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+
+ }
+ finally
+ {
+ session.close();
+ InVMConnector.resetFailures();
+ }
+
+ }
+
+
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -78,7 +78,7 @@
private static final long PING_INTERVAL = 500;
// Attributes ----------------------------------------------------
-
+
private MessagingService messagingService;
// Static --------------------------------------------------------
@@ -110,7 +110,7 @@
public boolean connectionFailed(MessagingException me)
{
this.me = me;
-
+
return true;
}
@@ -144,7 +144,7 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
@@ -219,14 +219,14 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
ClientSession session = csf.createSession(false, true, true);
-
+
assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
Listener clientListener = new Listener();
@@ -294,20 +294,20 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
-
+
Listener clientListener = new Listener();
ClientSession session = csf.createSession(false, true, true);
-
+
assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
session.addFailureListener(clientListener);
-
+
RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
// We need to get it to send one ping then stop
@@ -334,8 +334,21 @@
serverConn.addFailureListener(serverListener);
- Thread.sleep(PING_INTERVAL * 10);
+ for (int i = 0; i < 20; i++)
+ {
+ // a few tries to avoid a possible race caused by GCs or similar issues
+ if (messagingService.getServer().getRemotingService().getConnections().isEmpty())
+ {
+ // Sleep a bit more since it's async
+ // We are not sure about the order in which the listeners are called, so we need another sleep
+ Thread.sleep(PING_INTERVAL);
+ break;
+ }
+
+ Thread.sleep(PING_INTERVAL);
+ }
+
// The client listener should be called too since the server will close it from the server side which will result
// in the
// netty detecting closure on the client side and then calling failure listener
@@ -347,7 +360,8 @@
session.close();
}
-
+
+
/*
* Test the client triggering failure due to no pong received in time
*/
@@ -360,6 +374,7 @@
log.info("In interceptor, packet is " + packet.getType());
if (packet.getType() == PacketImpl.PING)
{
+ log.info("Ignoring Ping packet.. it will be dropped");
return false;
}
else
@@ -372,7 +387,7 @@
messagingService.getServer().getRemotingService().addInterceptor(noPongInterceptor);
TransportConfiguration transportConfig = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory");
-
+
ClientSessionFactory csf = new ClientSessionFactoryImpl(transportConfig,
null,
DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
@@ -390,18 +405,18 @@
DEFAULT_AUTO_GROUP,
DEFAULT_MAX_CONNECTIONS,
DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
DEFAULT_RETRY_INTERVAL,
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
0,
0);
-
+
ClientSession session = csf.createSession(false, true, true);
-
+
assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
Listener clientListener = new Listener();
-
+
session.addFailureListener(clientListener);
RemotingConnection serverConn = null;
@@ -424,13 +439,22 @@
serverConn.addFailureListener(serverListener);
- Thread.sleep(PING_INTERVAL * 2);
+ for (int i = 0; i < 20; i++)
+ {
+ // a few tries to avoid a possible race caused by GCs or similar issues
+ if (messagingService.getServer().getRemotingService().getConnections().isEmpty())
+ {
+ // Sleep a bit more since it's async
+ // We are not sure about the order in which the listeners are called, so we need another sleep
+ Thread.sleep(PING_INTERVAL);
+ break;
+ }
+ Thread.sleep(PING_INTERVAL);
+ }
+
assertNotNull(clientListener.getException());
- // Sleep a bit more since it's async
- Thread.sleep(PING_INTERVAL);
-
// We don't receive an exception on the server in this case
assertNull(serverListener.getException());
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/failover/PagingFailoverStressTest.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -22,17 +22,6 @@
package org.jboss.messaging.tests.stress.failover;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.client.ClientConsumer;
-import org.jboss.messaging.core.client.ClientMessage;
-import org.jboss.messaging.core.client.ClientProducer;
-import org.jboss.messaging.core.client.ClientSession;
-import org.jboss.messaging.core.client.ClientSessionFactory;
-import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.paging.PagingStore;
-import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.tests.integration.cluster.failover.PagingFailoverTest;
import org.jboss.messaging.utils.SimpleString;
@@ -59,359 +48,30 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public void testMultithreadFailoverReplicationOnly() throws Throwable
- {
- setUpFileBased(10 * 1024, 5 * 1024);
- int numberOfProducedMessages = multiThreadProducer(false);
+ // Package protected ---------------------------------------------
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(false, false);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
- }
-
- public void testMultithreadFailoverOnProducing() throws Throwable
+ // Protected -----------------------------------------------------
+ protected int getNumberOfMessages()
{
- setUpFileBased(10 * 1024, 5 * 1024);
-
- int numberOfProducedMessages = multiThreadProducer(true);
-
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(true, false);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
+ return 5000;
}
-
- public void testMultithreadFailoverOnConsume() throws Throwable
+ protected int getNumberOfThreads()
{
- setUpFileBased(10 * 1024, 5 * 1024);
-
- int numberOfProducedMessages = multiThreadProducer(false);
-
- System.out.println(numberOfProducedMessages + " messages produced");
-
- int numberOfConsumedMessages = multiThreadConsumer(false, true);
-
- assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
-
+ return 10;
}
-
- /**
- * @throws Exception
- * @throws InterruptedException
- * @throws Throwable
- */
- private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
- InterruptedException,
- Throwable
+
+ protected int getMaxGlobal()
{
- ClientSession session = null;
- try
- {
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
-
- final int RECEIVE_TIMEOUT = 10000;
-
- final ClientSessionFactory factory;
- final PagingStore store;
-
- if (connectedOnBackup)
- {
- factory = createBackupFactory();
- store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
- }
- else
- {
- factory = createFailoverFactory();
- store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
- }
-
- session = factory.createSession(false, true, true, false);
-
- final int initialNumberOfPages = store.getNumberOfPages();
-
- System.out.println("It has initially " + initialNumberOfPages);
-
- final int THREAD_CONSUMERS = 20;
-
- final CountDownLatch startFlag = new CountDownLatch(1);
- final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
-
- class Consumer extends Thread
- {
- volatile Throwable e;
-
- ClientSession session;
-
- public Consumer() throws Exception
- {
- session = factory.createSession(null, null, false, true, true, false, 0);
- }
-
- @Override
- public void run()
- {
- boolean started = false;
-
- try
- {
-
- try
- {
- ClientConsumer consumer = session.createConsumer(ADDRESS);
-
- session.start();
-
- alignSemaphore.countDown();
-
- started = true;
-
- startFlag.await();
-
- while (true)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
- if (msg == null)
- {
- break;
- }
-
- if (numberOfMessages.incrementAndGet() % 1000 == 0)
- {
- System.out.println(numberOfMessages + " messages read");
- }
-
- msg.acknowledge();
- }
-
- }
- finally
- {
- session.close();
- }
- }
- catch (Throwable e)
- {
- // Using System.out, as it would appear on the test output
- e.printStackTrace();
- if (!started)
- {
- alignSemaphore.countDown();
- }
- this.e = e;
- }
- }
- }
-
- Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
-
- for (int i = 0; i < THREAD_CONSUMERS; i++)
- {
- consumers[i] = new Consumer();
- }
-
- for (int i = 0; i < THREAD_CONSUMERS; i++)
- {
- consumers[i].start();
- }
-
- alignSemaphore.await();
-
- startFlag.countDown();
-
- if (fail)
- {
- Thread.sleep(1000);
- while (store.getNumberOfPages() == initialNumberOfPages)
- {
- Thread.sleep(100);
- }
-
- System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
- ", failing now");
-
- fail(session);
- }
-
- for (Thread t : consumers)
- {
- t.join();
- }
-
- for (Consumer p : consumers)
- {
- if (p.e != null)
- {
- throw p.e;
- }
- }
-
- return numberOfMessages.intValue();
- }
- finally
- {
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception ignored)
- {
- }
- }
- }
+ return 10 * 1024;
}
-
- /**
- * @throws Exception
- * @throws MessagingException
- * @throws InterruptedException
- * @throws Throwable
- */
- private int multiThreadProducer(final boolean failover) throws Exception,
- MessagingException,
- InterruptedException,
- Throwable
+
+ protected int getPageSize()
{
-
- final AtomicInteger numberOfMessages = new AtomicInteger(0);
- final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
-
- final ClientSessionFactory factory = createFailoverFactory();
-
- ClientSession session = factory.createSession(false, true, true, false);
- try
- {
- session.createQueue(ADDRESS, ADDRESS, null, true, false);
-
- final int THREAD_PRODUCERS = 30;
-
- final CountDownLatch startFlag = new CountDownLatch(1);
- final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
- final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
-
- class Producer extends Thread
- {
- volatile Throwable e;
-
- @Override
- public void run()
- {
- boolean started = false;
- try
- {
- ClientSession session = factory.createSession(false, true, true);
- try
- {
- ClientProducer producer = session.createProducer(ADDRESS);
-
- alignSemaphore.countDown();
-
- started = true;
- startFlag.await();
-
- while (!store.isPaging())
- {
-
- ClientMessage msg = session.createClientMessage(true);
-
- producer.send(msg);
- numberOfMessages.incrementAndGet();
- }
-
- flagPaging.countDown();
-
- for (int i = 0; i < 100; i++)
- {
-
- ClientMessage msg = session.createClientMessage(true);
-
- producer.send(msg);
- numberOfMessages.incrementAndGet();
-
- }
-
- }
- finally
- {
- session.close();
- }
- }
- catch (Throwable e)
- {
- // Using System.out, as it would appear on the test output
- e.printStackTrace();
- if (!started)
- {
- alignSemaphore.countDown();
- }
- flagPaging.countDown();
- this.e = e;
- }
- }
- }
-
- Producer[] producers = new Producer[THREAD_PRODUCERS];
-
- for (int i = 0; i < THREAD_PRODUCERS; i++)
- {
- producers[i] = new Producer();
- producers[i].start();
- }
-
- alignSemaphore.await();
-
- // Start producing only when all the sessions are opened
- startFlag.countDown();
-
- if (failover)
- {
- flagPaging.await(); // for this test I want everybody on the paging part
-
- Thread.sleep(1500);
-
- fail(session);
-
- }
-
- for (Thread t : producers)
- {
- t.join();
- }
-
- for (Producer p : producers)
- {
- if (p.e != null)
- {
- throw p.e;
- }
- }
-
- return numberOfMessages.intValue();
-
- }
- finally
- {
- session.close();
- InVMConnector.resetFailures();
- }
-
+ return 5 * 1024;
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
- protected int getNumberOfMessages()
- {
- return 5000;
- }
+
// Private -------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2009-03-03 21:39:33 UTC (rev 5981)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/paging/PageStressTest.java 2009-03-04 02:48:49 UTC (rev 5982)
@@ -21,6 +21,7 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.server.JournalType;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.tests.util.ServiceTestBase;
@@ -320,6 +321,8 @@
config.setJournalFileSize(10 * 1024 * 1024);
config.setJournalMinFiles(5);
+ config.setJournalType(JournalType.NIO);
+
return config;
}
More information about the jboss-cvs-commits
mailing list