Author: ataylor
Date: 2009-09-11 10:22:36 -0400 (Fri, 11 Sep 2009)
New Revision: 7955
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
Modified:
branches/hornetq_grouping/examples/core/hornetq-core-examples.iml
branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml
branches/hornetq_grouping/hornetq.ipr
branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/hornetq_grouping/tests/hornetq-tests.iml
branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml
branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
merged in JBM branch
Modified: branches/hornetq_grouping/examples/core/hornetq-core-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/core/hornetq-core-examples.iml 2009-09-11 10:56:25
UTC (rev 7954)
+++ branches/hornetq_grouping/examples/core/hornetq-core-examples.iml 2009-09-11 14:22:36
UTC (rev 7955)
@@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
<component name="NewModuleRootManager"
inherit-compiler-output="true">
+ <output url="file:///production/hornetq-core-examples" />
+ <output-test url="file:///test/hornetq-core-examples" />
<exclude-output />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/embedded/src"
isTestSource="false" />
Modified: branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml 2009-09-11
10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/examples/javaee/hornetq-javaee-examples.iml 2009-09-11
14:22:36 UTC (rev 7955)
@@ -1,91 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="FacetManager">
- <facet type="web" name="Web">
- <configuration>
- <descriptors>
- <deploymentDescriptor name="web.xml"
url="file://$MODULE_DIR$/servlet-transport/config/WEB-INF/web.xml"
optional="false" version="2.5" />
- </descriptors>
- <webroots>
- <root
url="file://$MODULE_DIR$/servlet-transport/config/jms-servlet.war"
relative="/" />
- </webroots>
- <sourceRoots>
- <root url="file://$MODULE_DIR$/ejb-jms-transaction/src" />
- <root url="file://$MODULE_DIR$/hajndi/src" />
- <root url="file://$MODULE_DIR$/jca-config/src" />
- <root url="file://$MODULE_DIR$/jms-bridge/src" />
- <root url="file://$MODULE_DIR$/mdb/src" />
- <root url="file://$MODULE_DIR$/servlet-transport/src" />
- <root url="file://$MODULE_DIR$/xarecovery/src" />
- </sourceRoots>
- <building>
- <setting name="EXPLODED_URL" value="file://" />
- <setting name="EXPLODED_ENABLED" value="false" />
- <setting name="JAR_URL" value="file://" />
- <setting name="JAR_ENABLED" value="false" />
- <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true"
/>
- </building>
- <packaging>
- <containerElement type="module"
name="messaging-javaee-examples">
- <attribute name="method" value="1" />
- <attribute name="URI" value="/WEB-INF/classes" />
- </containerElement>
- </packaging>
- </configuration>
- </facet>
- <facet type="javaeeApplication" name="javaEEApplication">
- <configuration>
- <descriptors>
- <deploymentDescriptor name="application.xml"
url="file://$MODULE_DIR$/servlet-transport/config/META-INF/application.xml"
optional="false" version="5" />
- </descriptors>
- <building>
- <setting name="EXPLODED_URL" value="file://" />
- <setting name="EXPLODED_ENABLED" value="false" />
- <setting name="JAR_URL" value="file://" />
- <setting name="JAR_ENABLED" value="false" />
- <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true"
/>
- </building>
- </configuration>
- </facet>
- <facet type="javaeeApplication" name="javaEEApplication2">
- <configuration>
- <descriptors>
- <deploymentDescriptor name="application.xml"
url="file://$MODULE_DIR$/servlet-ssl-example/config/META-INF/application.xml"
optional="false" version="5" />
- </descriptors>
- <building>
- <setting name="EXPLODED_URL" value="file://" />
- <setting name="EXPLODED_ENABLED" value="false" />
- <setting name="JAR_URL" value="file://" />
- <setting name="JAR_ENABLED" value="false" />
- <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true"
/>
- </building>
- </configuration>
- </facet>
- <facet type="web" name="Web2">
- <configuration>
- <descriptors>
- <deploymentDescriptor name="web.xml"
url="file://$MODULE_DIR$/servlet-ssl-example/config/WEB-INF/web.xml"
optional="false" version="2.5" />
- </descriptors>
- <webroots>
- <root url="file://$MODULE_DIR$/servlet-ssl-example/config"
relative="/" />
- </webroots>
- <building>
- <setting name="EXPLODED_URL" value="file://" />
- <setting name="EXPLODED_ENABLED" value="false" />
- <setting name="JAR_URL" value="file://" />
- <setting name="JAR_ENABLED" value="false" />
- <setting name="EXCLUDE_EXPLODED_DIRECTORY" value="true"
/>
- </building>
- <packaging>
- <containerElement type="module"
name="messaging-javaee-examples">
- <attribute name="method" value="1" />
- <attribute name="URI" value="/WEB-INF/classes" />
- </containerElement>
- </packaging>
- </configuration>
- </facet>
- </component>
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<output url="file://$MODULE_DIR$/output/classes" />
<exclude-output />
<content url="file://$MODULE_DIR$">
Modified: branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml
===================================================================
--- branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml 2009-09-11 10:56:25
UTC (rev 7954)
+++ branches/hornetq_grouping/examples/jms/hornetq-jms-examples.iml 2009-09-11 14:22:36
UTC (rev 7955)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<output url="file://$MODULE_DIR$/output/classes" />
<output-test url="file://$MODULE_DIR$/output" />
<exclude-output />
Modified: branches/hornetq_grouping/hornetq.ipr
===================================================================
--- branches/hornetq_grouping/hornetq.ipr 2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/hornetq.ipr 2009-09-11 14:22:36 UTC (rev 7955)
@@ -336,11 +336,21 @@
<facet-type id="javaeeApplication">
<modules>
<module name="messaging" />
+ <module name="hornetq-javaee-examples">
+ <files>
+ <file
url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/META-INF/application.xml"
/>
+ </files>
+ </module>
</modules>
</facet-type>
<facet-type id="web">
<modules>
<module name="messaging" />
+ <module name="hornetq-javaee-examples">
+ <files>
+ <file
url="file://$PROJECT_DIR$/examples/javaee/servlet-transport/config/WEB-INF/web.xml"
/>
+ </files>
+ </module>
</modules>
</facet-type>
</autodetection-disabled>
@@ -579,7 +589,7 @@
</component>
<component name="ProjectFileVersion" converted="true" />
<component name="ProjectKey">
- <option name="state"
value="https://svn.jboss.org/repos/hornetq/trunk/hornetq.ipr" />
+ <option name="state"
value="https://svn.jboss.org/repos/hornetq/branches/hornetq_grouping/hornetq.ipr"
/>
</component>
<component name="ProjectModuleManager">
<modules>
@@ -592,7 +602,9 @@
<module fileurl="file://$PROJECT_DIR$/tests/hornetq-tests.iml"
filepath="$PROJECT_DIR$/tests/hornetq-tests.iml" />
</modules>
</component>
- <component name="ProjectRootManager" version="2"
languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true"
project-jdk-name="1.5" project-jdk-type="JavaSDK" />
+ <component name="ProjectRootManager" version="2"
languageLevel="JDK_1_5" assert-keyword="true" jdk-15="true"
project-jdk-name="1.5" project-jdk-type="JavaSDK">
+ <output url="file://$PROJECT_DIR$/build/classes" />
+ </component>
<component name="ResourceManagerContainer">
<option name="myResourceBundles">
<value>
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/client/management/impl/ManagementHelper.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -71,6 +71,12 @@
public static final SimpleString HDR_CHECK_TYPE = new
SimpleString("_HQ_CheckType");
+ public static final SimpleString HDR_PROPOSAL_TYPE = new
SimpleString("_JBM_ProposalType");
+
+ public static final SimpleString HDR_PROPOSAL_VALUE = new
SimpleString("_JBM_ProposalValue");
+
+ public static final SimpleString HDR_PROPOSAL_ALT_VALUE = new
SimpleString("_JBM_ProposalAltValue");
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/management/NotificationType.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -30,7 +30,9 @@
BROADCAST_GROUP_STARTED(10),
BROADCAST_GROUP_STOPPED(11),
BRIDGE_STARTED(12),
- BRIDGE_STOPPED(13);
+ BRIDGE_STOPPED(13),
+ PROPOSAL(14),
+ PROPOSAL_RESPONSE(15);
private final int value;
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/BindingsFactory.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.postoffice;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface BindingsFactory
+{
+ Bindings createBindings();
+}
Modified: branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -19,6 +19,7 @@
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -66,5 +67,9 @@
void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws
Exception;
- Object getNotificationLock();
+ Object getNotificationLock();
+
+ void addArbitrator(Arbitrator arbitrator);
+
+ Arbitrator getArbitrator();
}
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -28,9 +28,13 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.Arbitrator;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -38,10 +42,8 @@
* A BindingsImpl
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 11 Dec 2008 08:34:33
- *
- *
+ * <p/>
+ * Created 11 Dec 2008 08:34:33
*/
public class BindingsImpl implements Bindings
{
@@ -57,6 +59,13 @@
private volatile boolean routeWhenNoConsumers;
+ private final PostOffice postOffice;
+
+ public BindingsImpl(PostOffice postOffice)
+ {
+ this.postOffice = postOffice;
+ }
+
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
{
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -125,7 +134,7 @@
private void routeFromCluster(final ServerMessage message, final Transaction tx)
throws Exception
{
- byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
+ byte[] ids = (byte[]) message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
ByteBuffer buff = ByteBuffer.wrap(ids);
@@ -167,7 +176,7 @@
{
return false;
}
-
+
SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -251,10 +260,14 @@
}
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, final Transaction tx) throws
Exception
{
boolean routed = false;
-
+ Object o = message.getProperty("count_prop");
+ if (o != null)
+ {
+ System.out.println("routing message " + o);
+ }
if (!exclusiveBindings.isEmpty())
{
for (Binding binding : exclusiveBindings)
@@ -262,7 +275,7 @@
if (binding.getFilter() == null || binding.getFilter().match(message))
{
binding.getBindable().route(message, tx);
-
+
routed = true;
}
}
@@ -270,10 +283,96 @@
if (!routed)
{
+ Arbitrator groupingArbitrator = postOffice.getArbitrator();
+
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
routeFromCluster(message, tx);
}
+ else if(groupingArbitrator != null &&
message.getProperty(MessageImpl.HDR_GROUP_ID)!= null)
+ {
+ SimpleString groupId = (SimpleString)
message.getProperty(MessageImpl.HDR_GROUP_ID);
+ Response resp = groupingArbitrator.propose(new Proposal(groupId, null));
+ if(resp == null)
+ {
+ for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
+ {
+ SimpleString routingName = entry.getKey();
+
+ List<Binding> bindings = entry.getValue();
+ Binding chosen = null;
+ Binding lowestPriorityBinding = null;
+ int lowestPriority = Integer.MAX_VALUE;
+ for (Binding binding : bindings)
+ {
+ boolean bindingIsHighAcceptPriority =
binding.isHighAcceptPriority(message);
+ int distance = binding.getDistance();
+ if((distance < lowestPriority))
+ {
+ lowestPriorityBinding = binding;
+ lowestPriority = distance;
+ if(bindingIsHighAcceptPriority)
+ {
+ chosen = binding;
+ }
+ }
+ }
+ if(chosen == null)
+ {
+ chosen = lowestPriorityBinding;
+ }
+ resp = groupingArbitrator.propose(new Proposal(groupId,
chosen.getClusterName()));
+ if(!resp.getChosen().equals(chosen.getClusterName()))
+ {
+ for (Binding binding : bindings)
+ {
+ if (binding.getClusterName().equals(resp.getChosen()))
+ {
+ chosen = binding;
+ break;
+ }
+ }
+ }
+
+ if( chosen != null )
+ {
+ System.out.println("sending message" +
message.getProperty("count_prop") + " to " +
chosen.getClusterName());
+ chosen.willRoute(message);
+ chosen.getBindable().preroute(message, tx);
+ chosen.getBindable().route(message, tx);
+ }
+ }
+ }
+ else
+ {
+ for (Map.Entry<SimpleString, List<Binding>> entry :
routingNameBindingMap.entrySet())
+ {
+ SimpleString routingName = entry.getKey();
+
+ List<Binding> bindings = entry.getValue();
+ Binding chosen = null;
+ for (Binding binding : bindings)
+ {
+ if(binding.getClusterName().equals(resp.getChosen()))
+ {
+ chosen = binding;
+ break;
+ }
+ }
+ if( chosen != null && (routeWhenNoConsumers ||
chosen.isHighAcceptPriority(message)))
+ {
+ System.out.println("found sending message" +
message.getProperty("count_prop") + " to " +
chosen.getClusterName());
+ chosen.willRoute(message);
+ chosen.getBindable().preroute(message, tx);
+ chosen.getBindable().route(message, tx);
+ }
+ else
+ {
+ System.out.println("BindingsImpl.route");
+ }
+ }
+ }
+ }
else
{
Set<Bindable> chosen = new HashSet<Bindable>();
@@ -327,74 +426,74 @@
}
}
- Filter filter = binding.getFilter();
+ Filter filter = binding.getFilter();
- if (filter == null || filter.match(message))
- {
- // bindings.length == 1 ==> only a local queue so we don't
check for matching consumers (it's an
- // unnecessary overhead)
- if (length == 1 || routeWhenNoConsumers ||
binding.isHighAcceptPriority(message))
- {
- theBinding = binding;
+ if (filter == null || filter.match(message))
+ {
+ // bindings.length == 1 ==> only a local queue so we don't check
for matching consumers (it's an
+ // unnecessary overhead)
+ if (length == 1 || routeWhenNoConsumers ||
binding.isHighAcceptPriority(message))
+ {
+ theBinding = binding;
- pos = incrementPos(pos, length);
+ pos = incrementPos(pos, length);
- break;
- }
- else
- {
- if (lastLowPriorityBinding == -1)
- {
- lastLowPriorityBinding = pos;
- }
- }
+ break;
+ }
+ else
+ {
+ if (lastLowPriorityBinding == -1)
+ {
+ lastLowPriorityBinding = pos;
}
+ }
+ }
- pos = incrementPos(pos, length);
+ pos = incrementPos(pos, length);
- if (pos == startPos)
+ if (pos == startPos)
+ {
+ if (lastLowPriorityBinding != -1)
+ {
+ try
{
- if (lastLowPriorityBinding != -1)
+ theBinding = bindings.get(pos);
+ }
+ catch (IndexOutOfBoundsException e)
+ {
+ // This can occur if binding is removed while in route
+ if (!bindings.isEmpty())
{
- try
- {
- theBinding = bindings.get(pos);
- }
- catch (IndexOutOfBoundsException e)
- {
- // This can occur if binding is removed while in route
- if (!bindings.isEmpty())
- {
- pos = 0;
+ pos = 0;
- lastLowPriorityBinding = -1;
+ lastLowPriorityBinding = -1;
- continue;
- }
- else
- {
- break;
- }
- }
-
- pos = lastLowPriorityBinding;
-
- pos = incrementPos(pos, length);
+ continue;
}
- break;
+ else
+ {
+ break;
+ }
}
- }
- if (theBinding != null)
- {
- theBinding.willRoute(message);
+ pos = lastLowPriorityBinding;
- chosen.add(theBinding.getBindable());
+ pos = incrementPos(pos, length);
}
-
- routingNamePositions.put(routingName, pos);
+ break;
}
+ }
+ if (theBinding != null)
+ {
+ theBinding.willRoute(message);
+
+ chosen.add(theBinding.getBindable());
+ }
+
+ routingNamePositions.put(routingName, pos);
+ }
+
// TODO refactor to do this is one iteration
for (Bindable bindable : chosen)
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -45,10 +45,12 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -69,7 +71,7 @@
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
* @author <a href="csuconic(a)redhat.com">Clebert Suconic</a>
*/
-public class PostOfficeImpl implements PostOffice, NotificationListener
+public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
{
private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
@@ -123,6 +125,8 @@
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
+ private Arbitrator groupingArbitrator;
+
public PostOfficeImpl(final HornetQServer server,
final StorageManager storageManager,
final PagingManager pagingManager,
@@ -154,11 +158,11 @@
if (enableWildCardRouting)
{
- addressManager = new WildcardAddressManager();
+ addressManager = new WildcardAddressManager(this);
}
else
{
- addressManager = new SimpleAddressManager();
+ addressManager = new SimpleAddressManager(this);
}
this.backup = backup;
@@ -553,7 +557,7 @@
if (bindings == null)
{
- bindings = new BindingsImpl();
+ bindings = createBindings();
}
return bindings;
@@ -738,6 +742,17 @@
return notificationLock;
}
+
+ public void addArbitrator(Arbitrator arbitrator)
+ {
+ groupingArbitrator = arbitrator;
+ }
+
+ public Arbitrator getArbitrator()
+ {
+ return groupingArbitrator;
+ }
+
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString
address) throws Exception
{
// We send direct to the queue so we can send it to the same queue that is bound to
the notifications adress -
@@ -1099,4 +1114,9 @@
}
}
}
+
+ public Bindings createBindings()
+ {
+ return new BindingsImpl(this);
+ }
}
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/SimpleAddressManager.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -21,6 +21,7 @@
import org.hornetq.core.postoffice.AddressManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -38,6 +39,13 @@
private final ConcurrentMap<SimpleString, Binding> nameMap = new
ConcurrentHashMap<SimpleString, Binding>();
+ private final BindingsFactory bindingsFactory;
+
+ public SimpleAddressManager(BindingsFactory bindingsFactory)
+ {
+ this.bindingsFactory = bindingsFactory;
+ }
+
public boolean addBinding(final Binding binding)
{
if (nameMap.putIfAbsent(binding.getUniqueName(), binding) != null)
@@ -81,7 +89,7 @@
{
Address add = new AddressImpl(address);
- Bindings bindings = new BindingsImpl();
+ Bindings bindings = bindingsFactory.createBindings();
for (Binding binding: nameMap.values())
{
@@ -150,7 +158,7 @@
if (bindings == null)
{
- bindings = new BindingsImpl();
+ bindings = bindingsFactory.createBindings();
prevBindings = mappings.putIfAbsent(address, bindings);
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/postoffice/impl/WildcardAddressManager.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -21,6 +21,7 @@
import org.hornetq.core.postoffice.Address;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
+import org.hornetq.core.postoffice.BindingsFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -50,6 +51,11 @@
private final Map<SimpleString, Address> wildCardAddresses = new
HashMap<SimpleString, Address>();
+ public WildcardAddressManager(BindingsFactory bindingsFactory)
+ {
+ super(bindingsFactory);
+ }
+
public Bindings getBindingsForRoutingAddress(final SimpleString address)
{
Bindings bindings = super.getBindingsForRoutingAddress(address);
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -613,6 +613,10 @@
NotificationType.CONSUMER_CREATED +
"','" +
NotificationType.CONSUMER_CLOSED +
+ "','" +
+ NotificationType.PROPOSAL +
+ "','" +
+ NotificationType.PROPOSAL_RESPONSE +
"') AND " +
ManagementHelper.HDR_DISTANCE +
"<" +
Modified:
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -46,6 +46,8 @@
import
org.hornetq.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerRemovedMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.MessageFlowRecord;
@@ -609,6 +611,12 @@
case SECURITY_AUTHENTICATION_VIOLATION:
case SECURITY_PERMISSION_VIOLATION:
break;
+ case PROPOSAL:
+ doProposalReceived(message);
+ break;
+ case PROPOSAL_RESPONSE:
+ doProposalResponseReceived(message);
+ break;
default:
{
throw new IllegalArgumentException("Invalid type " + ntype);
@@ -621,6 +629,37 @@
}
}
+ private synchronized void doProposalReceived(final ClientMessage message) throws
Exception
+ {
+ SimpleString type = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_TYPE);
+ if (type == null)
+ {
+ throw new IllegalStateException("proposal type is null");
+ }
+ SimpleString val = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+ Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+ Response response = postOffice.getArbitrator().receive(new Proposal(type, val),
hops + 1);
+ if(response != null)
+ {
+ postOffice.getArbitrator().send(response, 0);
+ }
+ }
+
+ private synchronized void doProposalResponseReceived(final ClientMessage message)
throws Exception
+ {
+ SimpleString type = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_TYPE);
+ if (type == null)
+ {
+ throw new IllegalStateException("proposal type is null");
+ }
+ SimpleString val = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_VALUE);
+ SimpleString alt = (SimpleString)
message.getProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE);
+ Integer hops = (Integer) message.getProperty(ManagementHelper.HDR_DISTANCE);
+ Response response = new Response(type, val, alt);
+ postOffice.getArbitrator().proposed(response);
+ postOffice.getArbitrator().send(response, hops + 1);
+ }
+
private synchronized void clearBindings() throws Exception
{
for (RemoteQueueBinding binding : new
HashSet<RemoteQueueBinding>(bindings.values()))
Added: branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/Arbitrator.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.group.impl.Proposal;
+import org.hornetq.core.server.group.impl.Response;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface Arbitrator
+{
+ SimpleString getName();
+
+ Response propose(Proposal proposal) throws Exception;
+
+ void proposed(Response response) throws Exception;
+
+ void send(Response response, int distance) throws Exception;
+
+ Response receive(Proposal proposal, int distance) throws Exception;
+
+ Response rePropose(Proposal proposal) throws Exception;
+}
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/ProposalHandler.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group;
+
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.core.server.group.impl.Proposal;
+
+/**
+ * Callback interface for components that react to a {@link Proposal} being sent or received.
+ * NOTE(review): neither an implementation nor a caller is visible next to this interface, so
+ * the exact point at which each callback fires should be confirmed against the post office.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public interface ProposalHandler
+{
+   /**
+    * Invoked for an outgoing proposal.
+    *
+    * @param proposal the proposal being sent
+    * @param clientMessage typed properties that accompany the proposal; presumably the
+    *        properties of the triggering client message - TODO confirm
+    */
+   void handleSend(Proposal proposal, TypedProperties clientMessage);
+
+   /**
+    * Invoked for an incoming proposal.
+    *
+    * @param proposal the proposal that was received
+    */
+   void handleReceive(Proposal proposal);
+}
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/ArbitratorConfiguration.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Immutable description of an arbitrator: its name, its kind (local or remote) and the address
+ * it is configured with.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ArbitratorConfiguration
+{
+   private final SimpleString name;
+
+   private final TYPE type;
+
+   private final SimpleString address;
+
+   /**
+    * @param name the arbitrator name
+    * @param type whether the arbitrator is {@link TYPE#LOCAL} or {@link TYPE#REMOTE}
+    * @param address the address the arbitrator is configured with
+    */
+   public ArbitratorConfiguration(final SimpleString name, final TYPE type, final SimpleString address)
+   {
+      this.type = type;
+      this.name = name;
+      this.address = address;
+   }
+
+   /**
+    * @return the arbitrator name
+    */
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * @return the kind of arbitrator
+    */
+   public TYPE getType()
+   {
+      return type;
+   }
+
+   /**
+    * @return the configured address
+    */
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
+   /**
+    * The kinds of arbitrator; each constant carries a textual form of itself.
+    */
+   public enum TYPE
+   {
+      LOCAL("LOCAL"),
+      REMOTE("REMOTE");
+
+      // enum constants are shared singletons, so their state must never change
+      private final String type;
+
+      TYPE(final String type)
+      {
+         this.type = type;
+      }
+
+      /**
+       * @return the textual form given to this constant
+       */
+      public String getType()
+      {
+         return type;
+      }
+   }
+}
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/LocalArbitrator.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.ConcurrentHashSet;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Arbitrator that takes the decisions itself: the first value proposed for a proposal type is
+ * remembered and every later proposal for that type is answered with the remembered value.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class LocalArbitrator implements Arbitrator
+{
+   private static final Logger log = Logger.getLogger(LocalArbitrator.class);
+
+   /** Time, in milliseconds, during which a re-proposed type cannot be re-proposed again. */
+   private static final long REPROPOSAL_GUARD_MILLIS = 2000;
+
+   /** Chosen value per proposal type. Values are the chosen objects, never Response instances. */
+   private final ConcurrentHashMap<SimpleString, Object> map = new ConcurrentHashMap<SimpleString, Object>();
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   /** May be null, in which case re-proposals are not throttled. */
+   private final ScheduledExecutorService scheduledExecutor;
+
+   /** Proposal types that were re-proposed recently. */
+   private final ConcurrentHashSet<SimpleString> reProposals = new ConcurrentHashSet<SimpleString>();
+
+   /**
+    * @param managementService used to publish proposal responses as notifications
+    * @param name the arbitrator name
+    * @param address written to the address header of every response notification
+    * @param scheduledExecutor used to expire the re-proposal guard; may be null
+    */
+   public LocalArbitrator(final ManagementService managementService,
+                          final SimpleString name,
+                          final SimpleString address,
+                          final ScheduledExecutorService scheduledExecutor)
+   {
+      this.managementService = managementService;
+      this.name = name;
+      this.address = address;
+      this.scheduledExecutor = scheduledExecutor;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Decides a proposal.
+    *
+    * @param proposal the proposal; a null value only queries the current decision
+    * @return for a query, the current decision or null when there is none; otherwise an accepted
+    *         response when the value became the decision, or a response carrying the already
+    *         chosen value as alternative
+    */
+   public Response propose(Proposal proposal) throws Exception
+   {
+      final SimpleString type = proposal.getProposalType();
+      final Object value = proposal.getProposal();
+      if (value == null)
+      {
+         Object original = map.get(type);
+         return original == null ? null : new Response(type, original);
+      }
+      // use the value returned by putIfAbsent rather than a second lookup, so the alternative
+      // reported is exactly the value that won the race
+      Object existing = map.putIfAbsent(type, value);
+      if (existing == null)
+      {
+         log.info("accepted proposal for " + type + " with value " + value);
+         return new Response(type, value);
+      }
+      log.info("denied proposal for " + type + " with value " + value);
+      return new Response(type, value, existing);
+   }
+
+   /**
+    * Nothing to do: this arbitrator produces responses itself.
+    */
+   public void proposed(Response response) throws Exception
+   {
+   }
+
+   /**
+    * Publishes the response as a PROPOSAL_RESPONSE notification.
+    *
+    * @param response the response to publish; its original and alternative values are cast to
+    *        SimpleString
+    * @param distance written to the distance header
+    */
+   public void send(Response response, int distance) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("sending proposal response for " + response.getResponseType() + " with value " + value);
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, response.getResponseType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)response.getOriginal());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_ALT_VALUE, (SimpleString)response.getAlternative());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      Notification notification = new Notification(null, NotificationType.PROPOSAL_RESPONSE, props);
+      managementService.sendNotification(notification);
+   }
+
+   /**
+    * A received proposal is decided exactly like a local one; the distance is not used.
+    */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      return propose(proposal);
+   }
+
+   /**
+    * Replaces the decision for an already decided proposal type and publishes the new decision.
+    * While a type is guarded by a recent re-proposal, the current decision is returned instead.
+    *
+    * @param proposal the proposal type and the new value
+    * @return the new decision, or the current one when the type was re-proposed recently
+    */
+   public Response rePropose(final Proposal proposal) throws Exception
+   {
+      final SimpleString type = proposal.getProposalType();
+      if (!reProposals.addIfAbsent(type))
+      {
+         return new Response(type, map.get(type));
+      }
+      Response response = new Response(type, proposal.getProposal());
+      // store the chosen value, not the Response itself: propose() and send() read map values
+      // as the chosen object and cast them to SimpleString
+      Object chosen = response.getChosen();
+      if (chosen != null)
+      {
+         map.replace(type, chosen);
+      }
+      send(response, 0);
+      if (scheduledExecutor == null)
+      {
+         // nothing can expire the guard later, so release it now instead of failing with a
+         // NullPointerException and leaving the type blocked for good
+         reProposals.remove(type);
+      }
+      else
+      {
+         scheduledExecutor.schedule(new Runnable()
+         {
+            public void run()
+            {
+               reProposals.remove(type);
+            }
+         }, REPROPOSAL_GUARD_MILLIS, TimeUnit.MILLISECONDS);
+      }
+      return response;
+   }
+}
+
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Proposal.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Immutable pair of a proposal type and the value proposed for it.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class Proposal
+{
+   public static final String PROPOSAL_TYPE_HEADER = "_JBM_PROPOSAL_TYPE";
+
+   public static final String PROPOSAL_HEADER = "_JBM_PROPOSAL";
+
+   private final SimpleString proposalType;
+
+   private final Object proposal;
+
+   /**
+    * @param proposalType identifies what is being proposed
+    * @param proposal the proposed value
+    */
+   public Proposal(SimpleString proposalType, Object proposal)
+   {
+      this.proposalType = proposalType;
+      this.proposal = proposal;
+   }
+
+   /**
+    * @return the identifier of what is being proposed
+    */
+   public SimpleString getProposalType()
+   {
+      return this.proposalType;
+   }
+
+   /**
+    * @return the proposed value
+    */
+   public Object getProposal()
+   {
+      return this.proposal;
+   }
+}
+
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
===================================================================
---
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/RemoteArbitrator.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,136 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.management.Notification;
+import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.postoffice.BindingType;
+import org.hornetq.core.server.group.Arbitrator;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+import java.util.logging.Logger;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Arbitrator that does not decide anything itself: proposals are published as PROPOSAL
+ * notifications and the caller waits, for a bounded time, until a matching response is handed
+ * back through {@link #proposed(Response)}. Responses are cached per proposal type.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class RemoteArbitrator implements Arbitrator
+{
+   private static final Logger log = Logger.getLogger(RemoteArbitrator.class.getName());
+
+   private final SimpleString name;
+
+   private final ManagementService managementService;
+
+   private final SimpleString address;
+
+   /** Responses received so far, keyed by proposal type. Only accessed while holding lock. */
+   private final Map<SimpleString, Response> responses = new HashMap<SimpleString, Response>();
+
+   private final Lock lock = new ReentrantLock();
+
+   private final Condition sendCondition = lock.newCondition();
+
+   /** Longest time, in milliseconds, that propose waits for a response. */
+   private final int waitTime = 1000;
+
+   /**
+    * @param managementService used to publish proposals as notifications
+    * @param name the arbitrator name
+    * @param address written to the address header of every proposal notification
+    */
+   public RemoteArbitrator(final ManagementService managementService, final SimpleString name, final SimpleString address)
+   {
+      this.name = name;
+      this.address = address;
+      this.managementService = managementService;
+   }
+
+   public SimpleString getName()
+   {
+      return name;
+   }
+
+   /**
+    * Returns the cached response for the proposal type if there is one; otherwise publishes the
+    * proposal and waits up to the configured time for its response.
+    *
+    * @param proposal the proposal; a null value only queries the cache
+    * @return the response for the proposal type, or null when none is cached for a query or
+    *         none arrived in time
+    */
+   public Response propose(final Proposal proposal) throws Exception
+   {
+      final SimpleString type = proposal.getProposalType();
+      // acquire before the try block so that finally never unlocks a lock that is not held
+      lock.lock();
+      try
+      {
+         Response response = responses.get(type);
+         if (response != null)
+         {
+            return response;
+         }
+         if (proposal.getProposal() == null)
+         {
+            return null;
+         }
+         log.info("sending proposal for " + type + " with value " + proposal.getProposal());
+         managementService.sendNotification(createProposalNotification(proposal, 0));
+         // wait in a loop: a wakeup may be spurious or belong to a different proposal type
+         long remainingNanos = TimeUnit.MILLISECONDS.toNanos(waitTime);
+         while ((response = responses.get(type)) == null && remainingNanos > 0)
+         {
+            remainingNanos = sendCondition.awaitNanos(remainingNanos);
+         }
+         return response;
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Caches the response under its type and wakes every thread waiting in propose.
+    *
+    * @param response the response that arrived
+    */
+   public void proposed(Response response) throws Exception
+   {
+      Object value = response.getAlternative() != null ? response.getAlternative() : response.getOriginal();
+      log.info("received proposal response for " + response.getResponseType() + " with value " + value);
+      lock.lock();
+      try
+      {
+         responses.put(response.getResponseType(), response);
+         // all waiters share one condition, so each must get the chance to re-check its own type
+         sendCondition.signalAll();
+      }
+      finally
+      {
+         lock.unlock();
+      }
+   }
+
+   /**
+    * Re-publishes a received proposal as a PROPOSAL notification without waiting for an answer.
+    *
+    * @param proposal the received proposal
+    * @param distance written to the distance header
+    * @return always null
+    */
+   public Response receive(Proposal proposal, int distance) throws Exception
+   {
+      managementService.sendNotification(createProposalNotification(proposal, distance));
+      return null;
+   }
+
+   /**
+    * Nothing to do: this arbitrator never produces responses itself.
+    */
+   public void send(Response response, int distance) throws Exception
+   {
+   }
+
+   /**
+    * Not supported by this arbitrator.
+    *
+    * @return always null
+    */
+   public Response rePropose(Proposal proposal)
+   {
+      return null;
+   }
+
+   /**
+    * Builds the PROPOSAL notification shared by propose and receive.
+    */
+   private Notification createProposalNotification(final Proposal proposal, final int distance) throws Exception
+   {
+      TypedProperties props = new TypedProperties();
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_TYPE, proposal.getProposalType());
+      props.putStringProperty(ManagementHelper.HDR_PROPOSAL_VALUE, (SimpleString)proposal.getProposal());
+      props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, BindingType.LOCAL_QUEUE_INDEX);
+      props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance);
+      return new Notification(null, NotificationType.PROPOSAL, props);
+   }
+
+}
+
Added:
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
===================================================================
--- branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java
(rev 0)
+++
branches/hornetq_grouping/src/main/org/hornetq/core/server/group/impl/Response.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.server.group.impl;
+
+import org.hornetq.utils.SimpleString;
+
+/**
+ * Immutable outcome of a proposal: the value that was proposed and, when the proposal was not
+ * accepted, the alternative value to use instead.
+ *
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class Response
+{
+   private final boolean accepted;
+
+   private final Object original;
+
+   private final Object alternative;
+
+   private final SimpleString responseType;
+
+   /**
+    * Creates an accepted response (one without an alternative).
+    *
+    * @param responseType the proposal type this response answers
+    * @param original the value that was proposed
+    */
+   public Response(SimpleString responseType, Object original)
+   {
+      this(responseType, original, null);
+   }
+
+   /**
+    * @param responseType the proposal type this response answers
+    * @param original the value that was proposed
+    * @param alternative the value to use instead; null means the original was accepted
+    */
+   public Response(SimpleString responseType, Object original, Object alternative)
+   {
+      this.responseType = responseType;
+      this.accepted = alternative == null;
+      this.original = original;
+      this.alternative = alternative;
+   }
+
+   /**
+    * @return true when no alternative was supplied
+    */
+   public boolean isAccepted()
+   {
+      return accepted;
+   }
+
+   /**
+    * @return the value that was proposed
+    */
+   public Object getOriginal()
+   {
+      return original;
+   }
+
+   /**
+    * @return the alternative value, or null when the original was accepted
+    */
+   public Object getAlternative()
+   {
+      return alternative;
+   }
+
+   /**
+    * @return the alternative when there is one, otherwise the original
+    */
+   public Object getChosen()
+   {
+      return alternative != null ? alternative : original;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "accepted = " + accepted + " original = " + original + " alternative = " + alternative;
+   }
+
+   /**
+    * @return the proposal type this response answers
+    */
+   public SimpleString getResponseType()
+   {
+      return responseType;
+   }
+}
Modified: branches/hornetq_grouping/tests/hornetq-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/hornetq-tests.iml 2009-09-11 10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/hornetq-tests.iml 2009-09-11 14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<output url="file://$MODULE_DIR$/output/classes" />
<output-test url="file://$MODULE_DIR$/output" />
<exclude-output />
Modified: branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml 2009-09-11 10:56:25
UTC (rev 7954)
+++ branches/hornetq_grouping/tests/jms-tests/hornetq-jms-tests.iml 2009-09-11 14:22:36
UTC (rev 7955)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<output url="file://$MODULE_DIR$/output/classes" />
<output-test url="file://$MODULE_DIR$/output" />
<exclude-output />
Modified: branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml
===================================================================
--- branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml 2009-09-11
10:56:25 UTC (rev 7954)
+++ branches/hornetq_grouping/tests/joram-tests/hornetq-joram-tests.iml 2009-09-11
14:22:36 UTC (rev 7955)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<module relativePaths="true" type="JAVA_MODULE"
version="4">
- <component name="NewModuleRootManager"
inherit-compiler-output="false">
+ <component name="NewModuleRootManager"
inherit-compiler-output="true">
<output url="file://$MODULE_DIR$/output/classes" />
<output-test url="file://$MODULE_DIR$/output" />
<exclude-output />
@@ -14,6 +14,7 @@
<orderEntry type="module" module-name="hornetq-tests" />
<orderEntry type="library" name="messaging_jars"
level="project" />
<orderEntry type="library" name="messaging-joram-tests"
level="project" />
+ <orderEntry type="module" module-name="hornetq-jms-tests"
/>
</component>
</module>
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -45,6 +45,10 @@
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
+import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.core.server.group.impl.LocalArbitrator;
+import org.hornetq.core.server.group.impl.RemoteArbitrator;
+import org.hornetq.core.server.group.Arbitrator;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -421,6 +425,52 @@
session.close();
}
+ /**
+ * Sends numMessages messages, numbered from 0, each carrying the string property key=val.
+ * Delegates to sendInRange with the range 0..numMessages.
+ */
+ protected void sendWithProperty(int node, String address, int numMessages, boolean
durable, SimpleString key, SimpleString val) throws Exception
+ {
+ sendInRange(node, address, 0, numMessages, durable, key, val);
+ }
+
+ /**
+ * Sends one message per index in [msgStart, msgEnd) to the address, using the session factory
+ * registered for the node. Every message carries the string property key=val and its index in
+ * COUNT_PROP.
+ *
+ * @param node index of the session factory to send through
+ * @param address destination address
+ * @param msgStart first message index, inclusive
+ * @param msgEnd last message index, exclusive
+ * @param durable passed to createClientMessage for every message
+ * @param key name of the string property set on every message
+ * @param val value of that property
+ * @throws IllegalArgumentException if no session factory is registered for the node
+ */
+ protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
+ {
+    ClientSessionFactory sf = this.sfs[node];
+
+    if (sf == null)
+    {
+       throw new IllegalArgumentException("No sf at " + node);
+    }
+
+    ClientSession session = sf.createSession(false, true, true);
+
+    // close the session even when creating the producer or sending fails, so that a failing
+    // test does not leave a session behind
+    try
+    {
+       ClientProducer producer = session.createProducer(address);
+
+       for (int i = msgStart; i < msgEnd; i++)
+       {
+          ClientMessage message = session.createClientMessage(durable);
+
+          message.putStringProperty(key, val);
+          message.putIntProperty(COUNT_PROP, i);
+          System.out.println("i = " + i);
+          producer.send(message);
+       }
+    }
+    finally
+    {
+       session.close();
+    }
+ }
+
+
+ /**
+ * Creates an arbitrator named "grouparbitrator" for the address "queues" and adds it to the
+ * post office of the given server.
+ *
+ * @param type LOCAL creates a LocalArbitrator, anything else a RemoteArbitrator
+ * @param node index of the server that receives the arbitrator
+ */
+ protected void setUpGroupArbitrator(ArbitratorConfiguration.TYPE type, int node)
+ {
+    final SimpleString arbitratorName = new SimpleString("grouparbitrator");
+    final SimpleString arbitratorAddress = new SimpleString("queues");
+    final Arbitrator arbitrator;
+    if (type != ArbitratorConfiguration.TYPE.LOCAL)
+    {
+       arbitrator = new RemoteArbitrator(servers[node].getManagementService(), arbitratorName, arbitratorAddress);
+    }
+    else
+    {
+       // the local arbitrator is created without a scheduled executor
+       arbitrator = new LocalArbitrator(servers[node].getManagementService(), arbitratorName, arbitratorAddress, null);
+    }
+    this.servers[node].getPostOffice().addArbitrator(arbitrator);
+ }
+
protected void send(int node, String address, int numMessages, boolean durable, String
filterVal) throws Exception
{
sendInRange(node, address, 0, numMessages, durable, filterVal);
Added:
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
(rev 0)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -0,0 +1,529 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.cluster.distribution;
+
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.group.impl.ArbitratorConfiguration;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+public class ClusteredGroupingTest extends ClusterTestBase
+{
+
+ /**
+ * Three servers, LOCAL arbitrator on node 0 and REMOTE arbitrators on nodes 1 and 2, one queue
+ * "queue0" with a consumer on every node. Ten messages with group id "id1" are sent through
+ * node 0 and checked with verifyReceiveAll(10, 0).
+ * NOTE(review): presumably that call asserts consumer 0 received all ten - confirm against
+ * ClusterTestBase.
+ */
+ public void testGroupingSimple() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ // NOTE(review): the waits with a trailing false argument are disabled in this test but
+ // enabled in the 3-queue tests of this class
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ sendWithProperty(0, "queues.testaddress", 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAll(10, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Same three-node setup as testGroupingSimple. Messages 0..9 with group id "id1" are sent
+ * through node 0, then messages 10..19 with the same group id through node 1; both batches
+ * are checked with verifyReceiveAllInRange(..., 0).
+ * NOTE(review): presumably the last argument is the consumer expected to receive the whole
+ * group - confirm against ClusterTestBase.
+ */
+ public void testGroupingSendTo2queues() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ /*waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);*/
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Same three-node setup as testGroupingSimple, additionally waiting for the bindings checked
+ * with a trailing false argument. Batches with group id "id1" are sent through nodes 0, 1 and
+ * 2 in turn and each batch is checked with verifyReceiveAllInRange(..., 0).
+ */
+ public void testGroupingSendTo3queues() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ // NOTE(review): this third batch reuses the range 10..20, whereas
+ // testGroupingSendTo3queuesRemoteArbitrator uses 20..30 for its third batch - possibly a
+ // copy/paste leftover, confirm the intended range
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Same three-node setup as testGroupingSendTo3queues, but the first batch of the group "id1"
+ * is sent through node 1, which has a REMOTE arbitrator. The batches 0..9 (node 1), 10..19
+ * (node 2) and 20..29 (node 0) are each checked with verifyReceiveAllInRange(..., 1).
+ */
+ public void testGroupingSendTo3queuesRemoteArbitrator() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 1);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 1);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Same three-node setup as testGroupingSendTo3queuesRemoteArbitrator, except that no consumer
+ * is added on node 1. The batches 0..9 (node 1), 10..19 (node 2) and 20..29 (node 0) of the
+ * group "id1" are each checked with verifyReceiveAllInRange(..., 0).
+ * NOTE(review): presumably the group ends up on node 0 because the sending node has no
+ * consumer - confirm against the routing code.
+ */
+ public void testGroupingSendTo3queuesNoConsumerOnLocalQueue() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ // node 1 deliberately gets no consumer in this scenario
+ //addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Checks where grouped messages end up when they are sent before any consumer exists.
+ * <p>
+ * Queues are created on all three nodes and the test waits for bindings with zero consumers.
+ * Three batches tagged with group id "id1" are then sent through nodes 1, 2 and 0. Only after
+ * all sends are consumers 0, 1 and 2 added, and the whole range 0..30 is expected on consumer 1.
+ * Consumer 1 is presumably the consumer on node 1, the node the first batch was sent through
+ * - TODO confirm addConsumer's argument order.
+ *
+ * @throws Exception propagated from any of the setup, send or verification helpers
+ */
+ public void testGroupingSendTo3queuesNoConsumersDeliveredToLocalQueue() throws
Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ // One cluster connection per node. The trailing ints presumably name the owning node
+ // followed by its two peers - TODO confirm against setupClusterConnection.
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ // Arbitrators are installed after the servers are started: LOCAL on node 0, REMOTE on 1 and 2.
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ // No addConsumer call has happened yet, and every wait below passes 0 where the sibling
+ // tests pass their consumer counts (argument meaning presumed - TODO confirm).
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 0, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 0, false);
+
+ // All three batches are sent back to back, with no verification in between.
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ // Consumers are attached only now, after every message has been sent.
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ // A single check covers the whole 0..30 range and names consumer 1 only.
+ verifyReceiveAllInRange(0, 30, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ // Teardown runs even when an assertion above fails: consumers, then factories, then servers.
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /**
+ * Exercises grouping when the consumer and queue that served the first batch are removed.
+ * <p>
+ * With consumers on all three nodes, a first batch tagged with group id "id1" is sent through
+ * node 0 and verified on consumer 0. Consumer 0 is then removed and "queue0" is deleted on
+ * node 0, after which two more batches with the same group id are sent through nodes 1 and 2.
+ * <p>
+ * NOTE(review): both later batches are still verified against consumer 0, which has already
+ * been removed, and the third batch reuses the range 10..20 where the sibling tests use
+ * 20..30. Confirm whether the expected consumer and the last range are intended.
+ *
+ * @throws Exception propagated from any of the setup, send or verification helpers
+ */
+ public void testGroupingSendTo3queuesQueueRemoved() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ // One cluster connection per node. The trailing ints presumably name the owning node
+ // followed by its two peers - TODO confirm against setupClusterConnection.
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ // Arbitrators are installed after the servers are started: LOCAL on node 0, REMOTE on 1 and 2.
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ // Arguments are presumably (node, address, expected bindings, expected consumers, local)
+ // - TODO confirm against waitForBindings.
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ // First batch goes through node 0 and is verified on consumer 0.
+ sendInRange(0, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 0);
+ // Take away the consumer and the queue on node 0 before sending the remaining batches.
+ removeConsumer(0);
+ deleteQueue(0, "queue0");
+ sendInRange(1, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ // NOTE(review): consumer 0 was removed above, yet it is still the consumer checked here.
+ verifyReceiveAllInRange(10, 20, 0);
+ // NOTE(review): same 10..20 range as the previous batch - possibly meant to be 20..30.
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 0);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ // Teardown runs even when an assertion above fails: consumers, then factories, then servers.
+ closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }
+
+ /*public void testGroupingSendTo3queuesPinnedNodeGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1,
isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1,
isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1,
isNetty(), 2, 0, 1);
+
+ startServers(0, 1, 2);
+
+ try
+ {
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.LOCAL, 0);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 1);
+ setUpGroupArbitrator(ArbitratorConfiguration.TYPE.REMOTE, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null,
false);
+ createQueue(1, "queues.testaddress", "queue0", null,
false);
+ createQueue(2, "queues.testaddress", "queue0", null,
false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 2, false);
+ waitForBindings(1, "queues.testaddress", 2, 2, false);
+ waitForBindings(2, "queues.testaddress", 2, 2, false);
+
+ sendInRange(1, "queues.testaddress", 0, 10, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(0, 10, 1);
+
+ stopClusterConnections(1);
+
+ stopServers(1);
+
+ Thread.sleep(5000);
+
+ sendInRange(2, "queues.testaddress", 10, 20, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(10, 20, 1);
+ sendInRange(0, "queues.testaddress", 20, 30, false,
MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
+
+ verifyReceiveAllInRange(20, 30, 1);
+
+
System.out.println("*****************************************************************************");
+ }
+ finally
+ {
+ //closeAllConsumers();
+
+ closeAllSessionFactories();
+
+ stopServers(0, 1, 2);
+ }
+ }*/
+
+ /**
+ * Flag that the tests in this class pass to setupServer, setupClusterConnection and
+ * setupSessionFactory; presumably it selects the Netty transport - TODO confirm.
+ *
+ * @return always {@code true}
+ */
+ public boolean isNetty()
+ {
+ return true;
+ }
+
+ /**
+ * Flag that the tests in this class pass to setupServer; presumably it selects file-based
+ * storage for the server - TODO confirm.
+ *
+ * @return always {@code false}
+ */
+ public boolean isFileStorage()
+ {
+ return false;
+ }
+}
+
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -39,6 +39,7 @@
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -86,7 +87,7 @@
{
final FakeBinding fake = new FakeBinding(new SimpleString("a"));
- final BindingsImpl bind = new BindingsImpl();
+ final BindingsImpl bind = new BindingsImpl(new FakePostOffice());
bind.addBinding(fake);
bind.addBinding(new FakeBinding(new SimpleString("a")));
bind.addBinding(new FakeBinding(new SimpleString("a")));
Modified:
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
---
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-11
10:56:25 UTC (rev 7954)
+++
branches/hornetq_grouping/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-09-11
14:22:36 UTC (rev 7955)
@@ -23,6 +23,7 @@
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.group.Arbitrator;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -150,4 +151,12 @@
}
+ public void addArbitrator(Arbitrator arbitrator)
+ { // empty body: this fake discards the arbitrator it is given
+ }
+
+ public Arbitrator getArbitrator()
+ {
+ return null; // this fake never stores an arbitrator, so there is none to return
+ }
}
\ No newline at end of file