[jboss-cvs] JBoss Messaging SVN: r6164 - trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 25 11:41:59 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-03-25 11:41:58 -0400 (Wed, 25 Mar 2009)
New Revision: 6164
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
Modified:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1558 - Adding test
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java 2009-03-25 15:41:58 UTC (rev 6164)
@@ -36,7 +36,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
-import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.tests.util.ServiceTestBase;
/**
* A MultiThreadFailoverSupport
@@ -48,7 +48,7 @@
*
*
*/
-public abstract class MultiThreadFailoverSupport extends UnitTestCase
+public abstract class MultiThreadFailoverSupport extends ServiceTestBase
{
// Constants -----------------------------------------------------
@@ -57,7 +57,7 @@
// Attributes ----------------------------------------------------
- protected Timer timer;
+ private Timer timer;
// Static --------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-03-25 15:41:58 UTC (rev 6164)
@@ -26,7 +26,6 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.Timer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -77,8 +76,6 @@
protected final Map<String, Object> backupParams = new HashMap<String, Object>();
- protected Timer timer;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -1299,8 +1296,6 @@
super.setUp();
log.info("************ Starting test " + getName());
-
- timer = new Timer();
}
@Override
@@ -1316,7 +1311,6 @@
{
backupService.stop();
}
- timer.cancel();
super.tearDown();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java 2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PageOrderingOnBackupTest.java 2009-03-25 15:41:58 UTC (rev 6164)
@@ -28,10 +28,12 @@
import java.util.concurrent.TimeUnit;
import org.jboss.messaging.core.buffers.ChannelBuffers;
+import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
import org.jboss.messaging.core.client.ClientSession;
import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
import org.jboss.messaging.core.paging.Page;
import org.jboss.messaging.core.paging.PagedMessage;
import org.jboss.messaging.core.paging.PagingManager;
@@ -63,23 +65,45 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
- public void testPageOrderingLiveAndBackup() throws Exception
+ public void testPageOrderingLiveAndBackupProducerOnly() throws Exception
{
+ internalTestPageOrderingLiveAndBackup(false);
+ }
+
+ public void testPageOrderingLiveAndBackupConsume() throws Exception
+ {
+ internalTestPageOrderingLiveAndBackup(true);
+ }
+
+ private void internalTestPageOrderingLiveAndBackup(boolean consumeMessages) throws Exception
+ {
final SimpleString threadIDKey = new SimpleString("THREAD_ID");
final SimpleString sequenceIDKey = new SimpleString("SEQUENCE_ID");
final SimpleString ADDRESS = new SimpleString("SOME_QUEUE");
final int NUMBER_OF_THREADS = 100;
+ final int NUMBER_OF_MESSAGES = 200;
+ final int NUMBER_OF_HANDLERS = consumeMessages ? NUMBER_OF_THREADS : 0;
+
setUpFailoverServers(true, 100 * 1024, 50 * 1024);
final ClientSessionFactory factory = createFailoverFactory();
ClientSession session = factory.createSession(false, true, true);
- session.createQueue(ADDRESS, ADDRESS, true);
+ for (int i = 0; i < NUMBER_OF_THREADS; i++)
+ {
+ session.createQueue(ADDRESS, ADDRESS.concat("-" + i), true);
+ }
session.close();
+ MyHandler handlers[] = new MyHandler[NUMBER_OF_HANDLERS];
+
+ for (int i = 0; i < handlers.length; i++)
+ {
+ handlers[i] = new MyHandler(factory, ADDRESS.concat("-" + i), NUMBER_OF_MESSAGES * 10);
+ }
+
final CountDownLatch flagAlign = new CountDownLatch(NUMBER_OF_THREADS);
final CountDownLatch flagStart = new CountDownLatch(1);
@@ -105,7 +129,7 @@
flagAlign.countDown();
flagStart.await();
- for (int i = 0; i < 200; i++)
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
ClientMessage msg = session.createClientMessage(true);
msg.setBody(ChannelBuffers.wrappedBuffer(new byte[512]));
@@ -152,12 +176,37 @@
}
}
+ Thread.sleep(5000);
+
+ for (MyHandler handler : handlers)
+ {
+ handler.close();
+ if (handler.failure != null)
+ {
+ throw new Exception("Failure on consumer", handler.failure);
+ }
+ }
+
PagingManager livePagingManager = liveService.getServer().getPostOffice().getPagingManager();
PagingManager backupPagingManager = backupService.getServer().getPostOffice().getPagingManager();
TestSupportPageStore livePagingStore = (TestSupportPageStore)livePagingManager.getPageStore(ADDRESS);
TestSupportPageStore backupPagingStore = (TestSupportPageStore)backupPagingManager.getPageStore(ADDRESS);
+ System.out.println("Pages: " + livePagingStore.getNumberOfPages() +
+ " on backup: " +
+ backupPagingStore.getNumberOfPages());
+
+
+ if (consumeMessages)
+ {
+ if (livePagingStore.getNumberOfPages() == backupPagingStore.getNumberOfPages() - 1)
+ {
+ // The live node may have one extra page in front of the backup
+ backupPagingStore.depage();
+ }
+ }
+
assertEquals(livePagingStore.getNumberOfPages(), backupPagingStore.getNumberOfPages());
Page livePage = null;
@@ -169,6 +218,7 @@
if (livePage == null)
{
+ assertNull(backupPagingStore.depage());
break;
}
@@ -193,7 +243,7 @@
{
PagedMessage backupMsg = backupIterator.next();
assertNotNull(backupMsg);
-
+
ServerMessage liveSrvMsg = liveMsg.getMessage(null);
ServerMessage backupSrvMsg = liveMsg.getMessage(null);
@@ -213,4 +263,69 @@
// Inner classes -------------------------------------------------
+ class MyHandler implements MessageHandler
+ {
+ final ClientSession session;
+
+ final ClientConsumer consumer;
+
+ volatile boolean started = true;
+
+ final int msgs;
+
+ volatile int receivedMsgs = 0;
+
+ final CountDownLatch latch;
+
+ Throwable failure;
+
+ MyHandler(ClientSessionFactory sf, SimpleString address, final int msgs) throws Exception
+ {
+ this.session = sf.createSession(null, null, false, true, true, false, 0);
+ this.consumer = session.createConsumer(address);
+ consumer.setMessageHandler(this);
+ this.session.start();
+ this.msgs = msgs;
+ latch = new CountDownLatch(msgs);
+ }
+
+ public synchronized void close() throws Exception
+ {
+ session.close();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.client.MessageHandler#onMessage(org.jboss.messaging.core.client.ClientMessage)
+ */
+ public synchronized void onMessage(ClientMessage message)
+ {
+ try
+ {
+ if (!started)
+ {
+ throw new IllegalStateException("Stopped Handler received message");
+ }
+
+ if (receivedMsgs++ == msgs)
+ {
+ System.out.println("done");
+ started = false;
+ session.stop();
+ }
+
+ message.acknowledge();
+
+ if (!started)
+ {
+ latch.countDown();
+ }
+
+ }
+ catch (Throwable e)
+ {
+ this.failure = e;
+ }
+ }
+
+ }
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverMultiThreadTest.java 2009-03-25 15:41:58 UTC (rev 6164)
@@ -0,0 +1,618 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.MessageHandler;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.settings.impl.AddressSettings;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
+import org.jboss.messaging.utils.SimpleString;
+
+/**
+ * A PagingFailoverMultiThreadTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PagingFailoverMultiThreadTest extends MultiThreadFailoverSupport
+{
+
+ // Constants -----------------------------------------------------
+ private static final int RECEIVE_TIMEOUT = 20000;
+
+
+ final int PAGE_SIZE = 512;
+
+ final int MAX_GLOBAL = 40 * PAGE_SIZE;
+
+ final boolean CREATE_AT_START = true;
+
+ private final int LATCH_WAIT = 50000;
+
+ private final int NUM_THREADS = 10;
+
+ private final int NUM_SESSIONS = 10;
+
+ private final Logger log = Logger.getLogger(this.getClass());
+
+ // Attributes ----------------------------------------------------
+
+ protected static final SimpleString ADDRESS_GLOBAL = new SimpleString("FailoverTestAddress");
+
+ protected MessagingService liveService;
+
+ protected MessagingService backupService;
+
+ protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testFoo()
+ {
+
+ }
+
+ // Currently disabled - https://jira.jboss.org/jira/browse/JBMESSAGING-1558
+ public void disabled_testB() throws Exception
+ {
+
+ runMultipleThreadsFailoverTest(new RunnableT()
+ {
+ @Override
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestB(sf, threadNum);
+ }
+ }, NUM_THREADS, 20, false, 1000);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setBody(final ClientMessage message) throws Exception
+ {
+ message.getBody().writeBytes(new byte[256]);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.tests.integration.cluster.failover.MultiThreadRandomFailoverTestBase#checkSize(org.jboss.messaging.core.client.ClientMessage)
+ */
+ protected boolean checkSize(final ClientMessage message)
+ {
+ return 256 == message.getBody().writerIndex();
+ }
+
+ protected SimpleString createAddressName(int threadNum)
+ {
+ return ADDRESS_GLOBAL.concat("_thread-" + threadNum);
+ }
+
+ protected SimpleString createSubName(int thread, int sequence)
+ {
+ return new SimpleString(thread + "sub" + sequence);
+ }
+
+ protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ SimpleString ADDRESS = createAddressName(threadNum);
+
+ long start = System.currentTimeMillis();
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int numMessages = 100;
+
+ final int numSessions = 1;
+
+ Set<MyInfo> infos = new HashSet<MyInfo>();
+
+ for (int i = 0; i < NUM_SESSIONS; i++)
+ {
+ SimpleString subName = createSubName(threadNum, i);
+
+ ClientSession sessConsume = sf.createSession(null, null, false, true, true, false, 0);
+
+ if (!CREATE_AT_START)
+ {
+ sessConsume.createQueue(ADDRESS, subName, null, true, false);
+ }
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ infos.add(new MyInfo(sessConsume, consumer));
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, threadNum);
+
+ for (MyInfo info : infos)
+ {
+ info.session.start();
+ }
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (MyInfo info : infos)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages, info.session, info.consumer);
+
+ handler.start();
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(LATCH_WAIT, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+
+ assertNull(handler.consumer.receive(250));
+ }
+
+ sessSend.close();
+
+ for (MyInfo info : infos)
+ {
+ info.session.close();
+ }
+
+ if (!CREATE_AT_START)
+ {
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+
+ }
+
+ protected void stop() throws Exception
+ {
+ backupService.stop();
+
+ liveService.stop();
+
+ assertEquals(0, InVMRegistry.instance.size());
+
+ }
+
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum);
+ message.putIntProperty(new SimpleString("count"), i);
+ setBody(message);
+ producer.send(message);
+ }
+ }
+
+ private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages, final int threadNum) throws Exception
+ {
+ // We make sure the messages arrive in the order they were sent from a particular producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer consumer : consumers)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(consumer, consumerCounts);
+ }
+
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+ assertNotNull(msg);
+
+ int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + tn + ": " + c + " got " + cnt);
+ }
+
+ c++;
+
+ // Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
+ /**
+ * @return
+ */
+ protected ClientSessionFactoryInternal createSessionFactory()
+ {
+ final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams),
+ 0,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_INITIAL_CONNECT_ATTEMPTS,
+ ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS);
+
+ sf.setSendWindowSize(32 * 1024);
+ return sf;
+ }
+
+ @Override
+ protected void start() throws Exception
+ {
+ setUpFailoverServers(true, MAX_GLOBAL, PAGE_SIZE);
+
+ if (CREATE_AT_START)
+ {
+ // TODO: Remove this part here
+ ClientSessionFactory sf = createSessionFactory();
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ for (int threadNum = 0; threadNum < NUM_THREADS; threadNum++)
+ {
+ SimpleString ADDRESS = createAddressName(threadNum);
+
+ for (int i = 0; i < NUM_SESSIONS; i++)
+ {
+ SimpleString subName = createSubName(threadNum, i);
+ session.createQueue(ADDRESS, subName, null, true, false);
+ }
+ }
+ session.close();
+
+ }
+
+ }
+
+ protected void setUpFailoverServers(boolean fileBased, final long maxGlobalSize, final int pageSize) throws Exception
+ {
+ deleteDirectory(new File(getTestDir()));
+
+ Configuration backupConf = new ConfigurationImpl();
+ backupConf.setSecurityEnabled(false);
+ backupConf.setClustered(true);
+ backupConf.setBackup(true);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+
+ if (fileBased)
+ {
+ clearData(getTestDir() + "/backup");
+
+ backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+ backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setJournalType(JournalType.NIO);
+
+ backupConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ backupConf.setPagingGlobalWatermarkSize(pageSize);
+ backupService = Messaging.newMessagingService(backupConf);
+ }
+ else
+ {
+ backupService = Messaging.newNullStorageMessagingService(backupConf);
+ }
+
+ backupService.start();
+
+ Thread.sleep(20);
+
+ Configuration liveConf = new ConfigurationImpl();
+ liveConf.setSecurityEnabled(false);
+ liveConf.setClustered(true);
+
+ TransportConfiguration liveTC = new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName());
+ liveConf.getAcceptorConfigurations().add(liveTC);
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+
+ if (fileBased)
+ {
+ liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+ liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+ liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+ liveConf.setPagingMaxGlobalSizeBytes(maxGlobalSize);
+ liveConf.setPagingGlobalWatermarkSize(pageSize);
+ liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setJournalType(JournalType.NIO);
+ }
+
+ if (fileBased)
+ {
+ liveService = Messaging.newMessagingService(liveConf);
+ }
+ else
+ {
+ liveService = Messaging.newNullStorageMessagingService(liveConf);
+ }
+
+ AddressSettings settings = new AddressSettings();
+ settings.setPageSizeBytes(pageSize);
+
+ liveService.getServer().getAddressSettingsRepository().addMatch("#", settings);
+ backupService.getServer().getAddressSettingsRepository().addMatch("#", settings);
+
+ clearData(getTestDir() + "/live");
+
+ liveService.start();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+ private class MyInfo
+ {
+ final ClientSession session;
+
+ final ClientConsumer consumer;
+
+ public MyInfo(final ClientSession session, final ClientConsumer consumer)
+ {
+ this.session = session;
+ this.consumer = consumer;
+ }
+ }
+
+ private class MyHandler implements MessageHandler
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ private final Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+ volatile String failure;
+
+ final int tn;
+
+ final int numMessages;
+
+ final ClientSession session;
+
+ final ClientConsumer consumer;
+
+ volatile Xid xid;
+
+ volatile boolean done;
+
+ volatile boolean started = false;
+
+ volatile boolean commit = false;
+
+ synchronized void start() throws Exception
+ {
+ counts.clear();
+
+ done = false;
+
+ failure = null;
+
+ latch = new CountDownLatch(1);
+
+ started = true;
+ consumer.setMessageHandler(this);
+ session.start();
+ }
+
+ synchronized void stop() throws Exception
+ {
+ session.stop();
+ // FIXME: Remove this line when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is done
+ consumer.setMessageHandler(null);
+ started = false;
+ }
+
+ synchronized void close() throws Exception
+ {
+ stop();
+ session.close();
+ }
+
+ MyHandler(final int threadNum, final int numMessages, final ClientSession session, final ClientConsumer consumer) throws Exception
+ {
+ tn = threadNum;
+
+ this.numMessages = numMessages;
+
+ this.session = session;
+
+ this.consumer = consumer;
+
+ }
+
+ public void setCommitOnComplete(boolean commit)
+ {
+ this.commit = commit;
+ }
+
+ public synchronized void onMessage(final ClientMessage message)
+ {
+
+ if (!started)
+ {
+ this.failure = "Received message with session stopped (thread = " + tn + ")";
+ log.error(failure);
+ return;
+ }
+
+ // log.info("*** handler got message");
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException me)
+ {
+ log.error("Failed to process", me);
+ }
+
+ if (done)
+ {
+ return;
+ }
+
+ int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+ Integer c = counts.get(threadNum);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
+ if (tn == threadNum && cnt != c.intValue())
+ {
+ failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+ log.error(failure);
+
+ latch.countDown();
+ }
+
+ if (!checkSize(message))
+ {
+ failure = "Invalid size on message";
+ log.error(failure);
+ latch.countDown();
+ }
+
+ if (tn == threadNum && c == numMessages - 1)
+ {
+ done = true;
+ try
+ {
+ this.stop();
+ }
+ catch (Exception e)
+ {
+ this.failure = e.getMessage();
+ e.printStackTrace();
+ }
+ latch.countDown();
+ }
+
+ c++;
+ // Wrap around at numMessages
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ counts.put(threadNum, c);
+
+ }
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-03-25 10:41:11 UTC (rev 6163)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-03-25 15:41:58 UTC (rev 6164)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.tests.util.RandomUtil;
import org.jboss.messaging.utils.SimpleString;
/**
@@ -375,7 +376,8 @@
if (fail)
{
- Thread.sleep(1000);
+ // Fail after some time
+ Thread.sleep((long)(1000 * RandomUtil.randomDouble()));
while (store.getNumberOfPages() == initialNumberOfPages)
{
Thread.sleep(100);
More information about the jboss-cvs-commits
mailing list