[jboss-cvs] JBoss Messaging SVN: r2204 - in trunk: src/main/org/jboss/messaging/core/plugin/postoffice/cluster and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Feb 8 09:21:08 EST 2007
Author: timfox
Date: 2007-02-08 09:21:08 -0500 (Thu, 08 Feb 2007)
New Revision: 2204
Modified:
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
trunk/tests/etc/log4j.xml
trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
Log:
Mainly fix MessageRedistributionTest
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -239,7 +239,7 @@
//Sanity check
if (((List)param).isEmpty())
{
- log.error("Got a polled callback list - but it is empty!!!");
+ log.error("Got a polled callback list - but it is empty!!! See http://jira.jboss.org/jira/browse/JBMESSAGING-818");
}
packet = new PolledCallbacksDelivery((List)param, resp.getSessionId());
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -158,6 +158,15 @@
private String groupName;
private volatile boolean started;
+
+ //FIXME using a stopping flag is not a good approach and introduces a race condition
+ //http://jira.jboss.org/jira/browse/JBMESSAGING-819
+ //A thread can check stopping and find it to be false; the service can then set stopping to true
+ //and actually stop the post office; the thread that checked stopping then continues and performs
+ //its action, only to find that the service has already stopped
+ //Should use a read-write lock instead
+ //One way to minimise the chance of the race happening is to sleep for a little while after setting stopping to true
+ //before actually stopping the service (see below)
private volatile boolean stopping;
private JChannelFactory jChannelFactory;
@@ -333,11 +342,15 @@
log.warn("Attempt to stop() but " + this + " is not started");
return;
}
+
+ //Need to send this *before* stopping
+ syncSendRequest(new LeaveClusterRequest(getNodeId()));
stopping = true;
+
+ //FIXME http://jira.jboss.org/jira/browse/JBMESSAGING-819 this is a temporary kludge for now
+ Thread.sleep(1000);
- syncSendRequest(new LeaveClusterRequest(getNodeId()));
-
statsSender.stop();
super.stop(sendNotification);
@@ -715,6 +728,11 @@
*/
public void asyncSendRequest(ClusterRequest request) throws Exception
{
+ if (stopping)
+ {
+ return;
+ }
+
if (trace) { log.trace(this + " sending asynchronously " + request + " to group"); }
byte[] bytes = writeRequest(request);
@@ -726,6 +744,11 @@
*/
public void asyncSendRequest(ClusterRequest request, int nodeId) throws Exception
{
+ if (stopping)
+ {
+ return;
+ }
+
Address address = this.getAddressForNodeId(nodeId, false);
if (address == null)
@@ -765,12 +788,15 @@
if (tx == null)
{
- throw new IllegalStateException("Cannot find transaction transaction id: " + id);
+ //Commit can come in after the node has left - this is ok
+ if (trace) { log.trace("Cannot find transaction in map, node may have already left"); }
}
-
- tx.commit(this);
-
- if (trace) { log.trace(this + " committed transaction " + id ); }
+ else
+ {
+ tx.commit(this);
+
+ if (trace) { log.trace(this + " committed transaction " + id ); }
+ }
}
public void rollbackTransaction(TransactionId id) throws Throwable
@@ -786,12 +812,15 @@
if (tx == null)
{
- throw new IllegalStateException("Cannot find transaction transaction id: " + id);
+ // Rollback can come in after the node has left - this is ok
+ if (trace) { log.trace("Cannot find transaction in map, node may have already left"); }
}
-
- tx.rollback(this);
-
- if (trace) { log.trace(this + " committed transaction " + id ); }
+ else
+ {
+ tx.rollback(this);
+
+ if (trace) { log.trace(this + " committed transaction " + id ); }
+ }
}
public void updateQueueStats(int nodeId, List statsList) throws Exception
@@ -814,7 +843,7 @@
if (nameMap == null)
{
// This is ok, the node might have left
- if (trace) { log.trace(this + " cannot find node in name map, i guess the node might have left?"); }
+ if (trace) { log.trace(this + " cannot find node in name map, the node might have left"); }
}
else
{
@@ -1728,6 +1757,11 @@
*/
private void syncSendRequest(ClusterRequest request) throws Exception
{
+ if (stopping)
+ {
+ return;
+ }
+
if (trace) { log.trace(this + " sending synch request " + request); }
Message message = new Message(null, null, writeRequest(request));
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -395,7 +395,7 @@
name,
del.getReference().getMessage());
- if (!del.getReference().getMessage().isReliable())
+ if (!del.getReference().getMessage().isReliable() || !recoverable)
{
//We can ack it now
del.acknowledge(null);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/PullMessagesRequest.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -135,18 +135,26 @@
{
//We need to ack the delivery
+ if (trace) { log.trace(this + " committing, tx: " + this.txId); }
+
//We need to ack it in memory only
//since it would have been acked on the pulling node
LocalClusteredQueue queue = (LocalClusteredQueue)reliableDelivery.getObserver();
queue.acknowledgeFromCluster(reliableDelivery);
+
+ if (trace) { log.trace(this + " committed, tx: " + this.txId); }
}
public void rollback(PostOfficeInternal office) throws Throwable
{
//We need to cancel the delivery
+ if (trace) { log.trace(this + " rolling back, tx: " + this.txId); }
+
reliableDelivery.cancel();
+
+ if (trace) { log.trace(this + " rolled back, tx: " + this.txId); }
}
public void read(DataInputStream in) throws Exception
Modified: trunk/tests/etc/log4j.xml
===================================================================
--- trunk/tests/etc/log4j.xml 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/etc/log4j.xml 2007-02-08 14:21:08 UTC (rev 2204)
@@ -47,8 +47,8 @@
</category>
<category name="org.jboss.remoting">
- <priority value="TRACE" class="org.jboss.logging.XLevel"/>
- <!-- <priority value="DEBUG"/> -->
+ <!-- <priority value="TRACE" class="org.jboss.logging.XLevel"/> -->
+ <priority value="DEBUG"/>
</category>
<category name="org.jboss">
@@ -57,14 +57,17 @@
<category name="org.jboss.messaging">
<priority value="TRACE" class="org.jboss.logging.XLevel"/>
+ <!-- <priority value="DEBUG"/> -->
</category>
<category name="org.jboss.jms">
<priority value="TRACE" class="org.jboss.logging.XLevel"/>
+ <!-- <priority value="DEBUG"/> -->
</category>
<category name="org.jboss.test">
<priority value="TRACE" class="org.jboss.logging.XLevel"/>
+ <!-- <priority value="DEBUG"/> -->
</category>
<!-- Ignoring trace from these: -->
@@ -82,8 +85,8 @@
</category>
<category name="org.jboss.jms.server.remoting.JMSWireFormat">
- <priority value="TRACE" class="org.jboss.logging.XLevel"/>
- <!-- <priority value="DEBUG"/> -->
+ <!-- <priority value="TRACE" class="org.jboss.logging.XLevel"/> -->
+ <priority value="DEBUG"/>
</category>
<root>
Modified: trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/SimpleReceiver.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -22,10 +22,8 @@
package org.jboss.test.messaging.core;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Channel;
@@ -45,6 +43,7 @@
* an "active" delivery (NACKING) undelivered, or throw unchecked exceptions.
*
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @version <tt>$Revision$</tt>
*
* $Id$
@@ -62,8 +61,6 @@
public static final String SELECTOR_REJECTING = "SELECTOR_REJECTING";
public static final String ACCEPTING_TO_MAX = "ACCEPTING_TO_MAX";
- private static final String INVOCATION_COUNT = "INVOCATION_COUNT";
-
// Static --------------------------------------------------------
// Attributes ----------------------------------------------------
@@ -72,12 +69,14 @@
private List messages;
private String state;
private String name;
- private Channel channel;
- private String futureState;
- private int invocationsToFutureStateCount;
- private Map waitingArea;
+ private Channel channel;
+
private boolean immediateAsynchronousAcknowledgment;
private int maxRefs;
+
+ private int count;
+ private int waitForCount = -1;
+
// Constructors --------------------------------------------------
@@ -113,8 +112,6 @@
this.state = state;
this.channel = channel;
messages = new ArrayList();
- waitingArea = new HashMap();
- waitingArea.put(INVOCATION_COUNT, new Integer(0));
immediateAsynchronousAcknowledgment = false;
}
@@ -123,7 +120,7 @@
public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
{
log.trace(this + " got routable:" + ref);
-
+
try
{
if (ref == null)
@@ -163,8 +160,10 @@
log.trace("State is:" + state);
boolean done = ACKING.equals(state) ? true : false;
- log.trace(this + " is " + (done ? "ACKing" : "NACKing") + " message " + ref);
+ //NOTE! it is NOT Nacking, it is keeping - don't say NACKing - it is misleading (nack means cancel)
+ log.trace(this + " is " + (done ? "ACKing" : "Keeping") + " message " + ref);
+
Message m = ref.getMessage();
SimpleDelivery delivery = new SimpleDelivery(observer, ref, done);
@@ -186,18 +185,14 @@
}
finally
{
- synchronized(waitingArea)
+ synchronized (this)
{
- if (futureState != null && --invocationsToFutureStateCount == 0)
+ count++;
+ if (waitForCount != -1 && count >= waitForCount)
{
- state = futureState;
- futureState = null;
+ this.notify();
}
-
- Integer crt = (Integer)waitingArea.get(INVOCATION_COUNT);
- waitingArea.put(INVOCATION_COUNT, new Integer(crt.intValue() + 1));
- waitingArea.notifyAll();
- }
+ }
}
}
@@ -245,38 +240,34 @@
return l;
}
- public boolean waitForHandleInvocations(int count)
- {
- return waitForHandleInvocations(count, Long.MAX_VALUE);
- }
-
/**
* Blocks until handle() is called for the specified number of times.
*
* @return true if the handle was invoked the specified number of times or false if the method
* exited with timeout.
*/
- public boolean waitForHandleInvocations(int count, long timeout)
+ public boolean waitForHandleInvocations(int waitFor, long timeout)
{
long start = System.currentTimeMillis();
- synchronized(waitingArea)
+
+ synchronized(this)
{
- while(true)
- {
- Integer invocations = (Integer)waitingArea.get(INVOCATION_COUNT);
-
- if (invocations.intValue() == count)
+ this.waitForCount = waitFor;
+
+ while (this.count < waitForCount)
+ {
+ if (timeout < 0)
{
- return true;
- }
- if (timeout <= 0)
- {
+ resetInvocationCount();
return false;
}
+
try
{
- waitingArea.wait(timeout);
- timeout -= System.currentTimeMillis() - start;
+ this.wait(timeout);
+ long now = System.currentTimeMillis();
+ timeout -= now - start;
+ start = now;
}
catch(InterruptedException e)
{
@@ -284,17 +275,12 @@
}
}
}
+
+ resetInvocationCount();
+
+ return true;
}
- public void resetInvocationCount()
- {
- synchronized(waitingArea)
- {
- waitingArea.put(INVOCATION_COUNT, new Integer(0));
- waitingArea.notifyAll();
- }
- }
-
public void acknowledge(Message r, Transaction tx) throws Throwable
{
log.debug(this + " acknowledging " + r);
@@ -367,29 +353,6 @@
log.trace(this + " cancelled " + r);
}
-
- public void setState(String state)
- {
- checkValid(state);
- this.state = state;
- }
-
- /**
- * Sets the given state on the receiver, but only after "invocationCount" handle() invocations.
- * The state changes <i>after</i> the last invocation.
- */
- public void setState(String state, int invocationCount)
- {
- checkValid(state);
- futureState = state;
- invocationsToFutureStateCount = invocationCount;
- }
-
- public String getState()
- {
- return state;
- }
-
public String toString()
{
return "Receiver["+ name +"](" + state + ")";
@@ -413,6 +376,12 @@
throw new IllegalArgumentException("Unknown receiver state: " + state);
}
}
+
+ private void resetInvocationCount()
+ {
+ this.waitForCount = -1;
+ this.count = 0;
+ }
// Inner classes -------------------------------------------------
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/base/PostOfficeTestBase.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -127,6 +127,7 @@
"Clustered", ms, pm, tr, ff, cf, pool,
groupName, jChannelFactory,
stateTimeout, castTimeout, pullPolicy, rf, mapper, 1000);
+
postOffice.start();
return postOffice;
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-02-08 10:52:31 UTC (rev 2203)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/cluster/RedistributionWithDefaultMessagePullPolicyTest.java 2007-02-08 14:21:08 UTC (rev 2204)
@@ -524,6 +524,8 @@
try
{
+ log.trace("Creating post offices");
+
office1 = (DefaultClusteredPostOffice)
createClusteredPostOffice(1, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
sc, ms, pm, tr, pool);
@@ -543,6 +545,8 @@
office5 = (DefaultClusteredPostOffice)
createClusteredPostOffice(5, "testgroup", 10000, 10000, new DefaultMessagePullPolicy(),
sc, ms, pm, tr, pool);
+
+ log.trace("Created postoffices");
LocalClusteredQueue queue1 =
new LocalClusteredQueue(office1, 1, "queue1", channelIDManager.getID(), ms, pm,
@@ -568,17 +572,25 @@
new LocalClusteredQueue(office5, 5, "queue1", channelIDManager.getID(), ms, pm,
true, recoverable, (QueuedExecutor) pool.get(), -1, null, tr);
office5.bindClusteredQueue(new SimpleCondition("queue1"), queue5);
+
+ log.trace("Created and bound queues");
final int NUM_MESSAGES = 100;
+ log.trace("sending messages");
this.sendMessages("queue1", persistent, office1, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office2, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office3, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office4, NUM_MESSAGES, null);
this.sendMessages("queue1", persistent, office5, NUM_MESSAGES, null);
+
+ log.trace("sent messages");
Thread.sleep(2000);
+
+ log.trace("Finished small sleep");
+
//Check the sizes
log.trace("Here are the sizes:");
@@ -602,14 +614,29 @@
assertEquals(NUM_MESSAGES, queue5.memoryRefCount());
assertEquals(0, queue5.getDeliveringCount());
+
+ log.trace("Creating receiver");
SimpleReceiver receiver = new SimpleReceiver("blah", SimpleReceiver.ACCEPTING);
+
+ log.trace("Created receiver");
queue1.add(receiver);
+
+ log.trace("Added receiver");
queue1.deliver(false);
+
+ log.trace("Called deliver");
- Thread.sleep(7000);
+ log.trace("Waiting for handleInvocations");
+ long start = System.currentTimeMillis();
+ receiver.waitForHandleInvocations(NUM_MESSAGES * 5, 60000);
+ long end = System.currentTimeMillis();
+ log.trace("I waited for " + (end - start) + " ms");
+
+ Thread.sleep(2000);
+
log.trace("Here are the sizes:");
log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -640,6 +667,8 @@
assertEquals(NUM_MESSAGES * 5, messages.size());
Iterator iter = messages.iterator();
+
+ log.trace("Acknowledging messages");
while (iter.hasNext())
{
@@ -647,6 +676,8 @@
receiver.acknowledge(msg, null);
}
+
+ log.trace("Acknowledged messages");
receiver.clear();
@@ -800,7 +831,8 @@
queue5.add(receiver5);
receiver1.setMaxRefs(5);
- queue1.deliver(false);
+ queue1.deliver(false);
+ receiver1.waitForHandleInvocations(5, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 5, queue1.memoryRefCount());
assertEquals(5, queue1.getDeliveringCount());
@@ -810,7 +842,8 @@
receiver1.setMaxRefs(0);
receiver2.setMaxRefs(10);
- queue2.deliver(false);
+ queue2.deliver(false);
+ receiver2.waitForHandleInvocations(10, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 10, queue2.memoryRefCount());
assertEquals(10, queue2.getDeliveringCount());
@@ -818,7 +851,8 @@
receiver2.setMaxRefs(0);
receiver3.setMaxRefs(15);
- queue3.deliver(false);
+ queue3.deliver(false);
+ receiver3.waitForHandleInvocations(15, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 15, queue3.memoryRefCount());
assertEquals(15, queue3.getDeliveringCount());
@@ -826,7 +860,8 @@
receiver3.setMaxRefs(0);
receiver4.setMaxRefs(20);
- queue4.deliver(false);
+ queue4.deliver(false);
+ receiver4.waitForHandleInvocations(20, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 20, queue4.memoryRefCount());
assertEquals(20, queue4.getDeliveringCount());
@@ -834,15 +869,14 @@
receiver4.setMaxRefs(0);
receiver5.setMaxRefs(25);
- queue5.deliver(false);
+ queue5.deliver(false);
+ receiver5.waitForHandleInvocations(25, 20000);
Thread.sleep(1000);
assertEquals(NUM_MESSAGES - 25, queue5.memoryRefCount());
assertEquals(25, queue5.getDeliveringCount());
acknowledgeAll(receiver5);
receiver5.setMaxRefs(0);
- Thread.sleep(1000);
-
assertTrue(office1.getHoldingTransactions().isEmpty());
assertTrue(office2.getHoldingTransactions().isEmpty());
assertTrue(office3.getHoldingTransactions().isEmpty());
@@ -859,7 +893,9 @@
//Consume the rest from queue 5
receiver5.setMaxRefs(NUM_MESSAGES - 25);
queue5.deliver(false);
- Thread.sleep(5000);
+ receiver5.waitForHandleInvocations(NUM_MESSAGES - 25, 20000);
+
+ Thread.sleep(2000);
log.trace("receiver5 msgs:" + receiver5.getMessages().size());
@@ -907,7 +943,9 @@
receiver5.setMaxRefs(5);
queue5.deliver(false);
- Thread.sleep(5000);
+ receiver5.waitForHandleInvocations(5, 20000);
+
+ Thread.sleep(2000);
log.trace("Here are the sizes 4:");
log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
@@ -948,6 +986,8 @@
receiver5.setMaxRefs(1);
queue5.deliver(false);
+ receiver5.waitForHandleInvocations(1, 20000);
+
Thread.sleep(2000);
log.trace("Here are the sizes 5:");
@@ -986,9 +1026,12 @@
//From queue 4 consume everything else
- receiver4.setMaxRefs(NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1);
+ int num = NUM_MESSAGES - 15 + NUM_MESSAGES - 20 + NUM_MESSAGES - 11 + NUM_MESSAGES - 11 + 1;
+ receiver4.setMaxRefs(num);
queue4.deliver(false);
- Thread.sleep(7000);
+ receiver4.waitForHandleInvocations(num, 20000);
+
+ Thread.sleep(2000);
log.trace("Here are the sizes 6:");
log.trace("queue1, refs:" + queue1.memoryRefCount() + " dels:" + queue1.getDeliveringCount());
More information about the jboss-cvs-commits
mailing list