[jboss-cvs] JBoss Messaging SVN: r5141 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Oct 18 05:52:29 EDT 2008
Author: timfox
Date: 2008-10-18 05:52:29 -0400 (Sat, 18 Oct 2008)
New Revision: 5141
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
Log:
Improvements to tests plus fix lock bug in rollback on failover
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-10-18 09:52:29 UTC (rev 5141)
@@ -302,11 +302,14 @@
}
}
- public synchronized void clear()
+ public void clear()
{
+ synchronized (this)
+ {
+ buffer.clear();
+ }
+
waitForOnMessageToComplete();
-
- buffer.clear();
}
public int getClientWindowSize()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-17 21:34:38 UTC (rev 5140)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-10-18 09:52:29 UTC (rev 5141)
@@ -59,7 +59,7 @@
// Constants -----------------------------------------------------
private static final int RECEIVE_TIMEOUT = 5000;
-
+
private static final int NUM_THREADS = 10;
// Attributes ----------------------------------------------------
@@ -79,7 +79,6 @@
// Public --------------------------------------------------------
-
public void testA() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -90,7 +89,7 @@
}
}, NUM_THREADS);
}
-
+
public void testB() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -101,7 +100,7 @@
}
}, NUM_THREADS);
}
-
+
public void testC() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -112,7 +111,7 @@
}
}, NUM_THREADS);
}
-
+
public void testD() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -156,7 +155,7 @@
}
}, NUM_THREADS);
}
-
+
public void testH() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -167,7 +166,7 @@
}
}, NUM_THREADS);
}
-
+
public void testI() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -178,7 +177,7 @@
}
}, NUM_THREADS);
}
-
+
public void testJ() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -189,7 +188,7 @@
}
}, NUM_THREADS);
}
-
+
public void testK() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -200,7 +199,7 @@
}
}, NUM_THREADS);
}
-
+
public void testL() throws Exception
{
runTestMultipleThreads(new RunnableT()
@@ -212,10 +211,21 @@
}, NUM_THREADS);
}
+ public void testM() throws Exception
+ {
+ runTestMultipleThreads(new RunnableT()
+ {
+ public void run(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ doTestM(sf, threadNum);
+ }
+ }, NUM_THREADS);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected void doTestA(final ClientSessionFactory sf, final int threadNum) throws Exception
{
long start = System.currentTimeMillis();
@@ -250,54 +260,13 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
- class MyHandler implements MessageHandler
- {
- final CountDownLatch latch = new CountDownLatch(1);
-
- volatile int count;
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (MessagingException me)
- {
- log.error("Failed to process", me);
- }
-
- if (count >= numMessages)
- {
- return;
- }
-
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
{
- MyHandler handler = new MyHandler();
+ MyHandler handler = new MyHandler(threadNum, numMessages);
consumer.setMessageHandler(handler);
@@ -309,6 +278,11 @@
boolean ok = handler.latch.await(5000, TimeUnit.MILLISECONDS);
assertTrue("Didn't receive all messages", ok);
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
}
sessSend.close();
@@ -363,59 +337,18 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
for (ClientSession session : sessions)
{
session.start();
}
- class MyHandler implements MessageHandler
- {
- final CountDownLatch latch = new CountDownLatch(1);
-
- volatile int count;
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (MessagingException me)
- {
- log.error("Failed to process", me);
- }
-
- if (count >= numMessages)
- {
- return;
- }
-
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
{
- MyHandler handler = new MyHandler();
+ MyHandler handler = new MyHandler(threadNum, numMessages);
consumer.setMessageHandler(handler);
@@ -427,6 +360,11 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
}
sessSend.close();
@@ -481,74 +419,23 @@
sessions.add(sessConsume);
}
- ClientSession sessSend = sf.createSession(false, true, true, false);
+ ClientSession sessSend = sf.createSession(false, false, false, false);
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.rollback();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.commit();
- class MyHandler implements MessageHandler
- {
- final CountDownLatch latch = new CountDownLatch(1);
-
- volatile int count;
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (MessagingException me)
- {
- log.error("Failed to process", me);
- }
-
- if (count >= numMessages)
- {
- return;
- }
-
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
{
- MyHandler handler = new MyHandler();
+ MyHandler handler = new MyHandler(threadNum, numMessages);
consumer.setMessageHandler(handler);
@@ -560,20 +447,13 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
}
- handlers.clear();
-
- // New handlers
- for (ClientConsumer consumer : consumers)
- {
- MyHandler handler = new MyHandler();
-
- consumer.setMessageHandler(handler);
-
- handlers.add(handler);
- }
-
for (ClientSession session : sessions)
{
session.rollback();
@@ -643,31 +523,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.rollback();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.commit();
@@ -676,42 +536,11 @@
session.start();
}
- class MyHandler implements MessageHandler
- {
- final CountDownLatch latch = new CountDownLatch(1);
-
- volatile int count;
-
- public void onMessage(ClientMessage message)
- {
- try
- {
- message.acknowledge();
- }
- catch (MessagingException me)
- {
- log.error("Failed to process", me);
- }
-
- if (count >= numMessages)
- {
- return;
- }
-
- count++;
-
- if (count == numMessages)
- {
- latch.countDown();
- }
- }
- }
-
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
{
- MyHandler handler = new MyHandler();
+ MyHandler handler = new MyHandler(threadNum, numMessages);
consumer.setMessageHandler(handler);
@@ -723,6 +552,11 @@
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
assertTrue(ok);
+
+ if (handler.failure != null)
+ {
+ throw new Exception("Handler failed: " + handler.failure);
+ }
}
handlers.clear();
@@ -730,7 +564,7 @@
// New handlers
for (ClientConsumer consumer : consumers)
{
- MyHandler handler = new MyHandler();
+ MyHandler handler = new MyHandler(threadNum, numMessages);
consumer.setMessageHandler(handler);
@@ -810,30 +644,10 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- assertNotNull(msg);
-
- msg.acknowledge();
- }
- }
-
sessSend.close();
for (ClientSession session : sessions)
{
@@ -886,35 +700,15 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
for (ClientSession session : sessions)
{
session.start();
}
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- assertNotNull(msg);
-
- msg.acknowledge();
- }
- }
-
sessSend.close();
for (ClientSession session : sessions)
{
@@ -969,64 +763,23 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.rollback();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.commit();
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- assertNotNull(msg);
-
- msg.acknowledge();
- }
- }
-
for (ClientSession session : sessions)
{
session.rollback();
}
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- assertNotNull(msg);
-
- msg.acknowledge();
- }
- }
-
-
for (ClientSession session : sessions)
{
session.commit();
@@ -1084,31 +837,11 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.rollback();
- for (int i = 0; i < numMessages; i++)
- {
- ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
- message.putIntProperty(new SimpleString("count"), i);
- message.getBody().flip();
- producer.send(message);
- }
+ sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.commit();
@@ -1117,31 +850,15 @@
session.start();
}
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- msg.acknowledge();
- }
- }
-
for (ClientSession session : sessions)
{
session.rollback();
}
- for (int i = 0; i < numMessages; i++)
- {
- for (ClientConsumer consumer : consumers)
- {
- ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ consumeMessages(consumers, numMessages);
- msg.acknowledge();
- }
- }
-
for (ClientSession session : sessions)
{
session.commit();
@@ -1274,28 +991,72 @@
s.close();
}
+
+ protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
+ {
+ ClientSession sessCreate = sf.createSession(false, true, true, false);
+ sessCreate.createQueue(ADDRESS, new SimpleString(threadNum + ADDRESS.toString()), null, false, false);
+
+ ClientSession sess = sf.createSession(false, true, true, false);
+
+ sess.stop();
+
+ sess.start();
+
+ ClientConsumer consumer = sess.createConsumer(new SimpleString(threadNum + ADDRESS.toString()));
+
+ ClientProducer producer = sess.createProducer(ADDRESS);
+
+ ClientMessage message = sess.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.getBody().flip();
+
+ producer.send(message);
+
+ sess.start();
+
+ ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+
+ sess.stop();
+
+ sess.start();
+
+ sess.close();
+
+ sessCreate.deleteQueue(new SimpleString(threadNum + ADDRESS.toString()));
+
+ sessCreate.close();
+ }
+
protected int getNumIterations()
{
return 5;
}
-
+
protected void setUp() throws Exception
{
super.setUp();
-
+
log.info("************ Starting test " + this.getName());
-
+
timer = new Timer();
-
+
}
-
+
protected void tearDown() throws Exception
{
log.info("************* Ending test " + this.getName());
-
+
timer.cancel();
-
+
super.tearDown();
}
@@ -1347,11 +1108,11 @@
}
}
}
-
+
do
{
List<Runner> threads = new ArrayList<Runner>();
-
+
for (int i = 0; i < numThreads; i++)
{
Runner runner = new Runner(runnable, i);
@@ -1360,18 +1121,18 @@
runner.start();
}
-
+
for (Runner thread : threads)
{
- thread.join();
+ thread.join();
assertNull(thread.throwable);
}
-
+
runnable.checkFail();
}
while (!failer.isExecuted());
-
+
session.close();
assertEquals(0, sf.getSessionCount());
@@ -1379,18 +1140,18 @@
stop();
}
}
-
+
private Failer startFailer(final long time, final ClientSession session)
{
Failer failer = new Failer(session);
timer.schedule(failer, (long)(time * Math.random()), 100);
-
+
return failer;
}
private void start() throws Exception
- {
+ {
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
backupConf.setPacketConfirmationBatchSize(10);
@@ -1423,8 +1184,6 @@
assertEquals(0, ConnectionRegistryImpl.instance.size());
- // ConnectionRegistryImpl.instance.clear();
-
assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
backupService.stop();
@@ -1436,9 +1195,78 @@
assertEquals(0, InVMRegistry.instance.size());
}
+ private void sendMessages(final ClientSession sessSend,
+ final ClientProducer producer,
+ final int numMessages,
+ final int threadNum) throws Exception
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = sessSend.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
+ message.putIntProperty(new SimpleString("threadnum"), threadNum);
+ message.putIntProperty(new SimpleString("count"), i);
+ message.getBody().flip();
+ producer.send(message);
+ }
+ }
+
+ private void consumeMessages(final Set<ClientConsumer> consumers, final int numMessages) throws Exception
+ {
+ //We make sure the messages arrive in the order they were sent from a particular producer
+ Map<ClientConsumer, Map<Integer, Integer>> counts = new HashMap<ClientConsumer, Map<Integer, Integer>>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ for (ClientConsumer consumer : consumers)
+ {
+ Map<Integer, Integer> consumerCounts = counts.get(consumer);
+
+ if (consumerCounts == null)
+ {
+ consumerCounts = new HashMap<Integer, Integer>();
+ counts.put(consumer, consumerCounts);
+ }
+
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+
+ assertNotNull(msg);
+
+ int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)msg.getProperty(new SimpleString("count"));
+
+ Integer c = consumerCounts.get(tn);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ if (cnt != c.intValue())
+ {
+ throw new Exception("Invalid count, expected " + c + " got " + cnt);
+ }
+
+ c++;
+
+ //Wrap
+ if (c == numMessages)
+ {
+ c = 0;
+ }
+
+ consumerCounts.put(tn, c);
+
+ msg.acknowledge();
+ }
+ }
+ }
+
// Inner classes -------------------------------------------------
- class Failer extends TimerTask
+ private class Failer extends TimerTask
{
private final ClientSession session;
@@ -1470,17 +1298,18 @@
}
}
- public abstract class RunnableT extends Thread
+ private abstract class RunnableT extends Thread
{
private volatile String failReason;
+
private volatile Throwable throwable;
-
+
public void setFailed(final String reason, final Throwable throwable)
{
this.failReason = reason;
this.throwable = throwable;
}
-
+
public void checkFail()
{
if (throwable != null)
@@ -1492,6 +1321,82 @@
fail(failReason);
}
}
+
public abstract void run(final ClientSessionFactory sf, final int threadNum) throws Exception;
}
+
+ private class MyHandler implements MessageHandler
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ private Map<Integer, Integer> counts = new HashMap<Integer, Integer>();
+
+ volatile String failure;
+
+ final int tn;
+
+ final int numMessages;
+
+ volatile boolean done;
+
+ MyHandler(final int threadNum, final int numMessages)
+ {
+ this.tn = threadNum;
+
+ this.numMessages = numMessages;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (MessagingException me)
+ {
+ log.error("Failed to process", me);
+ }
+
+ if (done)
+ {
+ return;
+ }
+
+ int threadNum = (Integer)message.getProperty(new SimpleString("threadnum"));
+ int cnt = (Integer)message.getProperty(new SimpleString("count"));
+
+ Integer c = counts.get(threadNum);
+ if (c == null)
+ {
+ c = new Integer(cnt);
+ }
+
+ //log.info("got message " + threadNum + "-" + cnt);
+
+ if (cnt != c.intValue())
+ {
+ failure = "Invalid count, expected " + c + " got " + cnt;
+ log.error(failure);
+
+ latch.countDown();
+ }
+
+ if (tn == threadNum && c == numMessages - 1)
+ {
+ done = true;
+ latch.countDown();
+ }
+
+ c++;
+ //Wrap around at 100
+ if (c == 100)
+ {
+ c = 0;
+ }
+
+ counts.put(threadNum, c);
+
+
+ }
+ }
}
More information about the jboss-cvs-commits mailing list