[jboss-cvs] JBoss Messaging SVN: r2242 - in trunk: tests/src/org/jboss/test/messaging/core/local and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 9 10:37:38 EST 2007


Author: timfox
Date: 2007-02-09 10:37:38 -0500 (Fri, 09 Feb 2007)
New Revision: 2242

Modified:
   trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
   trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
   trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-505


Modified: trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java	2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java	2007-02-09 15:37:38 UTC (rev 2242)
@@ -58,36 +58,41 @@
    
    private boolean trace = log.isTraceEnabled();
 
-   List receivers;
+   private List receivers;
+   
+   private volatile boolean makeCopy;
+   
+   private ArrayList receiversCopy;
 
    // Constructors --------------------------------------------------
 
    public FirstReceiverPointToPointRouter()
    {
       receivers = new ArrayList();
+      
+      makeCopy = true;
    }
 
    // Router implementation -----------------------------------------
 
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
    {
-      ArrayList receiversCopy;
-
-      synchronized(receivers)
+      if (makeCopy)
       {
-         if (receivers.isEmpty())
-         {
-            return null;
+         synchronized (this)
+         {         
+            //We make a copy of the receivers to avoid a race condition:
+            //http://jira.jboss.org/jira/browse/JBMESSAGING-505
+            //Note that we do not make a copy every time - only when the receivers have changed
+         
+            receiversCopy = new ArrayList(receivers);
+            
+            makeCopy = false;
          }
-
-         // try to release the lock as quickly as possible and make a copy of the receivers array
-         // to avoid deadlock
-
-         receiversCopy = new ArrayList(receivers.size());
-         receiversCopy.addAll(receivers);
-      }
-
+      }    
+      
       Delivery del = null;
+      
       boolean selectorRejected = false;
 
       for(Iterator i = receiversCopy.iterator(); i.hasNext(); )
@@ -130,58 +135,53 @@
    }
 
 
-   public boolean add(Receiver r)
+   public synchronized boolean add(Receiver r)
    {
-      synchronized(receivers)
+      if (receivers.contains(r))
       {
-         if (receivers.contains(r))
-         {
-            return false;
-         }
-         receivers.add(r);
+         return false;
       }
+      
+      receivers.add(r);
+      
+      makeCopy = true;
+      
       return true;
    }
 
 
-   public boolean remove(Receiver r)
-   {
-      synchronized(receivers)
+   public synchronized boolean remove(Receiver r)
+   {            
+      boolean removed = receivers.remove(r);      
+      
+      if (removed)
       {
-         return receivers.remove(r);
+         makeCopy = true;
       }
+      
+      return removed;
    }
 
-   public void clear()
+   public synchronized void clear()
    {
-      synchronized(receivers)
-      {
-         receivers.clear();
-      }
+      receivers.clear();
+      
+      makeCopy = true;
    }
 
-   public boolean contains(Receiver r)
+   public synchronized boolean contains(Receiver r)
    {
-      synchronized(receivers)
-      {
-         return receivers.contains(r);
-      }
+      return receivers.contains(r);      
    }
 
-   public Iterator iterator()
+   public synchronized Iterator iterator()
    {
-      synchronized(receivers)
-      {
-         return receivers.iterator();
-      }
+      return receivers.iterator();      
    }
    
-   public int getNumberOfReceivers()
+   public synchronized int getNumberOfReceivers()
    {
-      synchronized(receivers)
-      {
-         return receivers.size();
-      }
+      return receivers.size();      
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2007-02-09 15:37:38 UTC (rev 2242)
@@ -58,50 +58,62 @@
    private boolean trace = log.isTraceEnabled();
 
    // it's important that we're actually using an ArrayList for fast array access
-   protected ArrayList receivers;
+   private ArrayList receivers;
    
-   protected int target;
-
+   private int target;
+   
+   private volatile boolean makeCopy;
+   
+   private ArrayList receiversCopy;
+   
    // Constructors --------------------------------------------------
 
    public RoundRobinPointToPointRouter()
    {
       receivers = new ArrayList();
+      
       target = 0;
+      
+      makeCopy = true;
    }
 
    // Router implementation -----------------------------------------
    
    public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
-   {
-      int initial, current;
-      ArrayList receiversCopy;
-
-      synchronized(receivers)
+   {             
+      if (makeCopy)
       {
-         if (receivers.isEmpty())
-         {
-            return null;
-         }
-
-         // try to release the lock as quickly as possible and make a copy of the receivers array
-         // to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
+         synchronized (this)
+         {         
+            //We make a copy of the receivers to avoid a race condition:
+            //http://jira.jboss.org/jira/browse/JBMESSAGING-505
+            //Note that we do not make a copy every time - only when the receivers have changed
          
-         //TODO - we shouldn't be cloning an ArrayList for the delivery of each message
-         //on the primary execution path! 
-
-         receiversCopy = new ArrayList(receivers.size());
-         receiversCopy.addAll(receivers);
-         initial = target;
-         current = initial;
+            receiversCopy = new ArrayList(receivers);
+            
+            if (target >= receiversCopy.size())
+            {
+               target = 0;
+            }
+         
+            makeCopy = false;
+         }
+      }      
+      
+      if (receiversCopy.isEmpty())
+      {
+         return null;
       }
 
       Delivery del = null;
+      
       boolean selectorRejected = false;
+      
+      int initial = target;
 
       while (true)
       {
-         Receiver r = (Receiver)receiversCopy.get(current);
+         Receiver r = (Receiver)receiversCopy.get(target);
 
          try
          {
@@ -115,7 +127,9 @@
                {
                   // deliver to the first receiver that accepts
                   del = d;
-                  shiftTarget(current);
+                  
+                  incTarget();
+                                                     
                   break;
                }
                else
@@ -130,10 +144,10 @@
             log.error("The receiver " + r + " is broken", t);
          }
 
-         current = (current + 1) % receiversCopy.size();
+         incTarget();
 
          // if we've tried them all then we break
-         if (current == initial)
+         if (target == initial)
          {
             break;
          }
@@ -144,70 +158,55 @@
          del = new SimpleDelivery(null, null, true, false);
       }
 
-      return del;
+      return del;      
    }
-
-   public boolean add(Receiver r)
+   
+   public synchronized boolean add(Receiver r)
    {
-      synchronized(receivers)
+      if (receivers.contains(r))
       {
-         if (receivers.contains(r))
-         {
-            return false;
-         }
-
-         receivers.add(r);
-         target = 0;
+         return false;
       }
-      return true;
+
+      receivers.add(r);
+      
+      makeCopy = true;
+      
+      return true;      
    }
 
-   public boolean remove(Receiver r)
+   public synchronized boolean remove(Receiver r)
    {
-      synchronized(receivers)
+      boolean removed = receivers.remove(r);
+      
+      if (removed)
       {
-         boolean removed = receivers.remove(r);
-         
-         if (removed)
-         {
-            target = 0;
-         }
-         
-         return removed;
+         makeCopy = true;
       }
+      
+      return removed;      
    }
 
-   public void clear()
+   public synchronized void clear()
    {
-      synchronized(receivers)
-      {
-         receivers.clear();
-         target = 0;
-      }
+      receivers.clear();
+      
+      makeCopy = true;    
    }
 
-   public boolean contains(Receiver r)
+   public synchronized boolean contains(Receiver r)
    {
-      synchronized(receivers)
-      {
-         return receivers.contains(r);
-      }
+      return receivers.contains(r);     
    }
 
-   public Iterator iterator()
+   public synchronized Iterator iterator()
    {
-      synchronized(receivers)
-      {
-         return receivers.iterator();
-      }
+      return receivers.iterator();      
    }
    
-   public int getNumberOfReceivers()
+   public synchronized int getNumberOfReceivers()
    {
-      synchronized(receivers)
-      {
-         return receivers.size();
-      }
+      return receivers.size();      
    }
 
    // Public --------------------------------------------------------
@@ -216,22 +215,18 @@
    
    // Protected -----------------------------------------------------
 
-   protected void shiftTarget(int currentTarget)
+   // Private -------------------------------------------------------
+   
+   private void incTarget()
    {
-      synchronized(receivers)
+      target++;
+      
+      if (target == receiversCopy.size())
       {
-         int size = receivers.size();
-         if (size == 0)
-         {
-            // target has already been reset by remove()
-            return;
-         }
-         target = Math.max((target + 1) % size, (currentTarget + 1) % size);
+         target = 0;
       }
    }
    
-   // Private -------------------------------------------------------
-   
    // Inner classes -------------------------------------------------   
 }
 

Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2007-02-09 13:56:20 UTC (rev 2241)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2007-02-09 15:37:38 UTC (rev 2242)
@@ -29,7 +29,6 @@
 import org.jboss.messaging.core.local.RoundRobinPointToPointRouter;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.MessageReference;
-import org.jboss.messaging.core.message.SimpleMessageReference;
 import org.jboss.messaging.core.message.SimpleMessageStore;
 import org.jboss.messaging.core.plugin.contract.MessageStore;
 import org.jboss.messaging.core.tx.Transaction;
@@ -486,41 +485,11 @@
          r.gotRef = false;
       }
    }
-
-
    
    // Private -------------------------------------------------------
    
-   // Inner classes -------------------------------------------------   
+   // Inner classes -------------------------------------------------  
    
-   class SimpleReceiver implements Receiver
-   {
-      boolean selectorMatches = true;
-      
-      boolean closed;
-      
-      boolean gotRef;
-
-      public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
-      {
-         if (closed)
-         {
-            return null;
-         }
-         
-         Delivery del = new SimpleDelivery(null, null, true, selectorMatches);
-         
-         if (selectorMatches)
-         {
-            gotRef = true;
-         }
-                  
-         return del;
-      }
-      
-   }
-
-
    class LockingReceiver implements Receiver
    {
       private Object lock;
@@ -558,6 +527,36 @@
          return lock;
       }
    }
+   
+   class SimpleReceiver implements Receiver
+   {
+      boolean selectorMatches = true;
+      
+      boolean closed;
+      
+      boolean gotRef;
 
+      public Delivery handle(DeliveryObserver observer, MessageReference ref, Transaction tx)
+      {
+         if (closed)
+         {
+            return null;
+         }
+         
+         Delivery del = new SimpleDelivery(null, null, true, selectorMatches);
+         
+         if (selectorMatches)
+         {
+            gotRef = true;
+         }
+                  
+         return del;
+      }
+      
+   }
+
+
+   
+
 }
 




More information about the jboss-cvs-commits mailing list