Author: timfox
Date: 2009-11-13 15:52:21 -0500 (Fri, 13 Nov 2009)
New Revision: 8283
Removed:
trunk/src/main/org/hornetq/core/server/Distributor.java
trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/queue-attributes.xml
trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
trunk/examples/javaee/hajndi/config/hornetq-queues.xml
trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
trunk/src/config/trunk/clustered/hornetq-configuration.xml
trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
Log:
refactored queue delivery logic
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -775,15 +775,8 @@
<entry>boolean</entry>
<entry>whether to treat the queue as a last value
queue</entry>
<entry>false</entry>
- </row>
+ </row>
<row>
- <entry><link
linkend="queue-attributes.address-settings"
-
>address-settings.distribution-policy-class</link></entry>
- <entry>String</entry>
- <entry>the class to use for distributing messages to a
consumer</entry>
- <entry>RoundRobinDistributor</entry>
- </row>
- <row>
<entry><link linkend="paging"
address-settings.page-size-bytes</link></entry>
<entry>Long</entry>
Modified: trunk/docs/user-manual/en/queue-attributes.xml
===================================================================
--- trunk/docs/user-manual/en/queue-attributes.xml 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/docs/user-manual/en/queue-attributes.xml 2009-11-13 20:52:21 UTC (rev 8283)
@@ -95,8 +95,7 @@
<max-delivery-attempts>3</max-delivery-attempts>
<redelivery-delay>5000</redelivery-delay>
<expiry-address>jms.queue.expiryQueue</expiry-address>
- <last-value-queue>true</last-value-queue>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <last-value-queue>true</last-value-queue>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>20000</page-size-bytes>
<redistribution-delay>0</redistribution-delay>
@@ -123,9 +122,6 @@
see <link
linkend="message-expiry.configuring">here</link>.</para>
<para><literal>last-value-queue</literal> defines whether a
queue only uses last values or
not. see <link
linkend="last-value-queues">here</link>.</para>
- <para><literal>distribution-policy-class</literal> defines the
class to use for distribution
- of messages by a queue to consumers. By default this is <literal
-
>org.hornetq.core.server.impl.RoundRobinDistributor</literal>.</para>
<para><literal>max-size-bytes</literal> and
<literal>page-size-bytes</literal> are used to
set paging on an address. This is explained <link
linkend="paging">here</link>.</para>
<para><literal>redistribution-delay</literal> defines how long
to wait when the last
Modified: trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/ejb-jms-transaction/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -51,8 +51,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/hajndi/config/hornetq-queues.xml
===================================================================
--- trunk/examples/javaee/hajndi/config/hornetq-queues.xml 2009-11-13 19:14:02 UTC (rev
8282)
+++ trunk/examples/javaee/hajndi/config/hornetq-queues.xml 2009-11-13 20:52:21 UTC (rev
8283)
@@ -17,8 +17,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml 2009-11-13 19:14:02 UTC
(rev 8282)
+++ trunk/examples/javaee/mdb-bmt/server/hornetq-configuration.xml 2009-11-13 20:52:21 UTC
(rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml
===================================================================
---
trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/examples/javaee/mdb-cmt-setrollbackonly/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-local/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml
===================================================================
---
trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/examples/javaee/mdb-cmt-tx-not-supported/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-cmt-tx-required/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/mdb-message-selector/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/examples/javaee/mdb-tx-send/server/hornetq-configuration.xml 2009-11-13 20:52:21
UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/examples/javaee/servlet-ssl/server/hornetq-configuration.xml 2009-11-13 20:52:21
UTC (rev 8283)
@@ -57,8 +57,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/examples/javaee/servlet-transport/server/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -54,8 +54,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml
===================================================================
--- trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/examples/javaee/xarecovery/server/hornetq-configuration.xml 2009-11-13 20:52:21
UTC (rev 8283)
@@ -51,8 +51,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-13 19:14:02 UTC (rev
8282)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-11-13 20:52:21 UTC (rev
8283)
@@ -454,9 +454,7 @@
<xsd:element maxOccurs="1" minOccurs="0"
name="page-size-bytes" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="address-full-policy" type="addressFullMessagePolicyType">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0"
name="distribution-policy-class" type="xsd:string">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="message-counter-history-day-limit" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="last-value-queue" type="xsd:boolean">
Modified: trunk/src/config/jboss-as/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC
(rev 8282)
+++ trunk/src/config/jboss-as/clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC
(rev 8283)
@@ -82,8 +82,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/config/jboss-as/non-clustered/hornetq-configuration.xml 2009-11-13 20:52:21
UTC (rev 8283)
@@ -56,8 +56,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/stand-alone/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/config/stand-alone/clustered/hornetq-configuration.xml 2009-11-13 20:52:21
UTC (rev 8283)
@@ -61,8 +61,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/src/config/stand-alone/non-clustered/hornetq-configuration.xml 2009-11-13
20:52:21 UTC (rev 8283)
@@ -35,8 +35,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
-
<distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/trunk/clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC
(rev 8282)
+++ trunk/src/config/trunk/clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC
(rev 8283)
@@ -61,8 +61,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/config/trunk/non-clustered/hornetq-configuration.xml
===================================================================
--- trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2009-11-13 19:14:02 UTC
(rev 8282)
+++ trunk/src/config/trunk/non-clustered/hornetq-configuration.xml 2009-11-13 20:52:21 UTC
(rev 8283)
@@ -35,8 +35,7 @@
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>-1</max-size-bytes>
- <page-size-bytes>10485760</page-size-bytes>
- <distribution-policy-class>org.hornetq.core.server.impl.RoundRobinDistributor</distribution-policy-class>
+ <page-size-bytes>10485760</page-size-bytes>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/deployers/impl/AddressSettingsDeployer.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -30,7 +30,7 @@
public class AddressSettingsDeployer extends XmlDeployer
{
private static final Logger log = Logger.getLogger(AddressSettingsDeployer.class);
-
+
private static final String DEAD_LETTER_ADDRESS_NODE_NAME =
"dead-letter-address";
private static final String EXPIRY_ADDRESS_NODE_NAME = "expiry-address";
@@ -45,14 +45,12 @@
private static final String PAGE_SIZE_BYTES_NODE_NAME = "page-size-bytes";
- private static final String DISTRIBUTION_POLICY_CLASS_NODE_NAME =
"distribution-policy-class";
-
private static final String MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME =
"message-counter-history-day-limit";
private static final String LVQ_NODE_NAME = "last-value-queue";
-
+
private static final String REDISTRIBUTION_DELAY_NODE_NAME =
"redistribution-delay";
-
+
private static final String SEND_TO_DLA_ON_NO_ROUTE =
"send-to-dla-on-no-route";
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
@@ -85,7 +83,7 @@
* @throws Exception .
*/
public void deploy(Node node) throws Exception
- {
+ {
String match =
node.getAttributes().getNamedItem(getKeyAttribute()).getNodeValue();
NodeList children = node.getChildNodes();
@@ -118,10 +116,6 @@
{
addressSettings.setPageSizeBytes(Integer.valueOf(child.getTextContent()));
}
- else if
(DISTRIBUTION_POLICY_CLASS_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
- {
- addressSettings.setDistributionPolicyClass(child.getTextContent());
- }
else if
(MESSAGE_COUNTER_HISTORY_DAY_LIMIT_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
{
addressSettings.setMessageCounterHistoryDayLimit(Integer.valueOf(child.getTextContent()));
@@ -142,7 +136,7 @@
else if (value.equals(AddressFullMessagePolicy.PAGE.toString()))
{
policy = AddressFullMessagePolicy.PAGE;
- }
+ }
addressSettings.setAddressFullMessagePolicy(policy);
}
else if (LVQ_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
@@ -154,11 +148,11 @@
addressSettings.setMaxDeliveryAttempts(Integer.valueOf(child.getTextContent().trim()));
}
else if (REDISTRIBUTION_DELAY_NODE_NAME.equalsIgnoreCase(child.getNodeName()))
- {
+ {
addressSettings.setRedistributionDelay(Long.valueOf(child.getTextContent().trim()));
}
else if (SEND_TO_DLA_ON_NO_ROUTE.equalsIgnoreCase(child.getNodeName()))
- {
+ {
addressSettings.setSendToDLAOnNoRoute(Boolean.valueOf(child.getTextContent().trim()));
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -14,7 +14,7 @@
package org.hornetq.core.postoffice.impl;
-import java.util.Set;
+import java.util.Collection;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
@@ -118,31 +118,7 @@
{
//It's a high accept priority if the queue has at least one matching consumer
- Set<Consumer> consumers = queue.getConsumers();
-
- for (Consumer consumer: consumers)
- {
- if (consumer instanceof Redistributor)
- {
- continue;
- }
-
- Filter filter = consumer.getFilter();
-
- if (filter == null)
- {
- return true;
- }
- else
- {
- if (filter.match(message))
- {
- return true;
- }
- }
- }
-
- return false;
+ return queue.hasMatchingConsumer(message);
}
public void route(final ServerMessage message, final RoutingContext context) throws
Exception
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -420,6 +420,7 @@
public void closeContext()
{
final ReplicationContext token = tlReplicationContext.get();
+
if (token != null)
{
// Disassociate thread local
Deleted: trunk/src/main/org/hornetq/core/server/Distributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Distributor.java 2009-11-13 19:14:02 UTC (rev
8282)
+++ trunk/src/main/org/hornetq/core/server/Distributor.java 2009-11-13 20:52:21 UTC (rev
8283)
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server;
-
-
-/**
- *
- * A Distributor
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- */
-public interface Distributor
-{
- void addConsumer(Consumer consumer);
-
- boolean removeConsumer(Consumer consumer);
-
- int getConsumerCount();
-
- Consumer getNextConsumer();
-}
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-11-13 19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-11-13 20:52:21 UTC (rev 8283)
@@ -13,9 +13,9 @@
package org.hornetq.core.server;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
@@ -49,8 +49,6 @@
int getConsumerCount();
- Set<Consumer> getConsumers();
-
void addLast(MessageReference ref);
void addFirst(MessageReference ref);
@@ -79,10 +77,10 @@
List<MessageReference> getScheduledMessages();
- Distributor getDistributionPolicy();
+// Distributor getDistributionPolicy();
+//
+// void setDistributionPolicy(Distributor policy);
- void setDistributionPolicy(Distributor policy);
-
int getMessagesAdded();
MessageReference removeReferenceWithID(long id) throws Exception;
@@ -119,10 +117,14 @@
void addRedistributor(long delay, Executor executor);
void cancelRedistributor() throws Exception;
+
+ boolean hasMatchingConsumer(ServerMessage message);
// Only used in testing
void deliverNow();
-
+
+ Collection<Consumer> getConsumers();
+
boolean checkDLQ(MessageReference ref) throws Exception;
void lockDelivery();
Deleted: trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/DistributorImpl.java 2009-11-13 20:52:21
UTC (rev 8283)
@@ -1,50 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-package org.hornetq.core.server.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
-
-/**
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- */
-public abstract class DistributorImpl implements Distributor
-{
- private static final Logger log = Logger.getLogger(DistributorImpl.class);
-
- protected final List<Consumer> consumers = new ArrayList<Consumer>();
-
- public void addConsumer(final Consumer consumer)
- {
- consumers.add(consumer);
- }
-
- public boolean removeConsumer(final Consumer consumer)
- {
- return consumers.remove(consumer);
- }
-
- public int getConsumerCount()
- {
- return consumers.size();
- }
-
- public List<Consumer> getConsumers()
- {
- return consumers;
- }
-}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-13 20:52:21
UTC (rev 8283)
@@ -96,8 +96,6 @@
addressSettingsRepository);
}
- queue.setDistributionPolicy(addressSettings.getDistributionPolicy());
-
return queue;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-13 19:14:02 UTC
(rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-13 20:52:21 UTC
(rev 8283)
@@ -40,7 +40,6 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -88,14 +87,12 @@
private final PriorityLinkedList<MessageReference> messageReferences = new
PriorityLinkedListImpl<MessageReference>(NUM_PRIORITIES);
- private final MessageHandler globalHandler = new NullFilterMessageHandler();
+ private List<MessageHandler> handlers = new ArrayList<MessageHandler>();
private final ConcurrentSet<MessageReference> expiringMessageReferences = new
ConcurrentHashSet<MessageReference>();
private final ScheduledDeliveryHandler scheduledDeliveryHandler;
- private volatile Distributor distributionPolicy = new RoundRobinDistributor();
-
private boolean direct;
private boolean promptDelivery;
@@ -128,14 +125,14 @@
// We cache the consumers here since we don't want to include the redistributor
- private final Set<Consumer> consumers = new HashSet<Consumer>();
+ private final Set<Consumer> consumerSet = new HashSet<Consumer>();
- private final Map<Consumer, MessageHandler> messageHandlers = new
HashMap<Consumer, MessageHandler>();
-
private final ConcurrentMap<SimpleString, Consumer> groups = new
ConcurrentHashMap<SimpleString, Consumer>();
private volatile SimpleString expiryAddress;
+ private int pos;
+
public QueueImpl(final long id,
final SimpleString address,
final SimpleString name,
@@ -276,29 +273,33 @@
{
cancelRedistributor();
- distributionPolicy.addConsumer(consumer);
+ MessageHandler handler;
- consumers.add(consumer);
-
if (consumer.getFilter() != null)
{
- messageHandlers.put(consumer, new
FilterMessageHandler(messageReferences.iterator()));
+ handler = new FilterMessageHandler(consumer, messageReferences.iterator());
}
+ else
+ {
+ handler = new NullFilterMessageHandler(consumer);
+ }
+
+ handlers.add(handler);
+
+ consumerSet.add(consumer);
}
public synchronized boolean removeConsumer(final Consumer consumer) throws Exception
{
- boolean removed = distributionPolicy.removeConsumer(consumer);
+ boolean removed = this.removeHandlerGivenConsumer(consumer);
- if (distributionPolicy.getConsumerCount() == 0)
+ if (handlers.isEmpty())
{
promptDelivery = false;
}
- consumers.remove(consumer);
+ consumerSet.remove(consumer);
- messageHandlers.remove(consumer);
-
if (removed)
{
for (SimpleString groupID : groups.keySet())
@@ -330,7 +331,7 @@
if (delay > 0)
{
- if (consumers.size() == 0)
+ if (consumerSet.isEmpty())
{
DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
@@ -353,7 +354,7 @@
redistributor = null;
- distributionPolicy.removeConsumer(redistributor);
+ removeHandlerGivenConsumer(redistributor);
}
if (future != null)
@@ -366,14 +367,42 @@
public synchronized int getConsumerCount()
{
- return consumers.size();
+ return consumerSet.size();
}
public synchronized Set<Consumer> getConsumers()
{
- return consumers;
+ return consumerSet;
}
+ public synchronized boolean hasMatchingConsumer(final ServerMessage message)
+ {
+ for (MessageHandler handler : handlers)
+ {
+ Consumer consumer = handler.getConsumer();
+
+ if (consumer instanceof Redistributor)
+ {
+ continue;
+ }
+
+ Filter filter = consumer.getFilter();
+
+ if (filter == null)
+ {
+ return true;
+ }
+ else
+ {
+ if (filter.match(message))
+ {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
public Iterator<MessageReference> iterator()
{
return new Iterator<MessageReference>()
@@ -604,16 +633,6 @@
deliveringCount.incrementAndGet();
}
- public Distributor getDistributionPolicy()
- {
- return distributionPolicy;
- }
-
- public void setDistributionPolicy(final Distributor distributionPolicy)
- {
- this.distributionPolicy = distributionPolicy;
- }
-
public int getMessagesAdded()
{
return messagesAdded.get();
@@ -862,14 +881,37 @@
// Private
// ------------------------------------------------------------------------------
+ private boolean removeHandlerGivenConsumer(final Consumer consumer)
+ {
+ Iterator<MessageHandler> iter = handlers.iterator();
+
+ boolean removed = false;
+
+ while (iter.hasNext())
+ {
+ MessageHandler handler = iter.next();
+
+ if (handler.getConsumer() == consumer)
+ {
+ iter.remove();
+
+ removed = true;
+
+ break;
+ }
+ }
+
+ return removed;
+ }
+
private void internalAddRedistributor(final Executor executor)
{
// create the redistributor only once if there are no local consumers
- if (consumers.size() == 0 && redistributor == null)
+ if (consumerSet.isEmpty() && redistributor == null)
{
redistributor = new Redistributor(this, storageManager, postOffice, executor,
REDISTRIBUTOR_BATCH_SIZE);
- distributionPolicy.addConsumer(redistributor);
+ handlers.add(new NullFilterMessageHandler(redistributor));
redistributor.start();
@@ -979,6 +1021,7 @@
private void sendToDeadLetterAddress(final MessageReference ref) throws Exception
{
SimpleString deadLetterAddress =
addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress();
+
if (deadLetterAddress != null)
{
Bindings bindingList = postOffice.getBindingsForAddress(deadLetterAddress);
@@ -1021,122 +1064,222 @@
tx.commit();
}
+ private MessageHandler getHandlerRoundRobin()
+ {
+ MessageHandler handler = handlers.get(pos);
+
+ pos++;
+
+ if (pos == handlers.size())
+ {
+ pos = 0;
+ }
+
+ return handler;
+ }
+
+ private boolean checkExpired(final MessageReference reference)
+ {
+ if (reference.getMessage().isExpired())
+ {
+ reference.handled();
+
+ try
+ {
+ expire(reference);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to expire ref", e);
+ }
+
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
/*
* Attempt to deliver all the messages in the queue
*/
private synchronized void deliver()
{
- if (paused)
+ if (paused || handlers.isEmpty())
{
return;
}
direct = false;
- if (distributionPolicy.getConsumerCount() == 0)
- {
- return;
- }
-
- Consumer consumer;
-
- MessageReference reference;
-
- // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
- int totalConsumers = distributionPolicy.getConsumerCount();
- Set<Consumer> busyConsumers = new HashSet<Consumer>();
- Set<Consumer> nullReferences = new HashSet<Consumer>();
-
+ int startPos = pos;
+ int totalCount = handlers.size();
+ int nullCount = 0;
+ int busyCount = 0;
while (true)
{
- consumer = distributionPolicy.getNextConsumer();
+ MessageHandler handler = getHandlerRoundRobin();
- MessageHandler handler = messageHandlers.get(consumer);
+ Consumer consumer = handler.getConsumer();
- if (handler == null)
- {
- handler = globalHandler;
- }
+ MessageReference reference = handler.peek(consumer);
- reference = handler.peek(consumer);
-
if (reference == null)
{
- nullReferences.add(consumer);
-
- if (nullReferences.size() + busyConsumers.size() == totalConsumers)
- {
- // We delivered all the messages - go into direct delivery
- direct = true;
-
- promptDelivery = false;
-
- return;
- }
-
- continue;
+ nullCount++;
}
else
{
- nullReferences.remove(consumer);
-
- if (reference.getMessage().isExpired())
+ if (checkExpired(reference))
{
- // We expire messages on the server too
handler.remove();
+ }
+ else
+ {
+ final SimpleString groupID =
reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
- reference.handled();
+ boolean tryHandle = true;
- try
+ if (groupID != null)
{
- expire(reference);
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+
+ busyCount++;
+ }
}
- catch (Exception e)
+
+ if (tryHandle)
{
- log.error("Failed to expire ref", e);
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ handler.remove();
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
+
+ handler.reset();
+
+ // if (groupID != null )
+ // {
+ // // group id being set seems to make delivery stop
+ // // FIXME !!! why??
+ // break;
+ // }
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ // if consumer filter reject the message make sure it won't be
assigned the message group
+ if (groupID != null)
+ {
+ groups.remove(groupID);
+ }
+ }
}
-
- continue;
}
}
- final SimpleString groupID =
reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
- if (groupID != null)
+ if (pos == startPos)
{
- Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+ // We've done all the consumers
- if (groupConsumer != null && groupConsumer != consumer)
+ if (nullCount + busyCount == totalCount)
{
- continue;
+ if (nullCount == totalCount)
+ {
+ // We delivered all the messages - go into direct delivery
+ direct = true;
+
+ promptDelivery = false;
+ }
+
+ break;
}
+
+ nullCount = busyCount = 0;
}
+ }
+ }
- HandleStatus status = handle(reference, consumer);
+ private synchronized boolean directDeliver(final MessageReference reference)
+ {
+ if (paused || handlers.isEmpty())
+ {
+ return false;
+ }
- if (status == HandleStatus.HANDLED)
+ int startPos = pos;
+ int busyCount = 0;
+ boolean setPromptDelivery = false;
+ while (true)
+ {
+ MessageHandler handler = getHandlerRoundRobin();
+
+ Consumer consumer = handler.getConsumer();
+
+ if (!checkExpired(reference))
{
- handler.remove();
- }
- else if (status == HandleStatus.BUSY)
- {
- busyConsumers.add(consumer);
+ SimpleString groupID =
reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
- handler.reset();
+ boolean tryHandle = true;
- if (groupID != null || busyConsumers.size() == totalConsumers)
+ if (groupID != null)
{
- // when all consumers are busy, we stop
- break;
+ Consumer groupConsumer = groups.putIfAbsent(groupID, consumer);
+
+ if (groupConsumer != null && groupConsumer != consumer)
+ {
+ tryHandle = false;
+ }
}
+
+ if (tryHandle)
+ {
+ HandleStatus status = handle(reference, consumer);
+
+ if (status == HandleStatus.HANDLED)
+ {
+ return true;
+ }
+ else if (status == HandleStatus.BUSY)
+ {
+ busyCount++;
+
+ if (groupID != null)
+ {
+ // If the group has been assigned a consumer there is no point in
trying others
+
+ return false;
+ }
+ }
+ else if (status == HandleStatus.NO_MATCH)
+ {
+ // if consumer filter reject the message make sure it won't be
assigned the message group
+ if (groupID != null)
+ {
+ groups.remove(groupID);
+ }
+
+ setPromptDelivery = true;
+ }
+ }
}
- else if (status == HandleStatus.NO_MATCH)
+
+ if (pos == startPos)
{
- // if consumer filter reject the message make sure it won't be assigned
the message group
- if (groupID != null)
+ if (setPromptDelivery)
{
- groups.remove(consumer);
+ promptDelivery = true;
}
+
+ return false;
}
}
}
@@ -1159,23 +1302,12 @@
{
// Deliver directly
- HandleStatus status = directDeliver(ref);
+ boolean delivered = directDeliver(ref);
- if (status == HandleStatus.HANDLED)
+ if (!delivered)
{
- // Ok
- }
- else if (status == HandleStatus.BUSY)
- {
add = true;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- add = true;
- }
- if (add)
- {
direct = false;
}
}
@@ -1213,81 +1345,6 @@
}
}
- private synchronized HandleStatus directDeliver(final MessageReference reference)
- {
- if (distributionPolicy.getConsumerCount() == 0)
- {
- return HandleStatus.BUSY;
- }
-
- HandleStatus status;
-
- boolean filterRejected = false;
-
- int consumerCount = 0;
-
- while (true)
- {
- Consumer consumer = distributionPolicy.getNextConsumer();
- consumerCount++;
-
- final SimpleString groupId =
reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
-
- if (groupId != null)
- {
- Consumer groupConsumer = groups.putIfAbsent(groupId, consumer);
- if (groupConsumer != null && groupConsumer != consumer)
- {
- continue;
- }
- }
-
- status = handle(reference, consumer);
-
- if (status == HandleStatus.HANDLED)
- {
- break;
- }
- else if (status == HandleStatus.NO_MATCH)
- {
- filterRejected = true;
- if (groupId != null)
- {
- groups.remove(consumer);
- }
- }
- else if (status == HandleStatus.BUSY)
- {
- if (groupId != null)
- {
- break;
- }
- }
- // if we've tried all of them
- if (consumerCount == distributionPolicy.getConsumerCount())
- {
- if (filterRejected)
- {
- status = HandleStatus.NO_MATCH;
- break;
- }
- else
- {
- // Give up - all consumers busy
- status = HandleStatus.BUSY;
- break;
- }
- }
- }
-
- if (status == HandleStatus.NO_MATCH)
- {
- promptDelivery = true;
- }
-
- return status;
- }
-
private synchronized HandleStatus handle(final MessageReference reference, final
Consumer consumer)
{
HandleStatus status;
@@ -1392,7 +1449,6 @@
{
public void run()
{
-
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
@@ -1408,7 +1464,7 @@
}
}
- final class RefsOperation implements TransactionOperation
+ private final class RefsOperation implements TransactionOperation
{
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
@@ -1523,21 +1579,38 @@
void remove();
void reset();
+
+ Consumer getConsumer();
}
private class FilterMessageHandler implements MessageHandler
{
+ private final Consumer consumer;
+
private Iterator<MessageReference> iterator;
- public FilterMessageHandler(final Iterator<MessageReference> iterator)
+ private MessageReference lastReference;
+
+ private boolean resetting;
+
+ public FilterMessageHandler(final Consumer consumer, final
Iterator<MessageReference> iterator)
{
+ this.consumer = consumer;
+
this.iterator = iterator;
}
public MessageReference peek(final Consumer consumer)
{
+ if (resetting)
+ {
+ resetting = false;
+
+ return lastReference;
+ }
+
MessageReference reference;
-
+
if (iterator.hasNext())
{
reference = iterator.next();
@@ -1554,6 +1627,8 @@
iterator = messageReferences.iterator();
}
}
+ lastReference = reference;
+
return reference;
}
@@ -1564,12 +1639,24 @@
public void reset()
{
- iterator = messageReferences.iterator();
+ resetting = true;
}
+
+ public Consumer getConsumer()
+ {
+ return consumer;
+ }
}
private class NullFilterMessageHandler implements MessageHandler
{
+ private final Consumer consumer;
+
+ NullFilterMessageHandler(final Consumer consumer)
+ {
+ this.consumer = consumer;
+ }
+
public MessageReference peek(final Consumer consumer)
{
return messageReferences.peekFirst();
@@ -1584,5 +1671,10 @@
{
// no-op
}
+
+ public Consumer getConsumer()
+ {
+ return consumer;
+ }
}
}
Deleted: trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/RoundRobinDistributor.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -1,68 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.server.impl;
-
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Consumer;
-
-/**
- * A RoundRobinDistributor
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-public class RoundRobinDistributor extends DistributorImpl
-{
- private static final Logger log = Logger.getLogger(RoundRobinDistributor.class);
-
- protected int pos = 0;
-
- @Override
- public synchronized void addConsumer(final Consumer consumer)
- {
- pos = 0;
- super.addConsumer(consumer);
- }
-
- @Override
- public synchronized boolean removeConsumer(final Consumer consumer)
- {
- pos = 0;
- return super.removeConsumer(consumer);
- }
-
- @Override
- public synchronized int getConsumerCount()
- {
- return super.getConsumerCount();
- }
-
- public synchronized Consumer getNextConsumer()
- {
- Consumer consumer = consumers.get(pos);
- incrementPosition();
- return consumer;
- }
-
- private synchronized void incrementPosition()
- {
- pos++;
-
- if (pos == consumers.size())
- {
- pos = 0;
- }
- }
-}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-13 20:52:21
UTC (rev 8283)
@@ -1728,6 +1728,7 @@
}
});
+
storageManager.completeReplication();
}
else
@@ -1750,6 +1751,7 @@
if (confirmPacket != null)
{
channel.confirm(confirmPacket);
+
if (flush)
{
channel.flushConfirmations();
Modified: trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java
===================================================================
--- trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-11-13 19:14:02
UTC (rev 8282)
+++ trunk/src/main/org/hornetq/core/settings/impl/AddressSettings.java 2009-11-13 20:52:21
UTC (rev 8283)
@@ -14,8 +14,6 @@
package org.hornetq.core.settings.impl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.Distributor;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
import org.hornetq.core.settings.Mergeable;
import org.hornetq.utils.SimpleString;
@@ -32,14 +30,12 @@
/**
* defaults used if null, this allows merging
*/
- public static final Class<?> DEFAULT_DISTRIBUTION_POLICY_CLASS = new
RoundRobinDistributor().getClass();
-
public static final int DEFAULT_MAX_SIZE_BYTES = -1;
public static final AddressFullMessagePolicy DEFAULT_ADDRESS_FULL_MESSAGE_POLICY =
AddressFullMessagePolicy.PAGE;
public static final int DEFAULT_PAGE_SIZE = 10 * 1024 * 1024;
-
+
public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;
public static final int DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT = 0;
@@ -60,8 +56,6 @@
private Boolean dropMessagesWhenFull = null;
- private String distributionPolicyClass = null;
-
private Integer maxDeliveryAttempts = null;
private Integer messageCounterHistoryDayLimit = null;
@@ -149,16 +143,6 @@
this.redeliveryDelay = redeliveryDelay;
}
- public String getDistributionPolicyClass()
- {
- return distributionPolicyClass;
- }
-
- public void setDistributionPolicyClass(final String distributionPolicyClass)
- {
- this.distributionPolicyClass = distributionPolicyClass;
- }
-
public SimpleString getDeadLetterAddress()
{
return deadLetterAddress;
@@ -189,25 +173,6 @@
sendToDLAOnNoRoute = value;
}
- public Distributor getDistributionPolicy()
- {
- try
- {
- if (distributionPolicyClass != null)
- {
- return
(Distributor)getClass().getClassLoader().loadClass(distributionPolicyClass).newInstance();
- }
- else
- {
- return (Distributor)DEFAULT_DISTRIBUTION_POLICY_CLASS.newInstance();
- }
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating distribution policy
'" + e + " '");
- }
- }
-
public long getRedistributionDelay()
{
return redistributionDelay != null ? redistributionDelay :
DEFAULT_REDISTRIBUTION_DELAY;
@@ -248,10 +213,6 @@
{
redeliveryDelay = merged.redeliveryDelay;
}
- if (distributionPolicyClass == null)
- {
- distributionPolicyClass = merged.distributionPolicyClass;
- }
if (deadLetterAddress == null)
{
deadLetterAddress = merged.deadLetterAddress;
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -12,7 +12,7 @@
*/
package org.hornetq.tests.integration.client;
-import java.util.Set;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -911,7 +911,7 @@
for (Binding binding : bindings.getBindings())
{
- Set<Consumer> consumers =
((QueueBinding)binding).getQueue().getConsumers();
+ Collection<Consumer> consumers =
((QueueBinding)binding).getQueue().getConsumers();
for (Consumer consumer : consumers)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -29,6 +29,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -41,6 +42,8 @@
*/
public class MessageGroupingTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(MessageGroupingTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -85,13 +88,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -130,13 +133,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -178,18 +181,18 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
- for(int i = 0; i < numMessages/2; i++)
+ for (int i = 0; i < numMessages / 2; i++)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
@@ -199,18 +202,25 @@
assertNotNull(cm);
assertEquals(cm.getBody().readString(), "m" + i);
}
-
+
+ log.info("closing consumers");
+
consumer2.close();
+
+ log.info("closed consumer 2");
+
consumer.close();
- //check that within their groups the messages are still in the correct order
+
+ log.info("closed consuemrs");
+ // check that within their groups the messages are still in the correct order
consumer = clientSession.createConsumer(qName);
- for(int i = 0; i < numMessages; i+=2)
+ for (int i = 0; i < numMessages; i += 2)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
assertEquals(cm.getBody().readString(), "m" + i);
}
- for(int i = 1; i < numMessages; i+=2)
+ for (int i = 1; i < numMessages; i += 2)
{
ClientMessage cm = consumer.receive(500);
assertNotNull(cm);
@@ -230,13 +240,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -269,13 +279,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -322,13 +332,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -393,13 +403,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -451,13 +461,13 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, clientSession);
- if( i % 2 == 0 || i == 0)
+ if (i % 2 == 0 || i == 0)
{
message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
}
else
{
- message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId2);
}
clientProducer.send(message);
}
@@ -536,14 +546,14 @@
}
server = null;
clientSession = null;
-
+
super.tearDown();
}
protected void setUp() throws Exception
{
super.setUp();
-
+
ConfigurationImpl configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
TransportConfiguration transportConfig = new
TransportConfiguration(INVM_ACCEPTOR_FACTORY);
@@ -578,13 +588,13 @@
if (acknowledge)
{
try
- {
- message.acknowledge();
- }
- catch (HornetQException e)
{
- //ignore
+ message.acknowledge();
}
+ catch (HornetQException e)
+ {
+ // ignore
+ }
}
latch.countDown();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/tests/src/org/hornetq/tests/integration/client/SessionStopStartTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -149,7 +149,7 @@
session.close();
}
-
+
public void testStopStartConsumerAsyncSyncStoppedByHandler() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -182,6 +182,8 @@
boolean failed;
boolean started = true;
+
+ int count = 0;
public void onMessage(final ClientMessage message)
{
@@ -192,16 +194,17 @@
{
failed = true;
}
-
- latch.countDown();
-
- if (latch.getCount() == 0)
+
+ count++;
+
+ if (count == 10)
{
-
message.acknowledge();
session.stop();
started = false;
}
+
+ latch.countDown();
}
catch (Exception e)
{
@@ -215,8 +218,6 @@
latch.await();
- Thread.sleep(100);
-
assertFalse(handler.failed);
// Make sure no exceptions were thrown from onMessage
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -281,7 +281,7 @@
csf.close();
}
-
+
/*
* Test the client triggering failure due to no ping from server received in time
*/
@@ -325,7 +325,7 @@
//Setting the handler to null will prevent server sending pings back to client
serverConn.getChannel(0, -1).setHandler(null);
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < 2000; i++)
{
// a few tries to avoid a possible race caused by GCs or similar issues
if (server.getRemotingService().getConnections().isEmpty() &&
clientListener.getException() != null)
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/AddressSettingsDeployerTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -33,8 +33,7 @@
"
<dead-letter-address>DLQtest</dead-letter-address>\n" +
"
<expiry-address>ExpiryQueueTest</expiry-address>\n" +
" <redelivery-delay>100</redelivery-delay>\n" +
- " <max-size-bytes>-100</max-size-bytes>\n" +
- "
<distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>\n"
+
+ " <max-size-bytes>-100</max-size-bytes>\n" +
"
<message-counter-history-day-limit>1000</message-counter-history-day-limit>\n"
+
" </address-settings>";
@@ -57,8 +56,7 @@
AddressSettings as = repository.getMatch("queues.aq");
assertNotNull(as);
assertEquals(100, as.getRedeliveryDelay());
- assertEquals(-100, as.getMaxSizeBytes());
- assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy",
as.getDistributionPolicyClass());
+ assertEquals(-100, as.getMaxSizeBytes());
assertEquals(1000, as.getMessageCounterHistoryDayLimit());
assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
assertEquals(new SimpleString("ExpiryQueueTest"),
as.getExpiryAddress());
@@ -72,8 +70,7 @@
+ "
<dead-letter-address>DLQtest</dead-letter-address>\n"
+ "
<expiry-address>ExpiryQueueTest</expiry-address>\n"
+ "
<redelivery-delay>100</redelivery-delay>\n"
- + " <max-size-bytes>-100</max-size-bytes>\n"
- + "
<distribution-policy-class>org.hornetq.core.impl.RoundRobinDistributionPolicy</distribution-policy-class>"
+ + " <max-size-bytes>-100</max-size-bytes>\n"
+ "
<message-counter-history-day-limit>1000</message-counter-history-day-limit>"
+ " </address-setting>"
+ "</address-settings>"
@@ -88,8 +85,7 @@
AddressSettings as = repository.getMatch("queues.aq");
assertNotNull(as);
assertEquals(100, as.getRedeliveryDelay());
- assertEquals(-100, as.getMaxSizeBytes());
- assertEquals("org.hornetq.core.impl.RoundRobinDistributionPolicy",
as.getDistributionPolicyClass());
+ assertEquals(-100, as.getMaxSizeBytes());
assertEquals(1000, as.getMessageCounterHistoryDayLimit());
assertEquals(new SimpleString("DLQtest"), as.getDeadLetterAddress());
assertEquals(new SimpleString("ExpiryQueueTest"),
as.getExpiryAddress());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -20,7 +20,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
@@ -31,7 +30,7 @@
public class FakeQueue implements Queue
{
private SimpleString name;
-
+
public FakeQueue(SimpleString name)
{
this.name = name;
@@ -43,7 +42,7 @@
public void acknowledge(MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -52,7 +51,7 @@
public void acknowledge(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -61,7 +60,7 @@
public void addConsumer(Consumer consumer) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -70,7 +69,7 @@
public void addFirst(MessageReference ref)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -79,7 +78,7 @@
public void addLast(MessageReference ref)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -88,7 +87,7 @@
public void addRedistributor(long delay, Executor executor)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -97,7 +96,7 @@
public void cancel(MessageReference reference) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -106,7 +105,7 @@
public void cancel(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -115,7 +114,7 @@
public void cancelRedistributor() throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -169,7 +168,7 @@
public void deliverAsync(Executor executor)
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -178,7 +177,7 @@
public void deliverNow()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -187,7 +186,7 @@
public void expire(MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -205,7 +204,7 @@
public void expireReferences() throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -245,15 +244,6 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#getDistributionPolicy()
- */
- public Distributor getDistributionPolicy()
- {
- // TODO Auto-generated method stub
- return null;
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getFilter()
*/
public Filter getFilter()
@@ -375,7 +365,7 @@
public void lockDelivery()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -402,7 +392,7 @@
public void pause()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -411,7 +401,7 @@
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -420,7 +410,7 @@
public void referenceHandled()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -456,7 +446,7 @@
public void resume()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -469,32 +459,23 @@
}
/* (non-Javadoc)
- * @see
org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
- */
- public void setDistributionPolicy(Distributor policy)
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
* @see
org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
*/
public void setExpiryAddress(SimpleString expiryAddress)
{
// TODO Auto-generated method stub
-
+
}
- // TODO Auto-generated method stub
-
+ // TODO Auto-generated method stub
+
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#unlockDelivery()
*/
public void unlockDelivery()
{
// TODO Auto-generated method stub
-
+
}
/* (non-Javadoc)
@@ -503,9 +484,13 @@
public void route(ServerMessage message, RoutingContext context) throws Exception
{
// TODO Auto-generated method stub
-
+
}
-
-
+ public boolean hasMatchingConsumer(ServerMessage message)
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -23,13 +23,11 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.server.Consumer;
-import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.QueueImpl;
-import org.hornetq.core.server.impl.RoundRobinDistributor;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
@@ -118,21 +116,6 @@
assertFalse(queue.removeConsumer(cons3));
}
- public void testGetSetDistributionPolicy()
- {
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
-
- assertNotNull(queue.getDistributionPolicy());
-
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
- Distributor policy = new DummyDistributionPolicy();
-
- queue.setDistributionPolicy(policy);
-
- assertEquals(policy, queue.getDistributionPolicy());
- }
-
public void testGetFilter()
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
@@ -608,8 +591,6 @@
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -653,8 +634,6 @@
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
- assertTrue(queue.getDistributionPolicy() instanceof RoundRobinDistributor);
-
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -943,7 +922,7 @@
int currId = 0;
for (MessageReference receeivedRef : receeivedRefs)
{
- assertEquals("messages received out of order",
receeivedRef.getMessage().getMessageID() , currId++);
+ assertEquals("messages received out of order",
receeivedRef.getMessage().getMessageID(), currId++);
}
}
@@ -1017,7 +996,7 @@
int currId = 10;
for (MessageReference receeivedRef : receeivedRefs)
{
- assertEquals("messages received out of order",
receeivedRef.getMessage().getMessageID() , currId++);
+ assertEquals("messages received out of order",
receeivedRef.getMessage().getMessageID(), currId++);
}
}
@@ -1248,7 +1227,7 @@
public void testPauseAndResumeWithAsync() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
-
+
// pauses the queue
queue.pause();
@@ -1377,34 +1356,4 @@
}
}
- class DummyDistributionPolicy implements Distributor
- {
- Consumer consumer;
-
- public List<Consumer> getConsumers()
- {
- return null;
- }
-
- public void addConsumer(Consumer consumer)
- {
- this.consumer = consumer;
- }
-
- public boolean removeConsumer(Consumer consumer)
- {
- return false;
- }
-
- public int getConsumerCount()
- {
- return 0;
- }
-
- public Consumer getNextConsumer()
- {
- return consumer;
- }
- }
-
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java 2009-11-13
19:14:02 UTC (rev 8282)
+++
trunk/tests/src/org/hornetq/tests/unit/core/settings/impl/AddressSettingsTest.java 2009-11-13
20:52:21 UTC (rev 8283)
@@ -9,7 +9,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
- */
+ */
package org.hornetq.tests.unit.core.settings.impl;
@@ -26,14 +26,13 @@
public void testDefaults()
{
AddressSettings addressSettings = new AddressSettings();
- assertEquals(AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS,
addressSettings.getDistributionPolicy().getClass());
- assertEquals(null, addressSettings.getDistributionPolicyClass());
assertEquals(null, addressSettings.getDeadLetterAddress());
assertEquals(null, addressSettings.getExpiryAddress());
assertEquals(AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,
addressSettings.getMaxDeliveryAttempts());
assertEquals(addressSettings.getMaxSizeBytes(),
AddressSettings.DEFAULT_MAX_SIZE_BYTES);
assertEquals(AddressSettings.DEFAULT_PAGE_SIZE,
addressSettings.getPageSizeBytes());
- assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT,
addressSettings.getMessageCounterHistoryDayLimit());
+ assertEquals(AddressSettings.DEFAULT_MESSAGE_COUNTER_HISTORY_DAY_LIMIT,
+ addressSettings.getMessageCounterHistoryDayLimit());
assertEquals(AddressSettings.DEFAULT_REDELIVER_DELAY,
addressSettings.getRedeliveryDelay());
}
@@ -53,8 +52,6 @@
addressSettingsToMerge.setRedeliveryDelay((long)1003);
addressSettingsToMerge.setPageSizeBytes(1004);
addressSettings.merge(addressSettingsToMerge);
- assertEquals(addressSettings.getDistributionPolicy().getClass(),
AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -86,8 +83,6 @@
addressSettingsToMerge2.setRedeliveryDelay((long)2003);
addressSettings.merge(addressSettingsToMerge2);
- assertEquals(addressSettings.getDistributionPolicy().getClass(),
AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 1000);
@@ -122,14 +117,12 @@
addressSettingsToMerge.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
addressSettings.merge(addressSettingsToMerge2);
- assertEquals(addressSettings.getDistributionPolicy().getClass(),
AddressSettings.DEFAULT_DISTRIBUTION_POLICY_CLASS);
- assertEquals(addressSettings.getDistributionPolicyClass(), null);
assertEquals(addressSettings.getDeadLetterAddress(), DLQ);
assertEquals(addressSettings.getExpiryAddress(), exp);
assertEquals(addressSettings.getMaxDeliveryAttempts(), 2000);
assertEquals(addressSettings.getMaxSizeBytes(), 1001);
assertEquals(addressSettings.getMessageCounterHistoryDayLimit(), 2002);
- assertEquals(addressSettings.getRedeliveryDelay(),1003);
+ assertEquals(addressSettings.getRedeliveryDelay(), 1003);
assertEquals(AddressFullMessagePolicy.DROP,
addressSettings.getAddressFullMessagePolicy());
}
}