[Jboss-cvs] JBoss Messaging SVN: r1217 - in trunk: src/main/org/jboss/messaging/core/local tests/src/org/jboss/test/messaging/core/local

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Aug 9 04:05:21 EDT 2006


Author: ovidiu.feodorov at jboss.com
Date: 2006-08-09 04:05:18 -0400 (Wed, 09 Aug 2006)
New Revision: 1217

Modified:
   trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
   trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
Log:
RoundRobinPointToPointRouter re-implementation and extra deadlock test

Modified: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-08-08 19:52:15 UTC (rev 1216)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-08-09 08:05:18 UTC (rev 1217)
@@ -25,6 +25,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.Collections;
 
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.Delivery;
@@ -45,6 +46,7 @@
  * consumers rather than in contiguous blocks.
  *  
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
  * @version <tt>$Revision: 1 $</tt>
  * $Id: $
  */
@@ -63,51 +65,51 @@
    // it's important that we're actually using an ArrayList for fast array access
    protected ArrayList receivers;
    
-   protected int pos;
+   protected int target;
 
    // Constructors --------------------------------------------------
 
    public RoundRobinPointToPointRouter()
    {
       receivers = new ArrayList();
-      reset();
+      target = 0;
    }
 
    // Router implementation -----------------------------------------
    
    public Set handle(DeliveryObserver observer, Routable routable, Transaction tx)
    {
-      Set deliveries = new HashSet();
-      boolean selectorRejected = false;
-      ArrayList receiversCopy = null;
-      int firstPos;
-      
+      int initial, current;
+      ArrayList receiversCopy;
+
       synchronized(receivers)
       {
          if (receivers.isEmpty())
          {
-            return deliveries;
+            return Collections.EMPTY_SET;
          }
-         // make a copy to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
-         int crtSize = receivers.size();
-         receiversCopy = new ArrayList(crtSize);
+
+         // try to release the lock as quickly as possible and make a copy of the receivers array
+         // to avoid deadlock (http://jira.jboss.org/jira/browse/JBMESSAGING-491)
+
+         receiversCopy = new ArrayList(receivers.size());
          receiversCopy.addAll(receivers);
-         if (pos >= crtSize)
-         {
-            pos = 0;
-         }
-         firstPos = pos;
+         initial = target;
+         current = initial;
       }
 
+      Set deliveries = new HashSet();;
+      boolean selectorRejected = false;
+
       while (true)
       {
-         Receiver receiver = (Receiver)receiversCopy.get(pos);
+         Receiver r = (Receiver)receiversCopy.get(current);
 
          try
          {
-            Delivery d = receiver.handle(observer, routable, tx);
+            Delivery d = r.handle(observer, routable, tx);
 
-            if (trace) { log.trace("receiver " + receiver + " handled " + routable + " and returned " + d); }
+            if (trace) { log.trace("receiver " + r + " handled " + routable + " and returned " + d); }
 
             if (d != null && !d.isCancelled())
             {
@@ -115,7 +117,7 @@
                {
                   // deliver to the first receiver that accepts
                   deliveries.add(d);
-                  incPos();
+                  shiftTarget(current);
                   break;
                }
                else
@@ -127,13 +129,13 @@
          catch(Throwable t)
          {
             // broken receiver - log the exception and ignore it
-            log.error("The receiver " + receiver + " is broken", t);
+            log.error("The receiver " + r + " is broken", t);
          }
 
-         incPos();
+         current = (current + 1) % receiversCopy.size();
 
          // if we've tried them all then we break
-         if (pos == firstPos)
+         if (current == initial)
          {
             break;
          }
@@ -155,14 +157,13 @@
          {
             return false;
          }
+
          receivers.add(r);
-         
-         reset();
+         target = 0;
       }
       return true;
    }
 
-
    public boolean remove(Receiver r)
    {
       synchronized(receivers)
@@ -171,7 +172,7 @@
          
          if (removed)
          {
-            reset();
+            target = 0;
          }
          
          return removed;
@@ -183,8 +184,7 @@
       synchronized(receivers)
       {
          receivers.clear();
-         
-         reset();
+         target = 0;
       }
    }
 
@@ -204,27 +204,18 @@
       }
    }
 
-
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------
    
    // Protected -----------------------------------------------------
-   
-   protected void reset()
+
+   protected void shiftTarget(int currentTarget)
    {
-      // Reset back to the first one
-      pos = 0;
-   }
-   
-   protected void incPos()
-   {
-      pos++;
-      
-      // Wrap around
-      if (pos >= receivers.size())
+      synchronized(receivers)
       {
-         pos = 0;
+         int size = receivers.size();
+         target = Math.max((target + 1) % size, (currentTarget + 1) % size);
       }
    }
    

Modified: trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2006-08-08 19:52:15 UTC (rev 1216)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2006-08-09 08:05:18 UTC (rev 1217)
@@ -35,7 +35,12 @@
 import org.jboss.messaging.core.tx.Transaction;
 import org.jboss.test.messaging.MessagingTestCase;
 
-
+/**
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @version <tt>$Revision: 1 $</tt>
+ * $Id: $
+ */
 public class RoundRobinPointToPointRouterTest extends MessagingTestCase
 {
    // Constants -----------------------------------------------------
@@ -423,14 +428,58 @@
       assertNotNull(dels);
       assertEquals(0, dels.size());     
    }
+
+
+   /**
+    * http://jira.jboss.org/jira/browse/JBMESSAGING-491
+    */
+   public void testDeadlock() throws Exception
+   {
+      final Router router = new RoundRobinPointToPointRouter();
+
+      LockingReceiver receiver = new LockingReceiver();
+      router.add(receiver);
+
+      final Thread t = new Thread(new Runnable()
+      {
+         public void run()
+         {
+            // sends the message to the router on a separate thread
+            router.handle(null, new SimpleMessageReference(), null);
+         }
+      }, "Message sending thread");
+
+      // start the sending tread, which will immediately grab the router's "receivers" lock, and it
+      // will sleep for 3 seconds before attempting to grab LockingReceiver's lock.
+      t.start();
+
+
+      // in the mean time, the main thread immediately grabs receiver's lock ...
+
+      synchronized(receiver.getLock())
+      {
+         // ... sleeps for 500 ms to allow sender thread time to grab router's "receivers" lock
+         Thread.sleep(500);
+
+         // ... and try to remove the receiver form router
+         router.remove(receiver);
+      }
+
+      // normally, receiver removal should be immediate, as the router releases receiver's lock
+      // immediately, so test should complete. Pre-JBMESSAGING-491, the test deadlocks.
+   }
    
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+
    protected void checkReceiverGotRef(SimpleReceiver[] receivers, int pos)
    {
       log.info("checkReceiverGotRef:" + pos);
       for (int i = 0; i < receivers.length; i++)
       {
          SimpleReceiver r = receivers[i];
-         
+
          if (i == pos)
          {
             assertTrue(r.gotRef);
@@ -441,21 +490,19 @@
          }
       }
    }
-   
+
    protected void resetReceivers(SimpleReceiver[] receivers)
    {
       for (int i = 0; i < receivers.length; i++)
       {
          SimpleReceiver r = receivers[i];
-         
+
          r.gotRef = false;
       }
    }
 
-   // Package protected ---------------------------------------------
+
    
-   // Protected -----------------------------------------------------
-   
    // Private -------------------------------------------------------
    
    // Inner classes -------------------------------------------------   
@@ -486,5 +533,45 @@
       }
       
    }
+
+
+   class LockingReceiver implements Receiver
+   {
+      private Object lock;
+
+      public LockingReceiver()
+      {
+         lock = new Object();
+      }
+
+      public Delivery handle(DeliveryObserver observer, Routable routable, Transaction tx)
+      {
+         // The delivering thread needs to grab the receiver's lock to complete delivery; this
+         // is how Messaging receivers are written, anyway. We simulate the race condition by
+         // putting the sending thread to sleep for 3 seconds before allowing it to attempt to
+         // grab the lock
+
+         try
+         {
+            Thread.sleep(3000);
+         }
+         catch(InterruptedException e)
+         {
+            // this shouldn't happen in the test
+            return null;
+         }
+
+         synchronized(lock)
+         {
+            return new SimpleDelivery(null, null, true, true);
+         }
+      }
+
+      public Object getLock()
+      {
+         return lock;
+      }
+   }
+
 }
 




More information about the jboss-cvs-commits mailing list