[jboss-cvs] JBoss Messaging SVN: r5652 - branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 16 10:50:10 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-16 10:50:08 -0500 (Fri, 16 Jan 2009)
New Revision: 5652
Modified:
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
Log:
Just saving a backup before I mess up a lot the test for debugging
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-16 06:04:54 UTC (rev 5651)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/LargeMessageFailoverTest.java 2009-01-16 15:50:08 UTC (rev 5652)
@@ -24,6 +24,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientFileMessage;
@@ -33,8 +35,12 @@
import org.jboss.messaging.core.client.ClientSessionFactory;
import org.jboss.messaging.core.client.impl.ClientSessionImpl;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.util.SimpleString;
/**
@@ -51,6 +57,8 @@
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(LargeMessageFailoverTest.class);
+
// Attributes ----------------------------------------------------
private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
@@ -190,7 +198,321 @@
}.start();
}
+
/**
+ * @throws Exception
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadConsume(final boolean connectedOnBackup, final boolean fail) throws Exception,
+ InterruptedException,
+ Throwable
+ {
+ ClientSession session = null;
+ try
+ {
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ final int RECEIVE_TIMEOUT = 10000;
+
+ final ClientSessionFactory factory;
+ final PagingStore store;
+
+ if (connectedOnBackup)
+ {
+ factory = createBackupFactory();
+ store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+ else
+ {
+ factory = createFailoverFactory();
+ store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+
+ session = factory.createSession(false, true, true, false);
+
+ final int initialNumberOfPages = store.getNumberOfPages();
+
+ System.out.println("It has initially " + initialNumberOfPages);
+
+ final int THREAD_CONSUMERS = 20;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+ class Consumer extends Thread
+ {
+ volatile Throwable e;
+
+ ClientSession session;
+
+ public Consumer() throws Exception
+ {
+ session = factory.createSession(null, null, false, true, true, false, 0);
+ }
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+
+ try
+ {
+
+ try
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ alignSemaphore.countDown();
+
+ started = true;
+
+ startFlag.await();
+
+ while (true)
+ {
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (numberOfMessages.incrementAndGet() % 1000 == 0)
+ {
+ System.out.println(numberOfMessages + " messages read");
+ }
+
+ msg.acknowledge();
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ this.e = e;
+ }
+ }
+ }
+
+ Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i] = new Consumer();
+ }
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ startFlag.countDown();
+
+ if (fail)
+ {
+ Thread.sleep(1000);
+ while (store.getNumberOfPages() == initialNumberOfPages)
+ {
+ Thread.sleep(100);
+ }
+
+ System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+ ", failing now");
+
+ fail(session);
+ }
+
+ for (Thread t : consumers)
+ {
+ t.join();
+ }
+
+ for (Consumer p : consumers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadProducer(final boolean failover) throws Exception,
+ MessagingException,
+ InterruptedException,
+ Throwable
+ {
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true, false);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ final int THREAD_PRODUCERS = 30;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+ final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+ class Producer extends Thread
+ {
+ volatile Throwable e;
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ try
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ alignSemaphore.countDown();
+
+ started = true;
+ startFlag.await();
+
+ while (!store.isPaging())
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+ }
+
+ flagPaging.countDown();
+
+ for (int i = 0; i < 1000; i++)
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ flagPaging.countDown();
+ this.e = e;
+ }
+ }
+ }
+
+ Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+ for (int i = 0; i < THREAD_PRODUCERS; i++)
+ {
+ producers[i] = new Producer();
+ producers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ // Start producing only when all the sessions are opened
+ startFlag.countDown();
+
+ if (failover)
+ {
+ flagPaging.await(); // for this test I want everybody on the paging part
+
+ Thread.sleep(1500);
+
+ fail(session);
+
+ }
+
+ for (Thread t : producers)
+ {
+ t.join();
+ }
+
+ for (Producer p : producers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+
+ }
+ finally
+ {
+ session.close();
+ InVMConnector.resetFailures();
+ }
+
+ }
+
+ private void fail(final ClientSession session) throws Exception
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ System.out.println("Forcing a failure");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+ }
+
+
+ /**
* @param factory
* @param placeToFail
* @param numberOfMessages
Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2009-01-16 06:04:54 UTC (rev 5651)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2009-01-16 15:50:08 UTC (rev 5652)
@@ -12,6 +12,8 @@
package org.jboss.messaging.tests.integration.cluster.failover;
+import java.io.File;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -23,8 +25,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -39,13 +39,17 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
/**
@@ -53,8 +57,13 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*/
-public class MultiThreadRandomFailoverTest extends TestCase
+public class MultiThreadRandomFailoverTest extends UnitTestCase
{
+ /**
+ *
+ */
+ private static final int DEFAULT_MESSAGE_SIZE = 1024;
+
private static final Logger log = Logger.getLogger(MultiThreadRandomFailoverTest.class);
// Constants -----------------------------------------------------
@@ -88,7 +97,7 @@
{
doTestA(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testB() throws Exception
@@ -99,7 +108,7 @@
{
doTestB(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testC() throws Exception
@@ -110,7 +119,7 @@
{
doTestC(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testD() throws Exception
@@ -121,7 +130,7 @@
{
doTestD(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testE() throws Exception
@@ -132,7 +141,7 @@
{
doTestE(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testF() throws Exception
@@ -143,7 +152,7 @@
{
doTestF(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testG() throws Exception
@@ -154,7 +163,7 @@
{
doTestG(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testH() throws Exception
@@ -165,7 +174,7 @@
{
doTestH(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testI() throws Exception
@@ -176,7 +185,7 @@
{
doTestI(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testJ() throws Exception
@@ -187,7 +196,7 @@
{
doTestJ(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testK() throws Exception
@@ -198,7 +207,7 @@
{
doTestK(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
public void testL() throws Exception
@@ -209,7 +218,7 @@
{
doTestL(sf);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
// public void testM() throws Exception
@@ -231,9 +240,20 @@
{
doTestN(sf, threadNum);
}
- }, NUM_THREADS);
+ }, NUM_THREADS, false);
}
+ public void testLargeMessage() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestLargeMessage(sf, threadNum);
+ }
+ }, NUM_THREADS, true);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -272,7 +292,7 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
Set<MyHandler> handlers = new HashSet<MyHandler>();
@@ -354,7 +374,7 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
for (ClientSession session : sessions)
{
@@ -445,11 +465,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.rollback();
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.commit();
@@ -550,11 +570,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.rollback();
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.commit();
@@ -692,7 +712,7 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
consumeMessages(consumers, numMessages, threadNum);
@@ -748,7 +768,7 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
for (ClientSession session : sessions)
{
@@ -811,11 +831,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.rollback();
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.commit();
@@ -885,11 +905,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.rollback();
- sendMessages(sessSend, producer, numMessages, threadNum);
+ sendMessages(sessSend, producer, numMessages, DEFAULT_MESSAGE_SIZE, threadNum);
sessSend.commit();
@@ -1202,6 +1222,96 @@
sessCreate.close();
}
+ protected void doTestLargeMessage(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ long start = System.currentTimeMillis();
+
+ sf.setMinLargeMessageSize(10 * 1024);
+
+ sf.setSendWindowSize(1024*1024);
+
+ ClientSession s = sf.createSession(false, false, false);
+
+ final int messageSize = 40 * 1024;
+
+ final int numMessages = 100;
+
+ final int numSessions = 10;
+
+ Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
+ Set<ClientSession> sessions = new HashSet<ClientSession>();
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ ClientSession sessConsume = sf.createSession(false, true, true);
+
+ sessConsume.start();
+
+ sessConsume.createQueue(ADDRESS, subName, null, false, false);
+
+ ClientConsumer consumer = sessConsume.createConsumer(subName);
+
+ consumers.add(consumer);
+
+ sessions.add(sessConsume);
+ }
+
+ ClientSession sessSend = sf.createSession(false, true, true);
+
+ ClientProducer producer = sessSend.createProducer(ADDRESS);
+
+ sendMessages(sessSend, producer, numMessages, messageSize, threadNum);
+
+ Set<MyHandler> handlers = new HashSet<MyHandler>();
+
+ for (ClientConsumer consumer : consumers)
+ {
+ MyHandler handler = new MyHandler(threadNum, numMessages, messageSize);
+
+ consumer.setMessageHandler(handler);
+
+ handlers.add(handler);
+ }
+
+ for (MyHandler handler : handlers)
+ {
+ boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
+
+ if (!ok)
+ {
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
+ }
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
+ }
+
+ sessSend.close();
+ for (ClientSession session : sessions)
+ {
+ session.close();
+ }
+
+ for (int i = 0; i < numSessions; i++)
+ {
+ SimpleString subName = new SimpleString(threadNum + "sub" + i);
+
+ s.deleteQueue(subName);
+ }
+
+ s.close();
+
+ long end = System.currentTimeMillis();
+
+ log.info("duration " + (end - start));
+ }
+
protected int getNumIterations()
{
return 20;
@@ -1235,14 +1345,21 @@
// Private -------------------------------------------------------
- private void runTestMultipleThreads(final RunnableT runnable, final int numThreads) throws Exception
+ private void runTestMultipleThreads(final RunnableT runnable, final int numThreads, final boolean fileBased) throws Exception
{
final int numIts = getNumIterations();
for (int its = 0; its < numIts; its++)
{
log.info("************ ITERATION: " + its);
- start();
+ if (fileBased)
+ {
+ startFileBased();
+ }
+ else
+ {
+ start();
+ }
final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
@@ -1333,6 +1450,56 @@
return failer;
}
+ private void startFileBased() throws Exception
+ {
+
+ deleteDirectory(new File(getTestDir()));
+
+ Configuration backupConf = new ConfigurationImpl();
+
+ backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+ backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+ backupConf.setBackup(true);
+
+ backupService = MessagingServiceImpl.newMessagingService(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+
+ liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+ liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+ liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+ liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+ liveService.start();
+
+ }
+
private void start() throws Exception
{
Configuration backupConf = new ConfigurationImpl();
@@ -1380,18 +1547,20 @@
private void sendMessages(final ClientSession sessSend,
final ClientProducer producer,
final int numMessages,
+ final int sizeOfMessage,
final int threadNum) throws Exception
{
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ ClientMessage message = sessSend.createClientMessage(JBossBytesMessage.TYPE,
false,
0,
System.currentTimeMillis(),
(byte)1);
message.putIntProperty(new SimpleString("threadnum"), threadNum);
message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
+ message.setBody(new ByteBufferWrapper(ByteBuffer.allocate(sizeOfMessage)));
+ //message.getBody().flip();
producer.send(message);
}
}
@@ -1522,16 +1691,26 @@
final int tn;
final int numMessages;
+
+ final int messageSize;
volatile boolean done;
- MyHandler(final int threadNum, final int numMessages)
+ MyHandler(final int threadNum, final int numMessages, final int messageSize)
{
this.tn = threadNum;
this.numMessages = numMessages;
+
+ this.messageSize = messageSize;
}
+
+ MyHandler(final int threadNum, final int numMessages)
+ {
+ this(threadNum, numMessages, DEFAULT_MESSAGE_SIZE);
+ }
+
public void onMessage(ClientMessage message)
{
try
@@ -1566,6 +1745,13 @@
latch.countDown();
}
+
+ if (message.getBody().limit() != messageSize)
+ {
+ failure = "Invalid Message size, expected " + messageSize + " but it was " + message.getBody().limit();
+
+ latch.countDown();
+ }
if (tn == threadNum && c == numMessages - 1)
{
More information about the jboss-cvs-commits
mailing list