[hornetq-commits] JBoss hornetq SVN: r8283 - in trunk: examples/javaee/ejb-jms-transaction/server and 30 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 13 15:52:22 EST 2009


Author: timfox
Date: 2009-11-13 15:52:21 -0500 (Fri, 13 Nov 2009)
New Revision: 8283

Removed:
   trunk/src/main/org/hornetq/core/server/Distributor.java
   trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
   trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
Modified:
   trunk/docs/user-manual/en/configuration-index.xml
   trunk/docs/user-manual/en/queue-attributes.xml
   trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
   trunk/examples/javaee/hajndi/config/hornetq-queues.xml
   trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
   trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
   trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
   trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
   trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
   trunk/src/config/common/schema/hornetq-configuration.xsd
   trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
   trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
   trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
   trunk/src/config/trunk/clustered/hornetq-configuration.xml
   trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
   trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
   trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
   trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
Log:
refactored queue delivery logic

Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/configuration-index.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -775,15 +775,8 @@
                             <entry>boolean</entry>
                             <entry>whether to treat the queue as a last value queue</entry>
                             <entry>false</entry>
-                        </row>
+                        </row>                        
                         <row>
-                            <entry><link linkend="queue-attributes.address-settings"
-                                    >address-settings.distribution-policy-class</link></entry>
-                            <entry>String</entry>
-                            <entry>the class to use for distributing messages to a consumer</entry>
-                            <entry>RoundRobinDistributor</entry>
-                        </row>
-                        <row>
                             <entry><link linkend="paging"
                                 >address-settings.page-size-bytes</link></entry>
                             <entry>Long</entry>

Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/queue-attributes.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -95,8 +95,7 @@
         &lt;max-delivery-attempts>3&lt;/max-delivery-attempts>
         &lt;redelivery-delay>5000&lt;/redelivery-delay>
         &lt;expiry-address>jms.queue.expiryQueue&lt;/expiry-address>
-        &lt;last-value-queue>true&lt;/last-value-queue>
-        &lt;distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor&lt;/distribution-policy-class>
+        &lt;last-value-queue>true&lt;/last-value-queue>        
         &lt;max-size-bytes>100000&lt;/max-size-bytes>
         &lt;page-size-bytes>20000&lt;/page-size-bytes>
         &lt;redistribution-delay>0&lt;/redistribution-delay>
@@ -123,9 +122,6 @@
             see <link linkend="message-expiry.configuring">here</link>.</para>
         <para><literal>last-value-queue</literal> defines whether a queue only uses last values or
             not. see <link linkend="last-value-queues">here</link>.</para>
-        <para><literal>distribution-policy-class</literal> defines the class to use for distribution
-            of messages by a queue to consumers. By default this is <literal
-                >org.hornetq.core.server.impl.RoundRobinDistributor</literal>.</para>
         <para><literal>max-size-bytes</literal> and <literal>page-size-bytes</literal> are used to
             set paging on an address. This is explained <link linkend="paging">here</link>.</para>
         <para><literal>redistribution-delay</literal> defines how long to wait when the last

Modified: trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -51,8 +51,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/hajndi/config/hornetq-queues.xml
===================================================================
--- trunk/examples/javaee/hajndi/config/hornetq-queues.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/hajndi/config/hornetq-queues.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -17,8 +17,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -54,8 +54,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -51,8 +51,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd	2009-11-13 20:52:21 UTC (rev 8283)
@@ -454,9 +454,7 @@
         <xsd:element maxOccurs="1" minOccurs="0" name="page-size-bytes" type="xsd:int">
         </xsd:element>
         <xsd:element maxOccurs="1" minOccurs="0" name="address-full-policy" type="addressFullMessagePolicyType">
-		</xsd:element>        
-        <xsd:element maxOccurs="1" minOccurs="0" name="distribution-policy-class" type="xsd:string">
-        </xsd:element>
+		</xsd:element>                
         <xsd:element maxOccurs="1" minOccurs="0" name="message-counter-history-day-limit" type="xsd:int">
         </xsd:element>
         <xsd:element maxOccurs="1" minOccurs="0" name="last-value-queue" type="xsd:boolean">

Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -82,8 +82,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -56,8 +56,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -61,8 +61,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -35,8 +35,7 @@
          <expiry-address>jms.queue.ExpiryQueue</expiry-address>
          <redelivery-delay>0</redelivery-delay>
          <max-size-bytes>-1</max-size-bytes>
-         <page-size-bytes>10485760</page-size-bytes>
-         <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+         <page-size-bytes>10485760</page-size-bytes>         
          <message-counter-history-day-limit>10</message-counter-history-day-limit>
       </address-setting>
    </address-settings>

Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -61,8 +61,7 @@
 			<expiry-address>jms.queue.ExpiryQueue</expiry-address>
 			<redelivery-delay>0</redelivery-delay>
 			<max-size-bytes>-1</max-size-bytes>
-			<page-size-bytes>10485760</page-size-bytes>
-			<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+			<page-size-bytes>10485760</page-size-bytes>			
 			<message-counter-history-day-limit>10</message-counter-history-day-limit>
 		</address-setting>
 	</address-settings>

Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml	2009-11-13 20:52:21 UTC (rev 8283)
@@ -35,8 +35,7 @@
 			<expiry-address>jms.queue.ExpiryQueue</expiry-address>
 			<redelivery-delay>0</redelivery-delay>
 			<max-size-bytes>-1</max-size-bytes>
-			<page-size-bytes>10485760</page-size-bytes>
-			<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+			<page-size-bytes>10485760</page-size-bytes>			
 			<message-counter-history-day-limit>10</message-counter-history-day-limit>
 		</address-setting>
 	</address-settings>

Modified: trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -30,7 +30,7 @@
 public class AddressSettingsDeployer extends XmlDeployer
 {
    private static final Logger log = Logger.getLogger(AddressSettingsDeployer.class);
-   
+
    private static final String DEAD_LETTER_ADDRESS_NODE_NAME = "dead-letter-address";
 
    private static final String EXPIRY_ADDRESS_NODE_NAME = "expiry-address";
@@ -45,14 +45,12 @@
 
    private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
 
-   private static final String DISTRIBUTION_POLICY_CLASS_NODE_NAME = "distribution-policy-class";
-
    private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME = "message-counter-history-day-limit";
 
    private static final String LVQ_NODE_NAME = "last-value-queue";
-   
+
    private static final String REDISTRIBUTION_DELAY_NODE_NAME = "redistribution-delay";
-   
+
    private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route";
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -85,7 +83,7 @@
     * @throws Exception .
     */
    public void deploy(Node node) throws Exception
-   {      
+   {
       String match = node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
 
       NodeList children = node.getChildNodes();
@@ -118,10 +116,6 @@
          {
             addressSettings.setPageSizeBytes(Integer.valueOf(child.getTextContent()));
          }
-         else if (DISTRIBUTION_POLICY_CLASS_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
-         {
-            addressSettings.setDistributionPolicyClass(child.getTextContent());
-         }
          else if (MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
          {
             addressSettings.setMessageCounterHistoryDayLimit(Integer.valueOf(child.getTextContent()));
@@ -142,7 +136,7 @@
             else if (value.equals(AddressFullMessagePolicy.PAGE.toString()))
             {
                policy = AddressFullMessagePolicy.PAGE;
-            }            
+            }
             addressSettings.setAddressFullMessagePolicy(policy);
          }
          else if (LVQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
@@ -154,11 +148,11 @@
             addressSettings.setMaxDeliveryAttempts(Integer.valueOf(child.getTextContent().trim()));
          }
          else if (REDISTRIBUTION_DELAY_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
-         {           
+         {
             addressSettings.setRedistributionDelay(Long.valueOf(child.getTextContent().trim()));
          }
          else if (SEND_TO_DLA_ON_NO_ROUTE.equalsIgnoreCase(child.getNodeName()))
-         {           
+         {
             addressSettings.setSendToDLAOnNoRoute(Boolean.valueOf(child.getTextContent().trim()));
          }
       }

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -14,7 +14,7 @@
 
 package org.hornetq.core.postoffice.impl;
 
-import java.util.Set;
+import java.util.Collection;
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.logging.Logger;
@@ -118,31 +118,7 @@
    {
       //It's a high accept priority if the queue has at least one matching consumer
       
-      Set<Consumer> consumers = queue.getConsumers();
-      
-      for (Consumer consumer: consumers)
-      {
-         if (consumer instanceof Redistributor)
-         {
-            continue;
-         }
-         
-         Filter filter = consumer.getFilter();
-         
-         if (filter == null)
-         {
-            return true;
-         }
-         else
-         {
-            if (filter.match(message))
-            {
-               return true;
-            }
-         }
-      }
-      
-      return false;
+      return queue.hasMatchingConsumer(message);      
    }
    
    public void route(final ServerMessage message, final RoutingContext context) throws Exception

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -420,6 +420,7 @@
    public void closeContext()
    {
       final ReplicationContext token = tlReplicationContext.get();
+      
       if (token != null)
       {
          // Disassociate thread local

Deleted: trunk/src/main/org/hornetq/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Distributor.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/Distributor.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */ 
-
-package org.hornetq.core.server;
-
-
-/**
- * 
- * A Distributor
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface Distributor
-{
-   void addConsumer(Consumer consumer);
-
-   boolean removeConsumer(Consumer consumer);
-
-   int getConsumerCount();
-   
-   Consumer getNextConsumer();
-}

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -13,9 +13,9 @@
 
 package org.hornetq.core.server;
 
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.filter.Filter;
@@ -49,8 +49,6 @@
 
    int getConsumerCount();
 
-   Set<Consumer> getConsumers();
-
    void addLast(MessageReference ref);
 
    void addFirst(MessageReference ref);
@@ -79,10 +77,10 @@
 
    List<MessageReference> getScheduledMessages();
 
-   Distributor getDistributionPolicy();
+//   Distributor getDistributionPolicy();
+//
+//   void setDistributionPolicy(Distributor policy);
 
-   void setDistributionPolicy(Distributor policy);
-
    int getMessagesAdded();
 
    MessageReference removeReferenceWithID(long id) throws Exception;
@@ -119,10 +117,14 @@
    void addRedistributor(long delay, Executor executor);
 
    void cancelRedistributor() throws Exception;
+   
+   boolean hasMatchingConsumer(ServerMessage message);
 
    // Only used in testing
    void deliverNow();
-
+   
+   Collection<Consumer> getConsumers();
+   
    boolean checkDLQ(MessageReference ref) throws Exception;
    
    void lockDelivery();

Deleted: trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
-
-/**
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- */
-public abstract class DistributorImpl implements Distributor
-{
-   private static final Logger log = Logger.getLogger(DistributorImpl.class);
-
-   protected final List<Consumer> consumers = new ArrayList<Consumer>();
-
-   public void addConsumer(final Consumer consumer)
-   {
-      consumers.add(consumer);
-   }
-
-   public boolean removeConsumer(final Consumer consumer)
-   {
-      return consumers.remove(consumer);
-   }
-
-   public int getConsumerCount()
-   {
-      return consumers.size();
-   }
-
-   public List<Consumer> getConsumers()
-   {
-      return consumers;
-   }
-}

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -96,8 +96,6 @@
                                addressSettingsRepository);
       }
 
-      queue.setDistributionPolicy(addressSettings.getDistributionPolicy());
-
       return queue;
    }
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -40,7 +40,6 @@
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -88,14 +87,12 @@
 
    private final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
 
-   private final MessageHandler globalHandler = new NullFilterMessageHandler();
+   private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
 
    private final ConcurrentSet<MessageReference> expiringMessageReferences = new ConcurrentHashSet<MessageReference>();
 
    private final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
-   private volatile Distributor distributionPolicy = new RoundRobinDistributor();
-
    private boolean direct;
 
    private boolean promptDelivery;
@@ -128,14 +125,14 @@
 
    // We cache the consumers here since we don't want to include the redistributor
 
-   private final Set<Consumer> consumers = new HashSet<Consumer>();
+   private final Set<Consumer> consumerSet = new HashSet<Consumer>();
 
-   private final Map<Consumer, MessageHandler> messageHandlers = new HashMap<Consumer, MessageHandler>();
-
    private final ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
 
    private volatile SimpleString expiryAddress;
 
+   private int pos;
+
    public QueueImpl(final long id,
                     final SimpleString address,
                     final SimpleString name,
@@ -276,29 +273,33 @@
    {
       cancelRedistributor();
 
-      distributionPolicy.addConsumer(consumer);
+      MessageHandler handler;
 
-      consumers.add(consumer);
-
       if (consumer.getFilter() != null)
       {
-         messageHandlers.put(consumer, new FilterMessageHandler(messageReferences.iterator()));
+         handler = new FilterMessageHandler(consumer, messageReferences.iterator());
       }
+      else
+      {
+         handler = new NullFilterMessageHandler(consumer);
+      }
+
+      handlers.add(handler);
+
+      consumerSet.add(consumer);
    }
 
    public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
    {
-      boolean removed = distributionPolicy.removeConsumer(consumer);
+      boolean removed = this.removeHandlerGivenConsumer(consumer);
 
-      if (distributionPolicy.getConsumerCount() == 0)
+      if (handlers.isEmpty())
       {
          promptDelivery = false;
       }
 
-      consumers.remove(consumer);
+      consumerSet.remove(consumer);
 
-      messageHandlers.remove(consumer);
-
       if (removed)
       {
          for (SimpleString groupID : groups.keySet())
@@ -330,7 +331,7 @@
 
       if (delay > 0)
       {
-         if (consumers.size() == 0)
+         if (consumerSet.isEmpty())
          {
             DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
 
@@ -353,7 +354,7 @@
 
          redistributor = null;
 
-         distributionPolicy.removeConsumer(redistributor);
+         removeHandlerGivenConsumer(redistributor);
       }
 
       if (future != null)
@@ -366,14 +367,42 @@
 
    public synchronized int getConsumerCount()
    {
-      return consumers.size();
+      return consumerSet.size();
    }
 
    public synchronized Set<Consumer> getConsumers()
    {
-      return consumers;
+      return consumerSet;
    }
 
+   public synchronized boolean hasMatchingConsumer(final ServerMessage message)
+   {
+      for (MessageHandler handler : handlers)
+      {
+         Consumer consumer = handler.getConsumer();
+
+         if (consumer instanceof Redistributor)
+         {
+            continue;
+         }
+
+         Filter filter = consumer.getFilter();
+
+         if (filter == null)
+         {
+            return true;
+         }
+         else
+         {
+            if (filter.match(message))
+            {
+               return true;
+            }
+         }
+      }
+      return false;
+   }
+
    public Iterator<MessageReference> iterator()
    {
       return new Iterator<MessageReference>()
@@ -604,16 +633,6 @@
       deliveringCount.incrementAndGet();
    }
 
-   public Distributor getDistributionPolicy()
-   {
-      return distributionPolicy;
-   }
-
-   public void setDistributionPolicy(final Distributor distributionPolicy)
-   {
-      this.distributionPolicy = distributionPolicy;
-   }
-
    public int getMessagesAdded()
    {
       return messagesAdded.get();
@@ -862,14 +881,37 @@
    // Private
    // ------------------------------------------------------------------------------
 
+   private boolean removeHandlerGivenConsumer(final Consumer consumer)
+   {
+      Iterator<MessageHandler> iter = handlers.iterator();
+
+      boolean removed = false;
+
+      while (iter.hasNext())
+      {
+         MessageHandler handler = iter.next();
+
+         if (handler.getConsumer() == consumer)
+         {
+            iter.remove();
+
+            removed = true;
+
+            break;
+         }
+      }
+
+      return removed;
+   }
+
    private void internalAddRedistributor(final Executor executor)
    {
       // create the redistributor only once if there are no local consumers
-      if (consumers.size() == 0 && redistributor == null)
+      if (consumerSet.isEmpty() && redistributor == null)
       {
          redistributor = new Redistributor(this, storageManager, postOffice, executor, REDISTRIBUTOR_BATCH_SIZE);
 
-         distributionPolicy.addConsumer(redistributor);
+         handlers.add(new NullFilterMessageHandler(redistributor));
 
          redistributor.start();
 
@@ -979,6 +1021,7 @@
    private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
    {
       SimpleString deadLetterAddress = addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress();
+
       if (deadLetterAddress != null)
       {
          Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
@@ -1021,122 +1064,222 @@
       tx.commit();
    }
 
+   private MessageHandler getHandlerRoundRobin()
+   {
+      MessageHandler handler = handlers.get(pos);
+
+      pos++;
+
+      if (pos == handlers.size())
+      {
+         pos = 0;
+      }
+
+      return handler;
+   }
+
+   private boolean checkExpired(final MessageReference reference)
+   {
+      if (reference.getMessage().isExpired())
+      {
+         reference.handled();
+
+         try
+         {
+            expire(reference);
+         }
+         catch (Exception e)
+         {
+            log.error("Failed to expire ref", e);
+         }
+
+         return true;
+      }
+      else
+      {
+         return false;
+      }
+   }
+
    /*
     * Attempt to deliver all the messages in the queue
     */
    private synchronized void deliver()
    {
-      if (paused)
+      if (paused || handlers.isEmpty())
       {
          return;
       }
 
       direct = false;
 
-      if (distributionPolicy.getConsumerCount() == 0)
-      {
-         return;
-      }
-
-      Consumer consumer;
-
-      MessageReference reference;
-
-      // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
-      int totalConsumers = distributionPolicy.getConsumerCount();
-      Set<Consumer> busyConsumers = new HashSet<Consumer>();
-      Set<Consumer> nullReferences = new HashSet<Consumer>();
-
+      int startPos = pos;
+      int totalCount = handlers.size();
+      int nullCount = 0;
+      int busyCount = 0;
       while (true)
       {
-         consumer = distributionPolicy.getNextConsumer();
+         MessageHandler handler = getHandlerRoundRobin();
 
-         MessageHandler handler = messageHandlers.get(consumer);
+         Consumer consumer = handler.getConsumer();
 
-         if (handler == null)
-         {
-            handler = globalHandler;
-         }
+         MessageReference reference = handler.peek(consumer);
 
-         reference = handler.peek(consumer);
-
          if (reference == null)
          {
-            nullReferences.add(consumer);
-
-            if (nullReferences.size() + busyConsumers.size() == totalConsumers)
-            {
-               // We delivered all the messages - go into direct delivery
-               direct = true;
-
-               promptDelivery = false;
-
-               return;
-            }
-
-            continue;
+            nullCount++;
          }
          else
          {
-            nullReferences.remove(consumer);
-
-            if (reference.getMessage().isExpired())
+            if (checkExpired(reference))
             {
-               // We expire messages on the server too
                handler.remove();
+            }
+            else
+            {
+               final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
 
-               reference.handled();
+               boolean tryHandle = true;
 
-               try
+               if (groupID != null)
                {
-                  expire(reference);
+                  Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+                  if (groupConsumer != null && groupConsumer != consumer)
+                  {
+                     tryHandle = false;
+
+                     busyCount++;
+                  }
                }
-               catch (Exception e)
+
+               if (tryHandle)
                {
-                  log.error("Failed to expire ref", e);
+                  HandleStatus status = handle(reference, consumer);
+
+                  if (status == HandleStatus.HANDLED)
+                  {
+                     handler.remove();
+                  }
+                  else if (status == HandleStatus.BUSY)
+                  {
+                     busyCount++;
+
+                     handler.reset();
+
+                     // if (groupID != null )
+                     // {
+                     // // group id being set seems to make delivery stop
+                     // // FIXME !!! why??
+                     // break;
+                     // }
+                  }
+                  else if (status == HandleStatus.NO_MATCH)
+                  {
+                     // if the consumer's filter rejects the message, make sure it won't be assigned the message group
+                     if (groupID != null)
+                     {
+                        groups.remove(groupID);
+                     }
+                  }
                }
-
-               continue;
             }
          }
 
-         final SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
-         if (groupID != null)
+         if (pos == startPos)
          {
-            Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+            // We've done all the consumers
 
-            if (groupConsumer != null && groupConsumer != consumer)
+            if (nullCount + busyCount == totalCount)
             {
-               continue;
+               if (nullCount == totalCount)
+               {
+                  // We delivered all the messages - go into direct delivery
+                  direct = true;
+
+                  promptDelivery = false;
+               }
+
+               break;
             }
+
+            nullCount = busyCount = 0;
          }
+      }
+   }
 
-         HandleStatus status = handle(reference, consumer);
+   private synchronized boolean directDeliver(final MessageReference reference)
+   {
+      if (paused || handlers.isEmpty())
+      {
+         return false;
+      }
 
-         if (status == HandleStatus.HANDLED)
+      int startPos = pos;
+      int busyCount = 0;
+      boolean setPromptDelivery = false;
+      while (true)
+      {
+         MessageHandler handler = getHandlerRoundRobin();
+
+         Consumer consumer = handler.getConsumer();
+
+         if (!checkExpired(reference))
          {
-            handler.remove();
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            busyConsumers.add(consumer);
+            SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
 
-            handler.reset();
+            boolean tryHandle = true;
 
-            if (groupID != null || busyConsumers.size() == totalConsumers)
+            if (groupID != null)
             {
-               // when all consumers are busy, we stop
-               break;
+               Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+               if (groupConsumer != null && groupConsumer != consumer)
+               {
+                  tryHandle = false;
+               }
             }
+
+            if (tryHandle)
+            {
+               HandleStatus status = handle(reference, consumer);
+
+               if (status == HandleStatus.HANDLED)
+               {
+                  return true;
+               }
+               else if (status == HandleStatus.BUSY)
+               {
+                  busyCount++;
+
+                  if (groupID != null)
+                  {
+                     // If the group has been assigned a consumer there is no point in trying others
+
+                     return false;
+                  }
+               }
+               else if (status == HandleStatus.NO_MATCH)
+               {
+                  // if the consumer's filter rejects the message, make sure it won't be assigned the message group
+                  if (groupID != null)
+                  {
+                     groups.remove(groupID);
+                  }
+
+                  setPromptDelivery = true;
+               }
+            }
          }
-         else if (status == HandleStatus.NO_MATCH)
+
+         if (pos == startPos)
          {
-            // if consumer filter reject the message make sure it won't be assigned the message group
-            if (groupID != null)
+            if (setPromptDelivery)
             {
-               groups.remove(consumer);
+               promptDelivery = true;
             }
+
+            return false;
          }
       }
    }
@@ -1159,23 +1302,12 @@
       {
          // Deliver directly
 
-         HandleStatus status = directDeliver(ref);
+         boolean delivered = directDeliver(ref);
 
-         if (status == HandleStatus.HANDLED)
+         if (!delivered)
          {
-            // Ok
-         }
-         else if (status == HandleStatus.BUSY)
-         {
             add = true;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            add = true;
-         }
 
-         if (add)
-         {
             direct = false;
          }
       }
@@ -1213,81 +1345,6 @@
       }
    }
 
-   private synchronized HandleStatus directDeliver(final MessageReference reference)
-   {
-      if (distributionPolicy.getConsumerCount() == 0)
-      {
-         return HandleStatus.BUSY;
-      }
-
-      HandleStatus status;
-
-      boolean filterRejected = false;
-
-      int consumerCount = 0;
-
-      while (true)
-      {
-         Consumer consumer = distributionPolicy.getNextConsumer();
-         consumerCount++;
-
-         final SimpleString groupId = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
-         if (groupId != null)
-         {
-            Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
-            if (groupConsumer != null && groupConsumer != consumer)
-            {
-               continue;
-            }
-         }
-
-         status = handle(reference, consumer);
-
-         if (status == HandleStatus.HANDLED)
-         {
-            break;
-         }
-         else if (status == HandleStatus.NO_MATCH)
-         {
-            filterRejected = true;
-            if (groupId != null)
-            {
-               groups.remove(consumer);
-            }
-         }
-         else if (status == HandleStatus.BUSY)
-         {
-            if (groupId != null)
-            {
-               break;
-            }
-         }
-         // if we've tried all of them
-         if (consumerCount == distributionPolicy.getConsumerCount())
-         {
-            if (filterRejected)
-            {
-               status = HandleStatus.NO_MATCH;
-               break;
-            }
-            else
-            {
-               // Give up - all consumers busy
-               status = HandleStatus.BUSY;
-               break;
-            }
-         }
-      }
-
-      if (status == HandleStatus.NO_MATCH)
-      {
-         promptDelivery = true;
-      }
-
-      return status;
-   }
-
    private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
    {
       HandleStatus status;
@@ -1392,7 +1449,6 @@
    {
       public void run()
       {
-
          // Must be set to false *before* executing to avoid race
          waitingToDeliver.set(false);
 
@@ -1408,7 +1464,7 @@
       }
    }
 
-   final class RefsOperation implements TransactionOperation
+   private final class RefsOperation implements TransactionOperation
    {
       List<MessageReference> refsToAck = new ArrayList<MessageReference>();
 
@@ -1523,21 +1579,38 @@
       void remove();
 
       void reset();
+
+      Consumer getConsumer();
    }
 
    private class FilterMessageHandler implements MessageHandler
    {
+      private final Consumer consumer;
+
       private Iterator<MessageReference> iterator;
 
-      public FilterMessageHandler(final Iterator<MessageReference> iterator)
+      private MessageReference lastReference;
+
+      private boolean resetting;
+
+      public FilterMessageHandler(final Consumer consumer, final Iterator<MessageReference> iterator)
       {
+         this.consumer = consumer;
+
          this.iterator = iterator;
       }
 
       public MessageReference peek(final Consumer consumer)
       {
+         if (resetting)
+         {
+            resetting = false;
+
+            return lastReference;
+         }
+
          MessageReference reference;
-         
+
          if (iterator.hasNext())
          {
             reference = iterator.next();
@@ -1554,6 +1627,8 @@
                iterator = messageReferences.iterator();
             }
          }
+         lastReference = reference;
+
          return reference;
       }
 
@@ -1564,12 +1639,24 @@
 
       public void reset()
       {
-         iterator = messageReferences.iterator();
+         resetting = true;
       }
+
+      public Consumer getConsumer()
+      {
+         return consumer;
+      }
    }
 
    private class NullFilterMessageHandler implements MessageHandler
    {
+      private final Consumer consumer;
+
+      NullFilterMessageHandler(final Consumer consumer)
+      {
+         this.consumer = consumer;
+      }
+
       public MessageReference peek(final Consumer consumer)
       {
          return messageReferences.peekFirst();
@@ -1584,5 +1671,10 @@
       {
          // no-op
       }
+
+      public Consumer getConsumer()
+      {
+         return consumer;
+      }
    }
 }

Deleted: trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-
-/**
- * A RoundRobinDistributor
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class RoundRobinDistributor extends DistributorImpl
-{
-   private static final Logger log = Logger.getLogger(RoundRobinDistributor.class);
-
-   protected int pos = 0;
-
-   @Override
-   public synchronized void addConsumer(final Consumer consumer)
-   {
-      pos = 0;
-      super.addConsumer(consumer);
-   }
-
-   @Override
-   public synchronized boolean removeConsumer(final Consumer consumer)
-   {
-      pos = 0;
-      return super.removeConsumer(consumer);
-   }
-
-   @Override
-   public synchronized int getConsumerCount()
-   {
-      return super.getConsumerCount();
-   }
-
-   public synchronized Consumer getNextConsumer()
-   {
-      Consumer consumer = consumers.get(pos);
-      incrementPosition();
-      return consumer;
-   }
-
-   private synchronized void incrementPosition()
-   {
-      pos++;
-
-      if (pos == consumers.size())
-      {
-         pos = 0;
-      }
-   }
-}

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -1728,6 +1728,7 @@
             }
 
          });
+         
          storageManager.completeReplication();
       }
       else
@@ -1750,6 +1751,7 @@
       if (confirmPacket != null)
       {
          channel.confirm(confirmPacket);
+         
          if (flush)
          {
             channel.flushConfirmations();

Modified: trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -14,8 +14,6 @@
 package org.hornetq.core.settings.impl;
 
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Distributor;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
 import org.hornetq.core.settings.Mergeable;
 import org.hornetq.utils.SimpleString;
 
@@ -32,14 +30,12 @@
    /**
     * defaults used if null, this allows merging
     */
-   public static final Class<?> DEFAULT_DISTRIBUTION_POLICY_CLASS = new RoundRobinDistributor().getClass();
-
    public static final int DEFAULT_MAX_SIZE_BYTES = -1;
 
    public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY = AddressFullMessagePolicy.PAGE;
 
    public static final int DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
-   
+
    public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
 
    public static final int DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
@@ -60,8 +56,6 @@
 
    private Boolean dropMessagesWhenFull = null;
 
-   private String distributionPolicyClass = null;
-
    private Integer maxDeliveryAttempts = null;
 
    private Integer messageCounterHistoryDayLimit = null;
@@ -149,16 +143,6 @@
       this.redeliveryDelay = redeliveryDelay;
    }
 
-   public String getDistributionPolicyClass()
-   {
-      return distributionPolicyClass;
-   }
-
-   public void setDistributionPolicyClass(final String distributionPolicyClass)
-   {
-      this.distributionPolicyClass = distributionPolicyClass;
-   }
-
    public SimpleString getDeadLetterAddress()
    {
       return deadLetterAddress;
@@ -189,25 +173,6 @@
       sendToDLAOnNoRoute = value;
    }
 
-   public Distributor getDistributionPolicy()
-   {
-      try
-      {
-         if (distributionPolicyClass != null)
-         {
-            return (Distributor)getClass().getClassLoader().loadClass(distributionPolicyClass).newInstance();
-         }
-         else
-         {
-            return (Distributor)DEFAULT_DISTRIBUTION_POLICY_CLASS.newInstance();
-         }
-      }
-      catch (Exception e)
-      {
-         throw new IllegalArgumentException("Error instantiating distribution policy '" + e + " '");
-      }
-   }
-
    public long getRedistributionDelay()
    {
       return redistributionDelay != null ? redistributionDelay : DEFAULT_REDISTRIBUTION_DELAY;
@@ -248,10 +213,6 @@
       {
          redeliveryDelay = merged.redeliveryDelay;
       }
-      if (distributionPolicyClass == null)
-      {
-         distributionPolicyClass = merged.distributionPolicyClass;
-      }
       if (deadLetterAddress == null)
       {
          deadLetterAddress = merged.deadLetterAddress;

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -12,7 +12,7 @@
  */
 package org.hornetq.tests.integration.client;
 
-import java.util.Set;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -911,7 +911,7 @@
 
             for (Binding binding : bindings.getBindings())
             {
-               Set<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
+               Collection<Consumer> consumers = ((QueueBinding)binding).getQueue().getConsumers();
 
                for (Consumer consumer : consumers)
                {

Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -29,6 +29,7 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
@@ -41,6 +42,8 @@
  */
 public class MessageGroupingTest extends UnitTestCase
 {
+   private static final Logger log = Logger.getLogger(MessageGroupingTest.class);
+
    private HornetQServer server;
 
    private ClientSession clientSession;
@@ -85,13 +88,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -130,13 +133,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -178,18 +181,18 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
 
-      for(int i = 0; i < numMessages/2; i++)
+      for (int i = 0; i < numMessages / 2; i++)
       {
          ClientMessage cm = consumer.receive(500);
          assertNotNull(cm);
@@ -199,18 +202,25 @@
          assertNotNull(cm);
          assertEquals(cm.getBody().readString(), "m" + i);
       }
-
+      
+      log.info("closing consumers");
+      
       consumer2.close();
+      
+      log.info("closed consumer 2");
+      
       consumer.close();
-      //check that within their groups the messages are still in the correct order
+      
+      log.info("closed consuemrs");
+      // check that within their groups the messages are still in the correct order
       consumer = clientSession.createConsumer(qName);
-      for(int i = 0; i < numMessages; i+=2)
+      for (int i = 0; i < numMessages; i += 2)
       {
          ClientMessage cm = consumer.receive(500);
          assertNotNull(cm);
          assertEquals(cm.getBody().readString(), "m" + i);
       }
-      for(int i = 1; i < numMessages; i+=2)
+      for (int i = 1; i < numMessages; i += 2)
       {
          ClientMessage cm = consumer.receive(500);
          assertNotNull(cm);
@@ -230,13 +240,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -269,13 +279,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -322,13 +332,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -393,13 +403,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -451,13 +461,13 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = createTextMessage("m" + i, clientSession);
-         if( i % 2 == 0 || i == 0)
+         if (i % 2 == 0 || i == 0)
          {
             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
          }
          else
          {
-             message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+            message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
          }
          clientProducer.send(message);
       }
@@ -536,14 +546,14 @@
       }
       server = null;
       clientSession = null;
-      
+
       super.tearDown();
    }
 
    protected void setUp() throws Exception
    {
       super.setUp();
-      
+
       ConfigurationImpl configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
       TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
@@ -578,13 +588,13 @@
          if (acknowledge)
          {
             try
-         {
-            message.acknowledge();
-         }
-         catch (HornetQException e)
             {
-               //ignore
+               message.acknowledge();
             }
+            catch (HornetQException e)
+            {
+               // ignore
+            }
          }
          latch.countDown();
       }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -149,7 +149,7 @@
 
       session.close();
    }
-
+   
    public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
    {
       ClientSessionFactory sf = createInVMFactory();
@@ -182,6 +182,8 @@
          boolean failed;
 
          boolean started = true;
+         
+         int count = 0;
 
          public void onMessage(final ClientMessage message)
          {
@@ -192,16 +194,17 @@
                {
                   failed = true;
                }
-
-               latch.countDown();
-
-               if (latch.getCount() == 0)
+               
+               count++;
+               
+               if (count == 10)
                {
-
                   message.acknowledge();
                   session.stop();
                   started = false;
                }
+
+               latch.countDown();            
             }
             catch (Exception e)
             {
@@ -215,8 +218,6 @@
 
       latch.await();
 
-      Thread.sleep(100);
-
       assertFalse(handler.failed);
 
       // Make sure no exceptions were thrown from onMessage

Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -281,7 +281,7 @@
       
       csf.close();
    }
-
+   
    /*
    * Test the client triggering failure due to no ping from server received in time
    */
@@ -325,7 +325,7 @@
       //Setting the handler to null will prevent server sending pings back to client
       serverConn.getChannel(0, -1).setHandler(null);
 
-      for (int i = 0; i < 1000; i++)
+      for (int i = 0; i < 2000; i++)
       {
          // a few tries to avoid a possible race caused by GCs or similar issues
          if (server.getRemotingService().getConnections().isEmpty() && clientListener.getException() != null)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -33,8 +33,7 @@
            "      <dead-letter-address>DLQtest</dead-letter-address>\n" +
            "      <expiry-address>ExpiryQueueTest</expiry-address>\n" +
            "      <redelivery-delay>100</redelivery-delay>\n" +
-           "      <max-size-bytes>-100</max-size-bytes>\n" +
-           "      <distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>\n" +
+           "      <max-size-bytes>-100</max-size-bytes>\n" +         
            "      <message-counter-history-day-limit>1000</message-counter-history-day-limit>\n" +
            "   </address-settings>";
 
@@ -57,8 +56,7 @@
       AddressSettings as = repository.getMatch("queues.aq");
       assertNotNull(as);
       assertEquals(100, as.getRedeliveryDelay());
-      assertEquals(-100, as.getMaxSizeBytes());
-      assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy", as.getDistributionPolicyClass());
+      assertEquals(-100, as.getMaxSizeBytes());     
       assertEquals(1000, as.getMessageCounterHistoryDayLimit());
       assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
       assertEquals(new SimpleString("ExpiryQueueTest"), as.getExpiryAddress());
@@ -72,8 +70,7 @@
                  + "      <dead-letter-address>DLQtest</dead-letter-address>\n"
                  + "      <expiry-address>ExpiryQueueTest</expiry-address>\n"
                  + "      <redelivery-delay>100</redelivery-delay>\n"
-                 + "      <max-size-bytes>-100</max-size-bytes>\n"
-                 + "      <distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>"
+                 + "      <max-size-bytes>-100</max-size-bytes>\n"               
                  + "      <message-counter-history-day-limit>1000</message-counter-history-day-limit>"
                  + "   </address-setting>"
                  + "</address-settings>"
@@ -88,8 +85,7 @@
       AddressSettings as = repository.getMatch("queues.aq");
       assertNotNull(as);
       assertEquals(100, as.getRedeliveryDelay());
-      assertEquals(-100, as.getMaxSizeBytes());
-      assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy", as.getDistributionPolicyClass());
+      assertEquals(-100, as.getMaxSizeBytes());      
       assertEquals(1000, as.getMessageCounterHistoryDayLimit());
       assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
       assertEquals(new SimpleString("ExpiryQueueTest"), as.getExpiryAddress());

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -20,7 +20,6 @@
 
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.RoutingContext;
@@ -31,7 +30,7 @@
 public class FakeQueue implements Queue
 {
    private SimpleString name;
-   
+
    public FakeQueue(SimpleString name)
    {
       this.name = name;
@@ -43,7 +42,7 @@
    public void acknowledge(MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -52,7 +51,7 @@
    public void acknowledge(Transaction tx, MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -61,7 +60,7 @@
    public void addConsumer(Consumer consumer) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -70,7 +69,7 @@
    public void addFirst(MessageReference ref)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -79,7 +78,7 @@
    public void addLast(MessageReference ref)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -88,7 +87,7 @@
    public void addRedistributor(long delay, Executor executor)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -97,7 +96,7 @@
    public void cancel(MessageReference reference) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -106,7 +105,7 @@
    public void cancel(Transaction tx, MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -115,7 +114,7 @@
    public void cancelRedistributor() throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -169,7 +168,7 @@
    public void deliverAsync(Executor executor)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -178,7 +177,7 @@
    public void deliverNow()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -187,7 +186,7 @@
    public void expire(MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -205,7 +204,7 @@
    public void expireReferences() throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -245,15 +244,6 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#getDistributionPolicy()
-    */
-   public Distributor getDistributionPolicy()
-   {
-      // TODO Auto-generated method stub
-      return null;
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#getFilter()
     */
    public Filter getFilter()
@@ -375,7 +365,7 @@
    public void lockDelivery()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -402,7 +392,7 @@
    public void pause()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -411,7 +401,7 @@
    public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -420,7 +410,7 @@
    public void referenceHandled()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -456,7 +446,7 @@
    public void resume()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -469,32 +459,23 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
-    */
-   public void setDistributionPolicy(Distributor policy)
-   {
-      // TODO Auto-generated method stub
-      
-   }
-
-   /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
     */
    public void setExpiryAddress(SimpleString expiryAddress)
    {
       // TODO Auto-generated method stub
-      
+
    }
 
-      // TODO Auto-generated method stub
-      
+   // TODO Auto-generated method stub
+
    /* (non-Javadoc)
     * @see org.hornetq.core.server.Queue#unlockDelivery()
     */
    public void unlockDelivery()
    {
       // TODO Auto-generated method stub
-      
+
    }
 
    /* (non-Javadoc)
@@ -503,9 +484,13 @@
    public void route(ServerMessage message, RoutingContext context) throws Exception
    {
       // TODO Auto-generated method stub
-      
+
    }
-   
-   
 
+   public boolean hasMatchingConsumer(ServerMessage message)
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -23,13 +23,11 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
 import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.QueueImpl;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -118,21 +116,6 @@
       assertFalse(queue.removeConsumer(cons3));
    }
 
-   public void testGetSetDistributionPolicy()
-   {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-
-      assertNotNull(queue.getDistributionPolicy());
-
-      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
-      Distributor policy = new DummyDistributionPolicy();
-
-      queue.setDistributionPolicy(policy);
-
-      assertEquals(policy, queue.getDistributionPolicy());
-   }
-
    public void testGetFilter()
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -608,8 +591,6 @@
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
-      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
       final int numMessages = 10;
 
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -653,8 +634,6 @@
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
-      assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
       final int numMessages = 10;
 
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -943,7 +922,7 @@
       int currId = 0;
       for (MessageReference receeivedRef : receeivedRefs)
       {
-         assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+         assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID(), currId++);
       }
    }
 
@@ -1017,7 +996,7 @@
       int currId = 10;
       for (MessageReference receeivedRef : receeivedRefs)
       {
-         assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID() , currId++);
+         assertEquals("messages received out of order", receeivedRef.getMessage().getMessageID(), currId++);
       }
    }
 
@@ -1248,7 +1227,7 @@
    public void testPauseAndResumeWithAsync() throws Exception
    {
       Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
-      
+
       // pauses the queue
       queue.pause();
 
@@ -1377,34 +1356,4 @@
       }
    }
 
-   class DummyDistributionPolicy implements Distributor
-   {
-      Consumer consumer;
-
-      public List<Consumer> getConsumers()
-      {
-         return null;
-      }
-
-      public void addConsumer(Consumer consumer)
-      {
-         this.consumer = consumer;
-      }
-
-      public boolean removeConsumer(Consumer consumer)
-      {
-         return false;
-      }
-
-      public int getConsumerCount()
-      {
-         return 0;
-      }
-
-      public Consumer getNextConsumer()
-      {
-         return consumer;
-      }
-   }
-
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java	2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java	2009-11-13 20:52:21 UTC (rev 8283)
@@ -9,7 +9,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
  * implied.  See the License for the specific language governing
  * permissions and limitations under the License.
- */ 
+ */
 
 package org.hornetq.tests.unit.core.settings.impl;
 
@@ -26,14 +26,13 @@
    public void testDefaults()
    {
       AddressSettings addressSettings = new AddressSettings();
-      assertEquals(AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS, addressSettings.getDistributionPolicy().getClass());
-      assertEquals(null, addressSettings.getDistributionPolicyClass());
       assertEquals(null, addressSettings.getDeadLetterAddress());
       assertEquals(null, addressSettings.getExpiryAddress());
       assertEquals(AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS, addressSettings.getMaxDeliveryAttempts());
       assertEquals(addressSettings.getMaxSizeBytes(), AddressSettings.DEFAULT_MAX_SIZE_BYTES);
       assertEquals(AddressSettings.DEFAULT_PAGE_SIZE, addressSettings.getPageSizeBytes());
-      assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT, addressSettings.getMessageCounterHistoryDayLimit());
+      assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT,
+                   addressSettings.getMessageCounterHistoryDayLimit());
       assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY, addressSettings.getRedeliveryDelay());
 
    }
@@ -53,8 +52,6 @@
       addressSettingsToMerge.setRedeliveryDelay((long)1003);
       addressSettingsToMerge.setPageSizeBytes(1004);
       addressSettings.merge(addressSettingsToMerge);
-      assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
-      assertEquals(addressSettings.getDistributionPolicyClass(), null);
       assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
       assertEquals(addressSettings.getExpiryAddress(), exp);
       assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -86,8 +83,6 @@
       addressSettingsToMerge2.setRedeliveryDelay((long)2003);
       addressSettings.merge(addressSettingsToMerge2);
 
-      assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
-      assertEquals(addressSettings.getDistributionPolicyClass(), null);
       assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
       assertEquals(addressSettings.getExpiryAddress(), exp);
       assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -122,14 +117,12 @@
       addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
       addressSettings.merge(addressSettingsToMerge2);
 
-      assertEquals(addressSettings.getDistributionPolicy().getClass(), AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
-      assertEquals(addressSettings.getDistributionPolicyClass(), null);
       assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
       assertEquals(addressSettings.getExpiryAddress(), exp);
       assertEquals(addressSettings.getMaxDeliveryAttempts(), 2000);
       assertEquals(addressSettings.getMaxSizeBytes(), 1001);
       assertEquals(addressSettings.getMessageCounterHistoryDayLimit(), 2002);
-      assertEquals(addressSettings.getRedeliveryDelay(),1003);
+      assertEquals(addressSettings.getRedeliveryDelay(), 1003);
       assertEquals(AddressFullMessagePolicy.DROP, addressSettings.getAddressFullMessagePolicy());
    }
 }



More information about the hornetq-commits mailing list