[Jboss-cvs] JBoss Messaging SVN: r1217 - in trunk: src/main/org/jboss/messaging/core/local tests/src/org/jboss/test/messaging/core/local
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 9 04:05:21 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-08-09 04:05:18 -0400 (Wed, 09 Aug 2006)
New Revision: 1217
Modified:
trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
Log:
RoundRobinPointToPointRouter re-implementation and extra deadlock test
Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2006-08-08 19:52:15 UTC (rev 1216)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2006-08-09 08:05:18 UTC (rev 1217)
@@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import java.util.Collections;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -45,6 +46,7 @@
* consumers rather than in contiguous blocks.
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @version <tt>$Revision: 1 $</tt>
* $Id: $
*/
@@ -63,51 +65,51 @@
// it's important that we're actually using an ArrayList for fast array access
protected ArrayList receivers;
- protected int pos;
+ protected int target;
// Constructors --------------------------------------------------
public RoundRobinPointToPointRouter()
{
receivers = new ArrayList();
- reset();
+ target = 0;
}
// Router implementation -----------------------------------------
public Set handle(DeliveryObserver observer, Routable routable, Transaction tx)
{
- Set deliveries = new HashSet();
- boolean selectorRejected = false;
- ArrayList receiversCopy = null;
- int firstPos;
-
+ int initial, current;
+ ArrayList receiversCopy;
+
synchronized(receivers)
{
if (receivers.isEmpty())
{
- return deliveries;
+ return Collections.EMPTY_SET;
}
- // make a copy to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
- int crtSize = receivers.size();
- receiversCopy = new ArrayList(crtSize);
+
+ // try to release the lock as quickly as possible and make a copy of the receivers array
+ // to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
+
+ receiversCopy = new ArrayList(receivers.size());
receiversCopy.addAll(receivers);
- if (pos >= crtSize)
- {
- pos = 0;
- }
- firstPos = pos;
+ initial = target;
+ current = initial;
}
+ Set deliveries = new HashSet();;
+ boolean selectorRejected = false;
+
while (true)
{
- Receiver receiver = (Receiver)receiversCopy.get(pos);
+ Receiver r = (Receiver)receiversCopy.get(current);
try
{
- Delivery d = receiver.handle(observer, routable, tx);
+ Delivery d = r.handle(observer, routable, tx);
- if (trace) { log.trace("receiver " + receiver + " handled " + routable + " and returned " + d); }
+ if (trace) { log.trace("receiver " + r + " handled " + routable + " and returned " + d); }
if (d != null && !d.isCancelled())
{
@@ -115,7 +117,7 @@
{
// deliver to the first receiver that accepts
deliveries.add(d);
- incPos();
+ shiftTarget(current);
break;
}
else
@@ -127,13 +129,13 @@
catch(Throwable t)
{
// broken receiver - log the exception and ignore it
- log.error("The receiver " + receiver + " is broken", t);
+ log.error("The receiver " + r + " is broken", t);
}
- incPos();
+ current = (current + 1) % receiversCopy.size();
// if we've tried them all then we break
- if (pos == firstPos)
+ if (current == initial)
{
break;
}
@@ -155,14 +157,13 @@
{
return false;
}
+
receivers.add(r);
-
- reset();
+ target = 0;
}
return true;
}
-
public boolean remove(Receiver r)
{
synchronized(receivers)
@@ -171,7 +172,7 @@
if (removed)
{
- reset();
+ target = 0;
}
return removed;
@@ -183,8 +184,7 @@
synchronized(receivers)
{
receivers.clear();
-
- reset();
+ target = 0;
}
}
@@ -204,27 +204,18 @@
}
}
-
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
- protected void reset()
+
+ protected void shiftTarget(int currentTarget)
{
- // Reset back to the first one
- pos = 0;
- }
-
- protected void incPos()
- {
- pos++;
-
- // Wrap around
- if (pos >= receivers.size())
+ synchronized(receivers)
{
- pos = 0;
+ int size = receivers.size();
+ target = Math.max((target + 1) % size, (currentTarget + 1) % size);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java 2006-08-08 19:52:15 UTC (rev 1216)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java 2006-08-09 08:05:18 UTC (rev 1217)
@@ -35,7 +35,12 @@
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.test.messaging.MessagingTestCase;
-
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision: 1 $</tt>
+ * $Id: $
+ */
public class RoundRobinPointToPointRouterTest extends MessagingTestCase
{
// Constants -----------------------------------------------------
@@ -423,14 +428,58 @@
assertNotNull(dels);
assertEquals(0, dels.size());
}
+
+
+ /**
+ * http://jira.jboss.org/jira/browse/JBMESSAGING-491
+ */
+ public void testDeadlock() throws Exception
+ {
+ final Router router = new RoundRobinPointToPointRouter();
+
+ LockingReceiver receiver = new LockingReceiver();
+ router.add(receiver);
+
+ final Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ // sends the message to the router on a separate thread
+ router.handle(null, new SimpleMessageReference(), null);
+ }
+ }, "Message sending thread");
+
+ // start the sending thread, which will immediately grab the router's "receivers" lock, and it
+ // will sleep for 3 seconds before attempting to grab LockingReceiver's lock.
+ t.start();
+
+
+ // in the meantime, the main thread immediately grabs receiver's lock ...
+
+ synchronized(receiver.getLock())
+ {
+ // ... sleeps for 500 ms to allow sender thread time to grab router's "receivers" lock
+ Thread.sleep(500);
+
+ // ... and tries to remove the receiver from the router
+ router.remove(receiver);
+ }
+
+ // normally, receiver removal should be immediate, as the router releases its "receivers" lock
+ // immediately, so test should complete. Pre-JBMESSAGING-491, the test deadlocks.
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
protected void checkReceiverGotRef(SimpleReceiver[] receivers, int pos)
{
log.info("checkReceiverGotRef:" + pos);
for (int i = 0; i < receivers.length; i++)
{
SimpleReceiver r = receivers[i];
-
+
if (i == pos)
{
assertTrue(r.gotRef);
@@ -441,21 +490,19 @@
}
}
}
-
+
protected void resetReceivers(SimpleReceiver[] receivers)
{
for (int i = 0; i < receivers.length; i++)
{
SimpleReceiver r = receivers[i];
-
+
r.gotRef = false;
}
}
- // Package protected ---------------------------------------------
+
- // Protected -----------------------------------------------------
-
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
@@ -486,5 +533,45 @@
}
}
+
+
+ class LockingReceiver implements Receiver
+ {
+ private Object lock;
+
+ public LockingReceiver()
+ {
+ lock = new Object();
+ }
+
+ public Delivery handle(DeliveryObserver observer, Routable routable, Transaction tx)
+ {
+ // The delivering thread needs to grab the receiver's lock to complete delivery; this
+ // is how Messaging receivers are written, anyway. We simulate the race condition by
+ // putting the sending thread to sleep for 3 seconds before allowing it to attempt to
+ // grab the lock
+
+ try
+ {
+ Thread.sleep(3000);
+ }
+ catch(InterruptedException e)
+ {
+ // this shouldn't happen in the test
+ return null;
+ }
+
+ synchronized(lock)
+ {
+ return new SimpleDelivery(null, null, true, true);
+ }
+ }
+
+ public Object getLock()
+ {
+ return lock;
+ }
+ }
+
}
More information about the jboss-cvs-commits
mailing list