[jboss-cvs] JBoss Messaging SVN: r2879 - in trunk: tests/src/org/jboss/test/messaging/jms/clustering and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 11 20:09:22 EDT 2007
Author: timfox
Date: 2007-07-11 20:09:22 -0400 (Wed, 11 Jul 2007)
New Revision: 2879
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
Fixed MultipleFailoverTest
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-07-12 00:09:22 UTC (rev 2879)
@@ -30,18 +30,19 @@
import javax.jms.MessageListener;
import javax.jms.Session;
+import org.jboss.jms.delegate.Cancel;
import org.jboss.jms.delegate.ConsumerDelegate;
+import org.jboss.jms.delegate.DefaultCancel;
+import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.delegate.Cancel;
-import org.jboss.jms.delegate.DefaultCancel;
-import org.jboss.jms.delegate.DeliveryInfo;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.util.Future;
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
@@ -176,7 +177,11 @@
// or add anything to the tx for this session
if (!isConnectionConsumer)
{
+ if (trace) { log.trace("Calling postDeliver"); }
+
sess.postDeliver();
+
+ if (trace) { log.trace("Called postDeliver"); }
}
}
@@ -245,7 +250,7 @@
}
// Public ---------------------------------------------------------------------------------------
-
+
/**
* Handles a message sent from the server.
*
@@ -737,6 +742,9 @@
{
boolean notified = false;
+ if (trace) { log.trace("Receiver thread:" + receiverThread + " listener:" + listener + " listenerRunning:" + listenerRunning +
+ " sessionExecutor:" + sessionExecutor); }
+
// If we have a thread waiting on receive() we notify it
if (receiverThread != null)
{
@@ -754,6 +762,7 @@
listenerRunning = true;
if (trace) { log.trace(this + " scheduled a new ListenerRunner"); }
+
this.queueRunner(new ListenerRunner());
}
@@ -763,6 +772,8 @@
// Make sure we notify any thread waiting for last delivery
if (waitingForLastDelivery && !notified)
{
+ if (trace) { log.trace("Notifying"); }
+
mainLock.notifyAll();
}
}
@@ -895,8 +906,54 @@
mp = (MessageProxy)buffer.removeFirst();
- if (!buffer.isEmpty())
+// if (!buffer.isEmpty())
+// {
+// //Queue up the next runner to run
+//
+// if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+//
+// queueRunner(this);
+//
+// if (trace) { log.trace("Queued next onMessage to run"); }
+// }
+// else
+// {
+// if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+//
+// listenerRunning = false;
+// }
+ }
+
+ /*
+ * Bug here is as follows:
+ * The next runner gets scheduled BEFORE the on message is executed
+ * so if the onmessage fails on acking it will be put on hold
+ * and failover will kick in, this will clear the executor
+ * so the next queud one disappears at everything grinds to a halt
+ *
+ * Solution - don't use a session executor - have a sesion thread instead much nicer
+ */
+
+
+ if (mp != null)
+ {
+ try
{
+ callOnMessage(sessionDelegate, theListener, consumerID, queueName,
+ false, mp, ackMode, maxDeliveries, null, shouldAck);
+
+ if (trace) { log.trace("Called callonMessage"); }
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to deliver message", t);
+ }
+ }
+
+ synchronized (mainLock)
+ {
+ if (!buffer.isEmpty())
+ {
//Queue up the next runner to run
if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
@@ -910,26 +967,15 @@
if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
listenerRunning = false;
- }
+ }
}
-
- if (mp != null)
- {
- try
- {
- callOnMessage(sessionDelegate, theListener, consumerID, queueName,
- false, mp, ackMode, maxDeliveries, null, shouldAck);
- }
- catch (JMSException e)
- {
- log.error("Failed to deliver message", e);
- }
- }
if (handleFlowControl)
{
checkStart();
}
+
+ if (trace) { log.trace("Exiting run()"); }
}
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/MultipleFailoverTest.java 2007-07-12 00:09:22 UTC (rev 2879)
@@ -34,6 +34,7 @@
import javax.jms.Session;
import javax.jms.TextMessage;
+import org.jboss.jms.client.JBossConnection;
import org.jboss.test.messaging.tools.ServerManagement;
/**
@@ -197,17 +198,17 @@
public void testFailoverFloodTwoServers() throws Exception
{
- Connection conn = null;
+ JBossConnection conn = null;
try
{
- conn = this.createConnectionOnServer(cf, 1);
+ conn = (JBossConnection)this.createConnectionOnServer(cf, 1);
Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sessCons = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer cons = sessCons.createConsumer(queue[0]);
+ MessageConsumer cons = sessCons.createConsumer(queue[1]);
MyListener list = new MyListener();
@@ -215,7 +216,7 @@
conn.start();
- MessageProducer prod = sessSend.createProducer(queue[0]);
+ MessageProducer prod = sessSend.createProducer(queue[1]);
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
@@ -236,13 +237,19 @@
if (count % 100 == 0)
{
- log.info("sent " + count);
+ log.info("sent " + count + " server id " + conn.getServerID());
}
-
+
count++;
+
+ Thread.sleep(5);
}
-
+
+ log.info("done send");
+
+ log.info("Waiting to join thread");
t.join(5 * 60 * 60 * 1000);
+ log.info("joined");
if (killer.failed)
{
@@ -252,13 +259,14 @@
//We check that we received all the message
//we allow for duplicates, see http://jira.jboss.org/jira/browse/JBMESSAGING-604
+ if (!list.waitFor(count - 1))
+ {
+ fail("Timed out waiting for message");
+ }
+
conn.close();
conn = null;
- if (!list.waitFor(count))
- {
- fail("Timed out waiting for message");
- }
count = 0;
Iterator iter = list.msgs.iterator();
@@ -337,10 +345,11 @@
{
try
{
- Thread.sleep(10000);
+ Thread.sleep(5000);
log.info("Killing server 1");
ServerManagement.kill(1);
+ log.info("Killed server 1");
Thread.sleep(5000);
@@ -392,10 +401,13 @@
ServerManagement.start(1, "all", false);
ServerManagement.deployQueue("testDistributedQueue", 1);
+ Thread.sleep(10000);
+
log.info("killer DONE");
}
catch (Exception e)
{
+ log.error("Killer failed", e);
failed = true;
}
@@ -406,8 +418,6 @@
class MyListener implements MessageListener
{
- int count = 0;
-
volatile boolean failed;
Set msgs = new TreeSet();
@@ -418,40 +428,41 @@
boolean waitFor(int i)
{
+ log.info("Waiting for message " + i);
synchronized (obj)
{
+ log.info("here");
long toWait = 30000;
while (maxcnt < i && toWait > 0)
{
long start = System.currentTimeMillis();
try
- {
- obj.wait(30000);
+ {
+ obj.wait(60000);
}
catch (InterruptedException e)
{}
- if (i <= maxcnt)
+ if (maxcnt < i)
{
toWait -= System.currentTimeMillis() - start;
}
}
- return maxcnt < i;
+ return maxcnt == i;
}
+
}
public void onMessage(Message msg)
{
try
{
- TextMessage tm = (TextMessage)msg;
-
- if (count % 100 == 0)
+ int cnt = msg.getIntProperty("cnt");
+
+ if (cnt % 100 == 0)
{
- log.info("Received message " + tm.getText() + " (" + tm + ")");
+ log.info(this + " Received message " + cnt);
}
- count++;
-
/*
IMPORTANT NOTE
@@ -474,16 +485,15 @@
Therefore we only count that the total messages were received
*/
-
- int cnt = msg.getIntProperty("cnt");
-
+
msgs.add(new Integer(cnt));
maxcnt = Math.max(maxcnt, cnt);
+
synchronized (obj)
{
obj.notify();
- }
+ }
}
catch (Exception e)
{
Modified: trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-07-11 21:38:09 UTC (rev 2878)
+++ trunk/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2007-07-12 00:09:22 UTC (rev 2879)
@@ -145,7 +145,7 @@
}
else
{
- Server s = acquireRemote(2, i, true);
+ Server s = acquireRemote(10, i, true);
if (s != null)
{
@@ -543,7 +543,7 @@
log.info("spawned server " + i + ", waiting for it to come online");
- while(System.currentTimeMillis() - startTime < maxWaitTime * 1000)
+ while (System.currentTimeMillis() - startTime < maxWaitTime * 1000)
{
s = acquireRemote(1, i, true);
if (s != null)
More information about the jboss-cvs-commits
mailing list