[Jboss-cvs] JBoss Messaging SVN: r1183 - in trunk: src/main/org/jboss/messaging/core/local tests tests/bin tests/src/org/jboss/test/messaging/core/local tests/src/org/jboss/test/messaging/jms

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Aug 3 07:33:23 EDT 2006


Author: timfox
Date: 2006-08-03 07:33:15 -0400 (Thu, 03 Aug 2006)
New Revision: 1183

Added:
   trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
   trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
   trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
Removed:
   trunk/src/main/org/jboss/messaging/core/local/PointToPointRouter.java
Modified:
   trunk/src/main/org/jboss/messaging/core/local/Queue.java
   trunk/tests/bin/runtest
   trunk/tests/build.xml
   trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java
Log:
Made round robin default routing for queues and subscriptions



Added: trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/src/main/org/jboss/messaging/core/local/FirstReceiverPointToPointRouter.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -0,0 +1,179 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.local;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Routable;
+import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ * 
+ * This router delivers the reference to at most one of the router's receivers.
+ * It will always favour the first receiver in the internal list of receivers, but will try
+ * the next one (and the next one...) if a previous one does not want to accept the message.
+ * If the router has several receivers (e.g. the case of multiple consumers on a queue)
+ * and the consumers are fast, the first receiver will tend to get most or all of the references.
+ * 
+ * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1174 $</tt>
+ * $Id: PointToPointRouter.java 1174 2006-08-02 14:14:32Z timfox $
+ */
+public class FirstReceiverPointToPointRouter implements Router
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(FirstReceiverPointToPointRouter.class);
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private boolean trace = log.isTraceEnabled();
+
+   List receivers;
+
+   // Constructors --------------------------------------------------
+
+   public FirstReceiverPointToPointRouter()
+   {
+      receivers = new ArrayList();
+   }
+
+   // Router implementation -----------------------------------------
+
+   public Set handle(DeliveryObserver observer, Routable routable, Transaction tx)
+   {
+      Set deliveries = new HashSet();
+      
+      boolean selectorRejected = false;
+      
+      synchronized(receivers)
+      {
+         for(Iterator i = receivers.iterator(); i.hasNext(); )
+         {
+            Receiver receiver = (Receiver)i.next();
+            
+            try
+            {
+               Delivery d = receiver.handle(observer, routable, tx);
+
+               if (trace) { log.trace("receiver " + receiver + " handled " + routable + " and returned " + d); }
+     
+               if (d != null && !d.isCancelled())
+               {
+                  if (d.isSelectorAccepted())
+                  {
+                     // deliver to the first receiver that accepts
+                     deliveries.add(d);
+                     break;
+                  }
+                  else
+                  {
+                     selectorRejected = true;
+                  }
+               }
+            }
+            catch(Throwable t)
+            {
+               // broken receiver - log the exception and ignore it
+               log.error("The receiver " + receiver + " is broken", t);
+            }
+         }
+      }
+      
+      if (deliveries.isEmpty() && selectorRejected)
+      {
+         deliveries.add(new SimpleDelivery(null, null, true, false));
+      }
+
+      return deliveries;
+   }
+
+   public boolean add(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         if (receivers.contains(r))
+         {
+            return false;
+         }
+         receivers.add(r);
+      }
+      return true;
+   }
+
+
+   public boolean remove(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         return receivers.remove(r);
+      }
+   }
+
+   public void clear()
+   {
+      synchronized(receivers)
+      {
+         receivers.clear();
+      }
+   }
+
+   public boolean contains(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         return receivers.contains(r);
+      }
+   }
+
+   public Iterator iterator()
+   {
+      synchronized(receivers)
+      {
+         return receivers.iterator();
+      }
+   }
+
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------   
+}

Deleted: trunk/src/main/org/jboss/messaging/core/local/PointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/PointToPointRouter.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/src/main/org/jboss/messaging/core/local/PointToPointRouter.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -1,172 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.local;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.jboss.logging.Logger;
-import org.jboss.messaging.core.Delivery;
-import org.jboss.messaging.core.DeliveryObserver;
-import org.jboss.messaging.core.Receiver;
-import org.jboss.messaging.core.Routable;
-import org.jboss.messaging.core.Router;
-import org.jboss.messaging.core.SimpleDelivery;
-import org.jboss.messaging.core.tx.Transaction;
-
-/**
- * @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision$</tt>
- * $Id$
- */
-public class PointToPointRouter implements Router
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(PointToPointRouter.class);
-
-   // Static --------------------------------------------------------
-   
-   // Attributes ----------------------------------------------------
-   
-   private boolean trace = log.isTraceEnabled();
-
-   List receivers;
-
-   // Constructors --------------------------------------------------
-
-   public PointToPointRouter()
-   {
-      receivers = new ArrayList();
-   }
-
-   // Router implementation -----------------------------------------
-
-   public Set handle(DeliveryObserver observer, Routable routable, Transaction tx)
-   {
-      Set deliveries = new HashSet();
-      
-      boolean selectorRejected = false;
-      
-      synchronized(receivers)
-      {
-         for(Iterator i = receivers.iterator(); i.hasNext(); )
-         {
-            Receiver receiver = (Receiver)i.next();
-            
-            try
-            {
-               Delivery d = receiver.handle(observer, routable, tx);
-
-               if (trace) { log.trace("receiver " + receiver + " handled " + routable + " and returned " + d); }
-     
-               if (d != null && !d.isCancelled())
-               {
-                  if (d.isSelectorAccepted())
-                  {
-                     // deliver to the first receiver that accepts
-                     deliveries.add(d);
-                     break;
-                  }
-                  else
-                  {
-                     selectorRejected = true;
-                  }
-               }
-            }
-            catch(Throwable t)
-            {
-               // broken receiver - log the exception and ignore it
-               log.error("The receiver " + receiver + " is broken", t);
-            }
-         }
-      }
-      
-      if (deliveries.isEmpty() && selectorRejected)
-      {
-         deliveries.add(new SimpleDelivery(null, null, true, false));
-      }
-
-      return deliveries;
-   }
-
-   public boolean add(Receiver r)
-   {
-      synchronized(receivers)
-      {
-         if (receivers.contains(r))
-         {
-            return false;
-         }
-         receivers.add(r);
-      }
-      return true;
-   }
-
-
-   public boolean remove(Receiver r)
-   {
-      synchronized(receivers)
-      {
-         return receivers.remove(r);
-      }
-   }
-
-   public void clear()
-   {
-      synchronized(receivers)
-      {
-         receivers.clear();
-      }
-   }
-
-   public boolean contains(Receiver r)
-   {
-      synchronized(receivers)
-      {
-         return receivers.contains(r);
-      }
-   }
-
-   public Iterator iterator()
-   {
-      synchronized(receivers)
-      {
-         return receivers.iterator();
-      }
-   }
-
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-   
-   // Protected -----------------------------------------------------
-   
-   // Private -------------------------------------------------------
-   
-   // Inner classes -------------------------------------------------   
-}

Modified: trunk/src/main/org/jboss/messaging/core/local/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/Queue.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/src/main/org/jboss/messaging/core/local/Queue.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -58,7 +58,12 @@
                 QueuedExecutor executor)
    {      
       super(id, ms, pm, mm, true, recoverable, fullSize, pageSize, downCacheSize, executor);
-      router = new PointToPointRouter();
+      
+      //TODO make the policy configurable
+      //By default we use a router with a round robin policy for even distribution in the
+      //case of multiple consumers
+      router = new RoundRobinPointToPointRouter();
+      
       this.fullSize = fullSize;
       this.pageSize = pageSize;
       this.downCacheSize = downCacheSize;

Added: trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/src/main/org/jboss/messaging/core/local/RoundRobinPointToPointRouter.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -0,0 +1,233 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.local;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.jboss.logging.Logger;
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Routable;
+import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.tx.Transaction;
+
+/**
+ * 
+ * This router delivers the reference to at most one of the router's receivers.
+ * 
+ * The router will always first try the receiver that follows the one it delivered to last time.
+ * This gives a more balanced distribution than the FirstReceiverPointToPointRouter and is
+ * better suited when batching messages to consumers since we will end up with messages interleaved amongst
+ * consumers rather than in contiguous blocks.
+ *  
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1 $</tt>
+ * $Id: $
+ */
+public class RoundRobinPointToPointRouter implements Router
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(RoundRobinPointToPointRouter.class);
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+   
+   private boolean trace = log.isTraceEnabled();
+
+   //It's important that we're actually using an ArrayList for fast array access
+   protected ArrayList receivers;
+   
+   protected int pos;
+
+   // Constructors --------------------------------------------------
+
+   public RoundRobinPointToPointRouter()
+   {
+      receivers = new ArrayList();
+      
+      reset();
+   }
+
+   // Router implementation -----------------------------------------
+   
+   public Set handle(DeliveryObserver observer, Routable routable, Transaction tx)
+   {
+      Set deliveries = new HashSet();
+      
+      boolean selectorRejected = false;
+      
+      synchronized(receivers)
+      {
+         if (receivers.isEmpty())
+         {
+            return deliveries;
+         }
+         
+         int firstPos = pos;
+         
+         while (true)
+         {
+            Receiver receiver = (Receiver)receivers.get(pos);
+            
+            try
+            {
+               Delivery d = receiver.handle(observer, routable, tx);
+
+               if (trace) { log.trace("receiver " + receiver + " handled " + routable + " and returned " + d); }
+     
+               if (d != null && !d.isCancelled())
+               {
+                  if (d.isSelectorAccepted())
+                  {
+                     // deliver to the first receiver that accepts
+                     deliveries.add(d);
+                     
+                     incPos();
+                     
+                     break;
+                  }
+                  else
+                  {
+                     selectorRejected = true;
+                  }
+               }
+            }
+            catch(Throwable t)
+            {
+               // broken receiver - log the exception and ignore it
+               log.error("The receiver " + receiver + " is broken", t);
+            }
+            
+            incPos();
+            
+            //If we've tried them all then we break
+            
+            if (pos == firstPos)
+            {
+               break;
+            }            
+         }
+      }
+      
+      if (deliveries.isEmpty() && selectorRejected)
+      {
+         deliveries.add(new SimpleDelivery(null, null, true, false));
+      }
+
+      return deliveries;
+   }
+
+   public boolean add(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         if (receivers.contains(r))
+         {
+            return false;
+         }
+         receivers.add(r);
+         
+         reset();
+      }
+      return true;
+   }
+
+
+   public boolean remove(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         boolean removed = receivers.remove(r);
+         
+         if (removed)
+         {
+            reset();
+         }
+         
+         return removed;
+      }
+   }
+
+   public void clear()
+   {
+      synchronized(receivers)
+      {
+         receivers.clear();
+         
+         reset();
+      }
+   }
+
+   public boolean contains(Receiver r)
+   {
+      synchronized(receivers)
+      {
+         return receivers.contains(r);
+      }
+   }
+
+   public Iterator iterator()
+   {
+      synchronized(receivers)
+      {
+         return receivers.iterator();
+      }
+   }
+
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   protected void reset()
+   {
+      //Reset back to the first one
+      
+      pos = 0;
+   }
+   
+   protected void incPos()
+   {
+      pos++;
+      
+      //Wrap around
+      
+      if (pos == receivers.size())
+      {
+         pos = 0;
+      }
+   }
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------   
+}
+

Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/tests/bin/runtest	2006-08-03 11:33:15 UTC (rev 1183)
@@ -94,7 +94,7 @@
     shift
 done
 
-JAVA_OPTS="-Xmx1024M $JAVA_OPTS -Dmodule.output=$reldir/../output $REMOTE_TEST -Dtest.database=$TEST_DATABASE -Dtest.serialization=$TEST_SERIALIZATION"
+JAVA_OPTS="-Xmx1024M $JAVA_OPTS -Dmodule.output=$reldir/../output $REMOTE_TEST -Dtest.database=$TEST_DATABASE -Dtest.serialization=$TEST_SERIALIZATION -Dbuild.lib=../../output/lib"
 
 if [ "$TARGET_TEST" != "" ]; then
    TARGET_TEST="-t $TARGET_TEST"

Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/tests/build.xml	2006-08-03 11:33:15 UTC (rev 1183)
@@ -163,6 +163,7 @@
    -->
 
    <path id="test.execution.classpath">
+      <pathelement location="../lib/jnp-client.jar"/>
       <pathelement location="${tests.root}/etc"/>
       <pathelement location="${build.tests.classes}"/>
       <pathelement location="${project.root}/src/etc"/> <!-- server's configuration files -->

Added: trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/tests/src/org/jboss/test/messaging/core/local/RoundRobinPointToPointRouterTest.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -0,0 +1,490 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.test.messaging.core.local;
+
+import java.util.Set;
+
+import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.DeliveryObserver;
+import org.jboss.messaging.core.MessageReference;
+import org.jboss.messaging.core.Receiver;
+import org.jboss.messaging.core.Routable;
+import org.jboss.messaging.core.Router;
+import org.jboss.messaging.core.SimpleDelivery;
+import org.jboss.messaging.core.local.RoundRobinPointToPointRouter;
+import org.jboss.messaging.core.plugin.SimpleMessageReference;
+import org.jboss.messaging.core.tx.Transaction;
+import org.jboss.test.messaging.MessagingTestCase;
+
+
+public class RoundRobinPointToPointRouterTest extends MessagingTestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   
+   // Attributes ----------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public RoundRobinPointToPointRouterTest(String name)
+   {
+      super(name);
+   }
+
+   // ChannelTestBase overrides  ------------------------------------
+
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Public --------------------------------------------------------
+   
+   public void testAllAccepting()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+      
+      final int numReceivers = 10;
+      
+      SimpleReceiver[] receivers = new SimpleReceiver[numReceivers];
+      
+      for (int i = 0; i < numReceivers; i++)
+      {
+         receivers[i] = new SimpleReceiver();
+         
+         router.add(receivers[i]);
+      }
+      
+      MessageReference ref = new SimpleMessageReference();
+      
+      Set dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 0);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 1);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 2);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 3);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 4);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 5);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 6);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 7);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 8);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 9);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 0);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 1);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 2);
+      resetReceivers(receivers);
+      
+   }
+   
+   public void testSomeClosed()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+      
+      final int numReceivers = 10;
+      
+      SimpleReceiver[] receivers = new SimpleReceiver[numReceivers];
+      
+      for (int i = 0; i < numReceivers; i++)
+      {
+         receivers[i] = new SimpleReceiver();
+         
+         router.add(receivers[i]);
+      }
+      
+      receivers[2].closed = true;
+      
+      receivers[5].closed = true;
+      receivers[6].closed = true;
+      
+      receivers[9].closed = true;
+      
+      MessageReference ref = new SimpleMessageReference();
+      
+      Set dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 0);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 1);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 3);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 4);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 7);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 8);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 0);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 1);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 3);
+      resetReceivers(receivers);
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(1, dels.size());
+      checkReceiverGotRef(receivers, 4);
+      resetReceivers(receivers);
+      
+      
+   }
+   
+   public void testAllClosed()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+      
+      final int numReceivers = 10;
+      
+      SimpleReceiver[] receivers = new SimpleReceiver[numReceivers];
+      
+      for (int i = 0; i < numReceivers; i++)
+      {
+         receivers[i] = new SimpleReceiver();
+         
+         receivers[i].closed = true;
+         
+         router.add(receivers[i]);
+      }
+      
+      
+      MessageReference ref = new SimpleMessageReference();
+      
+      Set dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(0, dels.size());
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(0, dels.size());
+      
+      dels = router.handle(null, ref, null);
+      assertNotNull(dels);
+      assertEquals(0, dels.size());
+      
+      
+   }
+   
+   /**
+    * Routes the same reference ten times through a router with ten receivers, of which the
+    * ones at positions 2, 5, 6 and 9 do not match on selector.
+    * 
+    * Every routing attempt must produce exactly one delivery, and the receiver that got the
+    * reference must follow the order 0, 1, 3, 4, 7, 8 and then start over with 0, 1, 3, 4.
+    */
+   public void testSomeNoSelectorMatch()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+      
+      final int numReceivers = 10;
+      
+      SimpleReceiver[] receivers = new SimpleReceiver[numReceivers];
+      
+      for (int i = 0; i < numReceivers; i++)
+      {
+         SimpleReceiver receiver = new SimpleReceiver();
+         
+         receivers[i] = receiver;
+         
+         router.add(receiver);
+      }
+      
+      // Positions of the receivers that reject the reference on selector
+      final int[] nonMatching = new int[] { 2, 5, 6, 9 };
+      
+      for (int i = 0; i < nonMatching.length; i++)
+      {
+         receivers[nonMatching[i]].selectorMatches = false;
+      }
+      
+      // Position of the receiver expected to get the reference on each successive attempt
+      final int[] expectedPositions = new int[] { 0, 1, 3, 4, 7, 8, 0, 1, 3, 4 };
+      
+      MessageReference ref = new SimpleMessageReference();
+      
+      for (int attempt = 0; attempt < expectedPositions.length; attempt++)
+      {
+         Set dels = router.handle(null, ref, null);
+         
+         assertNotNull(dels);
+         assertEquals(1, dels.size());
+         checkReceiverGotRef(receivers, expectedPositions[attempt]);
+         
+         // Clear the flags so the next attempt is checked in isolation
+         resetReceivers(receivers);
+      }
+   }
+   
+   /**
+    * Routes the same reference three times through a router whose ten receivers all fail to
+    * match on selector.
+    * 
+    * Every attempt must return exactly one delivery, and that delivery must report that it
+    * was not accepted by the selector. The size check is made on every attempt (not only on
+    * the first one) so that an unexpected extra delivery cannot slip through unnoticed.
+    */
+   public void testAllNoSelectorMatch()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+      
+      final int numReceivers = 10;
+      
+      // Number of times the reference is pushed through the router
+      final int numAttempts = 3;
+      
+      SimpleReceiver[] receivers = new SimpleReceiver[numReceivers];
+      
+      for (int i = 0; i < numReceivers; i++)
+      {
+         receivers[i] = new SimpleReceiver();
+         
+         receivers[i].selectorMatches = false;
+         
+         router.add(receivers[i]);
+      }
+      
+      MessageReference ref = new SimpleMessageReference();
+      
+      for (int attempt = 0; attempt < numAttempts; attempt++)
+      {
+         Set dels = router.handle(null, ref, null);
+         
+         assertNotNull(dels);
+         assertEquals(1, dels.size());
+         assertFalse(((SimpleDelivery)dels.iterator().next()).isSelectorAccepted());
+      }
+   }
+   
+   /**
+    * Routes the same reference three times through a router that has no receivers at all.
+    * 
+    * Each attempt must return a non-null, empty set of deliveries.
+    */
+   public void testNoReceivers()
+   {
+      Router router = new RoundRobinPointToPointRouter();
+
+      MessageReference ref = new SimpleMessageReference();
+      
+      final int numAttempts = 3;
+      
+      for (int attempt = 0; attempt < numAttempts; attempt++)
+      {
+         Set dels = router.handle(null, ref, null);
+         
+         assertNotNull(dels);
+         assertEquals(0, dels.size());
+      }
+   }
+   
+   /**
+    * Asserts that the receiver at the given position, and no other receiver in the array,
+    * has its gotRef flag set.
+    * 
+    * @param receivers the receivers to inspect
+    * @param pos index of the only receiver that is expected to have got the reference
+    */
+   protected void checkReceiverGotRef(SimpleReceiver[] receivers, int pos)
+   {
+      log.info("checkReceiverGotRef:" + pos);
+      
+      for (int i = 0; i < receivers.length; i++)
+      {
+         boolean shouldHaveRef = (i == pos);
+         
+         // The flag must be set for the expected position and clear for all the others
+         assertTrue(receivers[i].gotRef == shouldHaveRef);
+      }
+   }
+   
+   /**
+    * Clears the gotRef flag on every receiver in the array.
+    * 
+    * @param receivers the receivers to reset
+    */
+   protected void resetReceivers(SimpleReceiver[] receivers)
+   {
+      int index = 0;
+      
+      while (index < receivers.length)
+      {
+         receivers[index].gotRef = false;
+         
+         index++;
+      }
+   }
+
+   // Package protected ---------------------------------------------
+   
+   // Protected -----------------------------------------------------
+   
+   // Private -------------------------------------------------------
+   
+   // Inner classes -------------------------------------------------   
+   
+   /**
+    * Minimal Receiver used by the tests. Its behaviour is driven by two flags that the tests
+    * set directly, and it records in a third flag whether it got a reference.
+    */
+   class SimpleReceiver implements Receiver
+   {
+      // Passed to the SimpleDelivery that handle() creates; also decides whether gotRef is set
+      boolean selectorMatches = true;
+      
+      // When true, handle() returns null without creating a delivery
+      boolean closed;
+      
+      // Set by handle() when it is called while selectorMatches is true
+      boolean gotRef;
+
+      /**
+       * Returns null when this receiver is closed. Otherwise returns a new SimpleDelivery
+       * created with the current selectorMatches value, and sets gotRef if selectorMatches
+       * is true.
+       */
+      public Delivery handle(DeliveryObserver observer, Routable routable, Transaction tx)
+      {
+         if (!closed)
+         {
+            Delivery delivery = new SimpleDelivery(null, null, true, selectorMatches);
+            
+            // Never cleared here; only switched on when the selector matches
+            gotRef = gotRef || selectorMatches;
+            
+            return delivery;
+         }
+         
+         return null;
+      }
+      
+   }
+}
+

Modified: trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java	2006-08-03 00:33:07 UTC (rev 1182)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ManifestTest.java	2006-08-03 11:33:15 UTC (rev 1183)
@@ -68,7 +68,9 @@
    public void testManifestEntries() throws Exception
    {
       Properties props = System.getProperties();
-      String userDir = props.getProperty("jboss.jms.lib");
+      String userDir = props.getProperty("build.lib");
+      
+      log.info("userDir is " + userDir);
 
       // The jar must be there
       File file = new File(userDir, "jboss-messaging.jar");




More information about the jboss-cvs-commits mailing list