[jboss-cvs] JBoss Messaging SVN: r2242 - in trunk: tests/src/org/jboss/test/messaging/core/local and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 9 10:37:38 EST 2007
Author: timfox
Date: 2007-02-09 10:37:38 -0500 (Fri, 09 Feb 2007)
New Revision: 2242
Modified:
trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-505
Modified: trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java 2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java 2007-02-09 15:37:38 UTC (rev 2242)
@@ -58,36 +58,41 @@
private boolean trace = log.isTraceEnabled();
- List receivers;
+ private List receivers;
+
+ private volatile boolean makeCopy;
+
+ private ArrayList receiversCopy;
// Constructors --------------------------------------------------
public FirstReceiverPointToPointRouter()
{
receivers = new ArrayList();
+
+ makeCopy = true;
}
// Router implementation -----------------------------------------
public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
{
- ArrayList receiversCopy;
-
- synchronized(receivers)
+ if (makeCopy)
{
- if (receivers.isEmpty())
- {
- return null;
+ synchronized (this)
+ {
+ //We make a copy of the receivers to avoid a race condition:
+ //http://jira.jboss.org/jira/browse/JBMESSAGING-505
+ //Note that we do not make a copy every time - only when the receivers have changed
+
+ receiversCopy = new ArrayList(receivers);
+
+ makeCopy = false;
}
-
- // try to release the lock as quickly as possible and make a copy of the receivers array
- // to avoid deadlock
-
- receiversCopy = new ArrayList(receivers.size());
- receiversCopy.addAll(receivers);
- }
-
+ }
+
Delivery del = null;
+
boolean selectorRejected = false;
for(Iterator i = receiversCopy.iterator(); i.hasNext(); )
@@ -130,58 +135,53 @@
}
- public boolean add(Receiver r)
+ public synchronized boolean add(Receiver r)
{
- synchronized(receivers)
+ if (receivers.contains(r))
{
- if (receivers.contains(r))
- {
- return false;
- }
- receivers.add(r);
+ return false;
}
+
+ receivers.add(r);
+
+ makeCopy = true;
+
return true;
}
- public boolean remove(Receiver r)
- {
- synchronized(receivers)
+ public synchronized boolean remove(Receiver r)
+ {
+ boolean removed = receivers.remove(r);
+
+ if (removed)
{
- return receivers.remove(r);
+ makeCopy = true;
}
+
+ return removed;
}
- public void clear()
+ public synchronized void clear()
{
- synchronized(receivers)
- {
- receivers.clear();
- }
+ receivers.clear();
+
+ makeCopy = true;
}
- public boolean contains(Receiver r)
+ public synchronized boolean contains(Receiver r)
{
- synchronized(receivers)
- {
- return receivers.contains(r);
- }
+ return receivers.contains(r);
}
- public Iterator iterator()
+ public synchronized Iterator iterator()
{
- synchronized(receivers)
- {
- return receivers.iterator();
- }
+ return receivers.iterator();
}
- public int getNumberOfReceivers()
+ public synchronized int getNumberOfReceivers()
{
- synchronized(receivers)
- {
- return receivers.size();
- }
+ return receivers.size();
}
Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java 2007-02-09 15:37:38 UTC (rev 2242)
@@ -58,50 +58,62 @@
private boolean trace = log.isTraceEnabled();
// it's important that we're actually using an ArrayList for fast array access
- protected ArrayList receivers;
+ private ArrayList receivers;
- protected int target;
-
+ private int target;
+
+ private volatile boolean makeCopy;
+
+ private ArrayList receiversCopy;
+
// Constructors --------------------------------------------------
public RoundRobinPointToPointRouter()
{
receivers = new ArrayList();
+
target = 0;
+
+ makeCopy = true;
}
// Router implementation -----------------------------------------
public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
- {
- int initial, current;
- ArrayList receiversCopy;
-
- synchronized(receivers)
+ {
+ if (makeCopy)
{
- if (receivers.isEmpty())
- {
- return null;
- }
-
- // try to release the lock as quickly as possible and make a copy of the receivers array
- // to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
+ synchronized (this)
+ {
+ //We make a copy of the receivers to avoid a race condition:
+ //http://jira.jboss.org/jira/browse/JBMESSAGING-505
+ //Note that we do not make a copy every time - only when the receivers have changed
- //TODO - we shouldn't be cloning an ArrayList for the delivery of each message
- //on the primary execution path!
-
- receiversCopy = new ArrayList(receivers.size());
- receiversCopy.addAll(receivers);
- initial = target;
- current = initial;
+ receiversCopy = new ArrayList(receivers);
+
+ if (target >= receiversCopy.size())
+ {
+ target = 0;
+ }
+
+ makeCopy = false;
+ }
+ }
+
+ if (receiversCopy.isEmpty())
+ {
+ return null;
}
Delivery del = null;
+
boolean selectorRejected = false;
+
+ int initial = target;
while (true)
{
- Receiver r = (Receiver)receiversCopy.get(current);
+ Receiver r = (Receiver)receiversCopy.get(target);
try
{
@@ -115,7 +127,9 @@
{
// deliver to the first receiver that accepts
del = d;
- shiftTarget(current);
+
+ incTarget();
+
break;
}
else
@@ -130,10 +144,10 @@
log.error("The receiver " + r + " is broken", t);
}
- current = (current + 1) % receiversCopy.size();
+ incTarget();
// if we've tried them all then we break
- if (current == initial)
+ if (target == initial)
{
break;
}
@@ -144,70 +158,55 @@
del = new SimpleDelivery(null, null, true, false);
}
- return del;
+ return del;
}
-
- public boolean add(Receiver r)
+
+ public synchronized boolean add(Receiver r)
{
- synchronized(receivers)
+ if (receivers.contains(r))
{
- if (receivers.contains(r))
- {
- return false;
- }
-
- receivers.add(r);
- target = 0;
+ return false;
}
- return true;
+
+ receivers.add(r);
+
+ makeCopy = true;
+
+ return true;
}
- public boolean remove(Receiver r)
+ public synchronized boolean remove(Receiver r)
{
- synchronized(receivers)
+ boolean removed = receivers.remove(r);
+
+ if (removed)
{
- boolean removed = receivers.remove(r);
-
- if (removed)
- {
- target = 0;
- }
-
- return removed;
+ makeCopy = true;
}
+
+ return removed;
}
- public void clear()
+ public synchronized void clear()
{
- synchronized(receivers)
- {
- receivers.clear();
- target = 0;
- }
+ receivers.clear();
+
+ makeCopy = true;
}
- public boolean contains(Receiver r)
+ public synchronized boolean contains(Receiver r)
{
- synchronized(receivers)
- {
- return receivers.contains(r);
- }
+ return receivers.contains(r);
}
- public Iterator iterator()
+ public synchronized Iterator iterator()
{
- synchronized(receivers)
- {
- return receivers.iterator();
- }
+ return receivers.iterator();
}
- public int getNumberOfReceivers()
+ public synchronized int getNumberOfReceivers()
{
- synchronized(receivers)
- {
- return receivers.size();
- }
+ return receivers.size();
}
// Public --------------------------------------------------------
@@ -216,22 +215,18 @@
// Protected -----------------------------------------------------
- protected void shiftTarget(int currentTarget)
+ // Private -------------------------------------------------------
+
+ private void incTarget()
{
- synchronized(receivers)
+ target++;
+
+ if (target == receiversCopy.size())
{
- int size = receivers.size();
- if (size == 0)
- {
- // target has already been reset by remove()
- return;
- }
- target = Math.max((target + 1) % size, (currentTarget + 1) % size);
+ target = 0;
}
}
- // Private -------------------------------------------------------
-
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java 2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java 2007-02-09 15:37:38 UTC (rev 2242)
@@ -29,7 +29,6 @@
import org.jboss.messaging.core.local.RoundRobinPointToPointRouter;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
-import org.jboss.messaging.core.message.SimpleMessageReference;
import org.jboss.messaging.core.message.SimpleMessageStore;
import org.jboss.messaging.core.plugin.contract.MessageStore;
import org.jboss.messaging.core.tx.Transaction;
@@ -486,41 +485,11 @@
r.gotRef = false;
}
}
-
-
// Private -------------------------------------------------------
- // Inner classes -------------------------------------------------
+ // Inner classes -------------------------------------------------
- class SimpleReceiver implements Receiver
- {
- boolean selectorMatches = true;
-
- boolean closed;
-
- boolean gotRef;
-
- public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
- {
- if (closed)
- {
- return null;
- }
-
- Delivery del = new SimpleDelivery(null, null, true, selectorMatches);
-
- if (selectorMatches)
- {
- gotRef = true;
- }
-
- return del;
- }
-
- }
-
-
class LockingReceiver implements Receiver
{
private Object lock;
@@ -558,6 +527,36 @@
return lock;
}
}
+
+ class SimpleReceiver implements Receiver
+ {
+ boolean selectorMatches = true;
+
+ boolean closed;
+
+ boolean gotRef;
+ public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
+ {
+ if (closed)
+ {
+ return null;
+ }
+
+ Delivery del = new SimpleDelivery(null, null, true, selectorMatches);
+
+ if (selectorMatches)
+ {
+ gotRef = true;
+ }
+
+ return del;
+ }
+
+ }
+
+
+
+
}
More information about the jboss-cvs-commits
mailing list