[jboss-cvs] JBoss Messaging SVN: r5780 - in trunk: src/main/org/jboss/messaging/core/postoffice and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Feb 2 15:12:41 EST 2009
Author: timfox
Date: 2009-02-02 15:12:41 -0500 (Mon, 02 Feb 2009)
New Revision: 5780
Modified:
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/schemas/jbm-queues.xsd
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
Log:
More clustering
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -117,7 +117,7 @@
private HierarchicalRepository<Set<Role>> securityRepository;
private HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
private MessagingServerControl managedServer;
private final MessageCounterManager messageCounterManager = new MessageCounterManagerImpl(10000);
@@ -127,9 +127,9 @@
private boolean started = false;
private boolean noticationsEnabled;
-
+
private final Set<NotificationListener> listeners = new ConcurrentHashSet<NotificationListener>();
-
+
// Static --------------------------------------------------------
public static ObjectName getMessagingServerObjectName() throws Exception
@@ -214,7 +214,7 @@
this.managementNotificationAddress = configuration.getManagementNotificationAddress();
managedServer = new MessagingServerControl(postOffice,
storageManager,
- configuration,
+ configuration,
resourceManager,
remotingService,
messagingServer,
@@ -240,30 +240,30 @@
AddressControl addressControl = new AddressControl(address, postOffice, securityRepository);
registerInJMX(objectName, new ReplicationAwareAddressControlWrapper(objectName, addressControl));
-
+
registerInRegistry(objectName, addressControl);
-
+
if (log.isDebugEnabled())
{
log.debug("registered address " + objectName);
}
TypedProperties props = new TypedProperties();
-
+
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-
+
sendNotification(new Notification(NotificationType.ADDRESS_ADDED, props));
}
public void unregisterAddress(final SimpleString address) throws Exception
{
ObjectName objectName = getAddressObjectName(address);
-
+
unregisterResource(objectName);
-
+
TypedProperties props = new TypedProperties();
-
+
props.putStringProperty(ManagementHelper.HDR_ADDRESS, address);
-
+
sendNotification(new Notification(NotificationType.ADDRESS_REMOVED, props));
}
@@ -284,14 +284,14 @@
if (log.isDebugEnabled())
{
log.debug("registered queue " + objectName);
- }
+ }
}
public void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
{
ObjectName objectName = getQueueObjectName(address, name);
unregisterResource(objectName);
- messageCounterManager.unregisterMessageCounter(name.toString());
+ messageCounterManager.unregisterMessageCounter(name.toString());
}
public void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
@@ -341,7 +341,7 @@
ObjectName objectName = getBridgeObjectName(configuration.getName());
BridgeControlMBean control = new BridgeControl(bridge, configuration);
registerInJMX(objectName, new StandardMBean(control, BridgeControlMBean.class));
- registerInRegistry(objectName, control);
+ registerInRegistry(objectName, control);
}
public void unregisterBridge(String name) throws Exception
@@ -416,15 +416,15 @@
unregisterFromRegistry(objectName);
unregisterFromJMX(objectName);
}
-
+
public void registerCluster(final ClusterConnection cluster, final ClusterConnectionConfiguration configuration) throws Exception
- {
- //TODO
+ {
+ // TODO
}
-
+
public void unregisterCluster(final String name) throws Exception
- {
- //TODO
+ {
+ // TODO
}
public Object getResource(final ObjectName objectName)
@@ -455,7 +455,7 @@
{
listeners.add(listener);
}
-
+
public void removeNotificationListener(final NotificationListener listener)
{
listeners.remove(listener);
@@ -512,16 +512,16 @@
}
}
}
-
+
public void sendNotification(final Notification notification) throws Exception
- {
+ {
if (managedServer != null && noticationsEnabled)
{
- //This needs to be synchronized since we need to ensure notifications are processed in strict sequence
+ // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
synchronized (this)
{
- //First send to any local listeners
- for (NotificationListener listener: listeners)
+ // First send to any local listeners
+ for (NotificationListener listener : listeners)
{
try
{
@@ -529,19 +529,20 @@
}
catch (Exception e)
{
- //Exception thrown from one listener should not stop execution of others
+ // Exception thrown from one listener should not stop execution of others
log.error("Failed to call listener", e);
- }
+ }
}
-
- //Now send message
-
+
+ // Now send message
+
ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID());
notificationMessage.setBody(new ByteBufferWrapper(ByteBuffer.allocate(0)));
- //Notification messages are always durable so the user can choose whether to add a durable queue to consume them in
+ // Notification messages are always durable so the user can choose whether to add a durable queue to consume
+ // them in
notificationMessage.setDurable(true);
notificationMessage.setDestination(managementNotificationAddress);
-
+
TypedProperties notifProps;
if (notification.getProperties() != null)
{
@@ -551,12 +552,13 @@
{
notifProps = new TypedProperties();
}
-
- notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE, new SimpleString(notification.getType().toString()));
+
+ notifProps.putStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+ new SimpleString(notification.getType().toString()));
notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
+
notificationMessage.putTypedProperties(notifProps);
-
+
postOffice.route(notificationMessage, null);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Binding.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.postoffice;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.util.SimpleString;
@@ -45,8 +46,10 @@
SimpleString getRoutingName();
- boolean filterMatches(ServerMessage message) throws Exception;
+ //boolean filterMatches(ServerMessage message) throws Exception;
+ Filter getFilter();
+
boolean isHighAcceptPriority(ServerMessage message);
//TODO find a better way
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -45,4 +45,6 @@
void addBinding(Binding binding);
void removeBinding(Binding binding);
+
+ void setRouteWhenNoConsumers(boolean routeWhenNoConsumers);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/QueueInfo.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -45,16 +45,19 @@
private final SimpleString address;
+ private final SimpleString filterString;
+
private final int id;
private List<SimpleString> filterStrings;
private int numberOfConsumers;
- public QueueInfo(final SimpleString queueName, final SimpleString address, final int id)
+ public QueueInfo(final SimpleString queueName, final SimpleString address, final SimpleString filterString, final int id)
{
this.queueName = queueName;
this.address = address;
+ this.filterString = filterString;
this.id = id;
}
@@ -68,6 +71,11 @@
return address;
}
+ public SimpleString getFilterString()
+ {
+ return filterString;
+ }
+
public int getID()
{
return id;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -32,6 +32,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.postoffice.Binding;
@@ -61,7 +62,14 @@
private final Map<Integer, Binding> bindingsMap = new ConcurrentHashMap<Integer, Binding>();
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
+
+ private volatile boolean routeWhenNoConsumers;
+ public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
+ {
+ this.routeWhenNoConsumers = routeWhenNoConsumers;
+ }
+
public Collection<Binding> getBindings()
{
return bindingsMap.values();
@@ -203,7 +211,7 @@
Binding theBinding = null;
- int lastNoMatchingConsumerPos = -1;
+ int lastLowPriorityBinding = -1;
while (true)
{
@@ -227,11 +235,13 @@
}
}
- if (binding.filterMatches(message))
- {
+ Filter filter = binding.getFilter();
+
+ if (filter == null || filter.match(message))
+ {
// bindings.length == 1 ==> only a local queue so we don't check for matching consumers (it's an
// unnecessary overhead)
- if (length == 1 || binding.isHighAcceptPriority(message))
+ if (length == 1 || routeWhenNoConsumers || binding.isHighAcceptPriority(message))
{
theBinding = binding;
@@ -241,7 +251,10 @@
}
else
{
- lastNoMatchingConsumerPos = pos;
+ if (lastLowPriorityBinding == -1)
+ {
+ lastLowPriorityBinding = pos;
+ }
}
}
@@ -249,7 +262,7 @@
if (pos == startPos)
{
- if (lastNoMatchingConsumerPos != -1)
+ if (lastLowPriorityBinding != -1)
{
try
{
@@ -262,7 +275,7 @@
{
pos = 0;
- lastNoMatchingConsumerPos = -1;
+ lastLowPriorityBinding = -1;
continue;
}
@@ -272,7 +285,7 @@
}
}
- pos = lastNoMatchingConsumerPos;
+ pos = lastLowPriorityBinding;
pos = incrementPos(pos, length);
}
@@ -286,7 +299,7 @@
chosen.add(theBinding.getBindable());
}
-
+
routingNamePositions.put(routingName, pos);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/DivertBinding.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -80,17 +80,9 @@
this.id = id;
}
-
- public boolean filterMatches(final ServerMessage message) throws Exception
+ public Filter getFilter()
{
- if (filter != null && !filter.match(message))
- {
- return false;
- }
- else
- {
- return true;
- }
+ return filter;
}
public SimpleString getAddress()
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/LocalQueueBinding.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -78,19 +78,11 @@
this.id = id;
}
-
- public boolean filterMatches(final ServerMessage message) throws Exception
+ public Filter getFilter()
{
- if (filter != null && !filter.match(message))
- {
- return false;
- }
- else
- {
- return true;
- }
+ return filter;
}
-
+
public SimpleString getAddress()
{
return address;
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -37,6 +37,7 @@
import org.jboss.messaging.core.client.management.impl.ManagementHelper;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.management.ManagementService;
import org.jboss.messaging.core.management.Notification;
@@ -243,8 +244,10 @@
Integer transientID = (Integer)props.getProperty(ManagementHelper.HDR_BINDING_ID);
- QueueInfo info = new QueueInfo(queueName, address, transientID);
+ SimpleString filterString = (SimpleString)props.getProperty(ManagementHelper.HDR_FILTERSTRING);
+ QueueInfo info = new QueueInfo(queueName, address, filterString, transientID);
+
queueInfos.put(queueName, info);
}
else if (type == NotificationType.BINDING_REMOVED)
@@ -369,6 +372,11 @@
props.putStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
props.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, binding.getRoutingName());
props.putIntProperty(ManagementHelper.HDR_BINDING_ID, binding.getID());
+ Filter filter = binding.getFilter();
+ if (filter != null)
+ {
+ props.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filter.getFilterString());
+ }
managementService.sendNotification(new Notification(NotificationType.BINDING_ADDED, props));
}
@@ -569,9 +577,7 @@
return cache;
}
-
-
-
+
public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
{
//We send direct to the queue so we can send it to the same queue that is bound to the notifications adress - this is crucial for ensuring
@@ -607,6 +613,7 @@
message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
message.putStringProperty(ManagementHelper.HDR_QUEUE_NAME, info.getQueueName());
message.putIntProperty(ManagementHelper.HDR_BINDING_ID, info.getID());
+ message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
queue.preroute(message, null);
queue.route(message, null);
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/BridgeImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -170,8 +170,7 @@
final MessageHandler queueInfoMessageHandler,
final String queueDataAddress,
final boolean forClusterConnector) throws Exception
- {
- log.info("Creating new bridge " + name + " queue " + queue);
+ {
this.name = name;
this.queue = queue;
@@ -213,8 +212,6 @@
this.queueInfoMessageHandler = queueInfoMessageHandler;
- log.info("queue info handler " + this.queueInfoMessageHandler);
-
this.queueDataAddress = queueDataAddress;
this.forClusterConnector = forClusterConnector;
@@ -252,7 +249,6 @@
{
try
{
- log.info("creating objects");
createTx();
queue.addConsumer(BridgeImpl.this);
@@ -320,8 +316,6 @@
prod.send(message);
}
- log.info("Created objects");
-
active = true;
queue.deliverAsync(executor);
@@ -497,8 +491,6 @@
return;
}
- log.info("sending batch");
-
// TODO - if batch size = 1 then don't need tx - actually we should use asynch send acknowledgement stream - then we don't need a transaction at all
while (true)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterConnectionImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -43,9 +43,9 @@
import org.jboss.messaging.core.management.NotificationType;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.postoffice.impl.LocalQueueBinding;
-import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.cluster.Bridge;
@@ -84,7 +84,7 @@
private final boolean useDuplicateDetection;
- private final boolean forwardWhenNoMatchingConsumers;
+ private final boolean routeWhenNoConsumers;
private Map<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord> records = new HashMap<Pair<TransportConfiguration, TransportConfiguration>, MessageFlowRecord>();
@@ -103,7 +103,7 @@
final SimpleString address,
final BridgeConfiguration bridgeConfig,
final boolean useDuplicateDetection,
- final boolean forwardWhenNoMatchingConsumers,
+ final boolean routeWhenNoConsumers,
final ExecutorFactory executorFactory,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -119,7 +119,7 @@
this.useDuplicateDetection = useDuplicateDetection;
- this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+ this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
@@ -143,7 +143,7 @@
final SimpleString address,
final BridgeConfiguration bridgeConfig,
final boolean useDuplicateDetection,
- final boolean forwardWhenNoMatchingConsumers,
+ final boolean routeWhenNoConsumers,
final ExecutorFactory executorFactory,
final StorageManager storageManager,
final PostOffice postOffice,
@@ -171,7 +171,7 @@
this.useDuplicateDetection = useDuplicateDetection;
- this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
+ this.routeWhenNoConsumers = routeWhenNoConsumers;
}
public synchronized void start() throws Exception
@@ -424,8 +424,7 @@
NotificationType type = NotificationType.valueOf(message.getProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
.toString());
-
-
+
if (type == NotificationType.BINDING_ADDED)
{
SimpleString uniqueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
@@ -444,13 +443,16 @@
queueID,
filterString,
queue,
- useDuplicateDetection,
- forwardWhenNoMatchingConsumers,
+ useDuplicateDetection,
bridge.getName());
bindings.put(queueName, binding);
postOffice.addBinding(binding);
+
+ Bindings theBindings = postOffice.getBindingsForAddress(queueAddress);
+
+ theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
}
else if (type == NotificationType.BINDING_REMOVED)
{
@@ -467,8 +469,13 @@
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
RemoteQueueBinding binding = bindings.get(queueName);
-
- binding.addConsumer(filterString);
+
+ if (binding != null)
+ {
+ //Can legitimately be null if there are multiple cluster connections which will all receive create consumers for different addresses since
+ //the address isn't checked on the filter when it's an add or create consumer message
+ binding.addConsumer(filterString);
+ }
}
else if (type == NotificationType.CONSUMER_CLOSED)
{
@@ -477,8 +484,14 @@
SimpleString filterString = (SimpleString)message.getProperty(ManagementHelper.HDR_FILTERSTRING);
RemoteQueueBinding binding = bindings.get(queueName);
+
+ if (binding != null)
+ {
+ //Can legitimately be null if there are multiple cluster connections which will all receive create consumers for different addresses since
+ //the address isn't checked on the filter when it's an add or create consumer message
- binding.removeConsumer(filterString);
+ binding.removeConsumer(filterString);
+ }
}
}
catch (Exception e)
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/ClusterManagerImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -171,7 +171,6 @@
managementService.unregisterBridge(bridge.getName().toString());
}
- log.info("stopping cluster connecttions");
for (ClusterConnection clusterConnection : clusters.values())
{
clusterConnection.stop();
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -72,8 +72,6 @@
private final boolean duplicateDetection;
- private final boolean forwardWhenNoMatchingConsumers;
-
private final SimpleString idsHeaderName;
private int id;
@@ -84,8 +82,7 @@
final int remoteQueueID,
final SimpleString filterString,
final Queue storeAndForwardQueue,
- final boolean duplicateDetection,
- final boolean forwardWhenNoMatchingConsumers,
+ final boolean duplicateDetection,
final SimpleString bridgeName) throws Exception
{
this.address = address;
@@ -109,8 +106,6 @@
queueFilter = null;
}
- this.forwardWhenNoMatchingConsumers = forwardWhenNoMatchingConsumers;
-
this.idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(bridgeName);
}
@@ -154,25 +149,13 @@
return false;
}
- public boolean filterMatches(final ServerMessage message) throws Exception
+ public Filter getFilter()
{
- if (queueFilter != null && !queueFilter.match(message))
- {
- return false;
- }
- else
- {
- return true;
- }
+ return queueFilter;
}
public boolean isHighAcceptPriority(final ServerMessage message)
- {
- if (forwardWhenNoMatchingConsumers)
- {
- return true;
- }
-
+ {
if (consumerCount == 0)
{
return false;
@@ -198,8 +181,6 @@
public void willRoute(final ServerMessage message)
{
- log.info("routing to remote queue binding");
-
//We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
//TODO - this can be optimised
Modified: trunk/src/schemas/jbm-queues.xsd
===================================================================
--- trunk/src/schemas/jbm-queues.xsd 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/src/schemas/jbm-queues.xsd 2009-02-02 20:12:41 UTC (rev 5780)
@@ -74,7 +74,7 @@
<xsd:attribute name="match" type="xsd:string" use="required"></xsd:attribute>
</xsd:complexType>
- <xsd:element name="queue" type="queueType"></xsd:element>
+ <xsd:element name="queue" type="queueType"></xsd:element>
<xsd:complexType name="queueType">
<xsd:attribute name="name" type="xsd:string" use="required"></xsd:attribute>
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -90,7 +90,7 @@
private static final SimpleString COUNT_PROP = new SimpleString("count_prop");
- private static final SimpleString FILTER_PROP = new SimpleString("animal");
+ protected static final SimpleString FILTER_PROP = new SimpleString("animal");
private static final int MAX_SERVERS = 10;
@@ -136,7 +136,7 @@
}
}
- //log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
+ log.info("binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
{
@@ -277,7 +277,7 @@
{
ClientMessage message = consumer.receive(500);
- assertNotNull(message);
+ assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
assertEquals(j, message.getProperty(COUNT_PROP));
}
@@ -299,7 +299,7 @@
ClientMessage message = consumer.receive(500);
- assertNotNull(message);
+ assertNotNull("consumer " + consumerIDs[count] + " did not receive message " + i, message);
assertEquals(i, message.getProperty(COUNT_PROP));
@@ -323,7 +323,7 @@
throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
}
- assertNull(consumer.receive(200));
+ assertNull("consumer " + i + " received message", consumer.receive(200));
}
}
@@ -348,6 +348,9 @@
}
ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc);
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
sfs[node] = sf;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -40,9 +40,7 @@
super.setUp();
setupServer(0, false, false);
- setupServer(1, false, false);
-
- setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ setupServer(1, false, false);
}
@Override
@@ -55,40 +53,53 @@
public void testStartTargetServerBeforeSourceServer() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
setupSessionFactory(1, false);
- createQueue(1, "queues.testaddress", "queue0", null, false);
+ String myFilter = "zebra";
+
+ createQueue(1, "queues.testaddress", "queue0", myFilter, false);
addConsumer(0, 1, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, false);
- send(0, "queues.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, myFilter);
verifyReceiveAll(10, 0);
verifyNotReceive(0);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyNotReceive(0);
}
public void testStartSourceServerBeforeTargetServer() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(0, 1);
setupSessionFactory(0, false);
setupSessionFactory(1, false);
+
+ String myFilter = "bison";
- createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", myFilter, false);
addConsumer(0, 1, "queue0", null);
waitForBindings(0, "queues.testaddress", 1, 1, false);
- send(0, "queues.testaddress", 10, false, null);
+ send(0, "queues.testaddress", 10, false, myFilter);
verifyReceiveAll(10, 0);
verifyNotReceive(0);
+
+ send(0, "queues.testaddress", 10, false, null);
+ verifyNotReceive(0);
}
public void testBasicLocalReceive() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -107,6 +118,7 @@
public void testBasicRoundRobin() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -130,6 +142,7 @@
public void testRoundRobinMultipleQueues() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -166,9 +179,10 @@
verifyNotReceive(0, 1, 2, 3, 4, 5);
}
-
+
public void testMultipleNonLoadBalancedQueues() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -212,6 +226,7 @@
public void testMixtureLoadBalancedAndNonLoadBalancedQueues() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -275,8 +290,141 @@
verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
}
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnTargetBeforeStartSource() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(1);
+
+ setupSessionFactory(1, false);
+
+ createQueue(1, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue7", null, false);
+ createQueue(1, "queues.testaddress", "queue8", null, false);
+ createQueue(1, "queues.testaddress", "queue9", null, false);
+
+ createQueue(1, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue12", null, false);
+
+
+ addConsumer(5, 1, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 1, "queue7", null);
+ addConsumer(8, 1, "queue8", null);
+ addConsumer(9, 1, "queue9", null);
+
+ addConsumer(11, 1, "queue10", null);
+ addConsumer(13, 1, "queue11", null);
+ addConsumer(15, 1, "queue12", null);
+
+ startServers(0);
+
+ waitForBindings(0, "queues.testaddress", 8, 8, false);
+
+ setupSessionFactory(0, false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue3", null, false);
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(0, "queues.testaddress", "queue11", null, false);
+ createQueue(0, "queues.testaddress", "queue12", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+ addConsumer(3, 0, "queue3", null);
+ addConsumer(4, 0, "queue4", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(12, 0, "queue11", null);
+ addConsumer(14, 0, "queue12", null);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ verifyReceiveRoundRobin(10, 11, 10);
+ verifyReceiveRoundRobin(10, 13, 12);
+ verifyReceiveRoundRobin(10, 15, 14);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ }
+
+ public void testMixtureLoadBalancedAndNonLoadBalancedQueuesAddQueuesOnSourceBeforeStartTarget() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(0);
+
+ setupSessionFactory(0, false);
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue3", null, false);
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(0, "queues.testaddress", "queue11", null, false);
+ createQueue(0, "queues.testaddress", "queue12", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+ addConsumer(3, 0, "queue3", null);
+ addConsumer(4, 0, "queue4", null);
+
+ addConsumer(10, 0, "queue10", null);
+ addConsumer(12, 0, "queue11", null);
+ addConsumer(14, 0, "queue12", null);
+
+ startServers(1);
+
+ setupSessionFactory(1, false);
+
+ createQueue(1, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue7", null, false);
+ createQueue(1, "queues.testaddress", "queue8", null, false);
+ createQueue(1, "queues.testaddress", "queue9", null, false);
+
+ createQueue(1, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+ createQueue(1, "queues.testaddress", "queue12", null, false);
+
+
+ addConsumer(5, 1, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 1, "queue7", null);
+ addConsumer(8, 1, "queue8", null);
+ addConsumer(9, 1, "queue9", null);
+
+ addConsumer(11, 1, "queue10", null);
+ addConsumer(13, 1, "queue11", null);
+ addConsumer(15, 1, "queue12", null);
+
+ waitForBindings(0, "queues.testaddress", 8, 8, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ verifyReceiveRoundRobin(10, 10, 11);
+ verifyReceiveRoundRobin(10, 12, 13);
+ verifyReceiveRoundRobin(10, 14, 15);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
+ }
+
public void testNotRouteToNonMatchingAddress() throws Exception
{
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
startServers(1, 0);
setupSessionFactory(0, false);
@@ -309,6 +457,615 @@
verifyNotReceive(2, 3, 4, 5);
}
+
+ public void testNonLoadBalancedQueuesWithFilters() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(1, 0);
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ String filter1 = "giraffe";
+ String filter2 = "aardvark";
+ createQueue(0, "queues.testaddress", "queue0", filter1, false);
+ createQueue(0, "queues.testaddress", "queue1", filter2, false);
+ createQueue(0, "queues.testaddress", "queue2", filter1, false);
+ createQueue(0, "queues.testaddress", "queue3", filter2, false);
+ createQueue(0, "queues.testaddress", "queue4", filter1, false);
+
+
+ createQueue(1, "queues.testaddress", "queue5", filter2, false);
+ createQueue(1, "queues.testaddress", "queue6", filter1, false);
+ createQueue(1, "queues.testaddress", "queue7", filter2, false);
+ createQueue(1, "queues.testaddress", "queue8", filter1, false);
+ createQueue(1, "queues.testaddress", "queue9", filter2, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+ addConsumer(3, 0, "queue3", null);
+ addConsumer(4, 0, "queue4", null);
+
+ addConsumer(5, 1, "queue5", null);
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 1, "queue7", null);
+ addConsumer(8, 1, "queue8", null);
+ addConsumer(9, 1, "queue9", null);
+
+ addConsumer(10, 0, "queue10", null);
+
+ addConsumer(11, 1, "queue11", null);
+
+ waitForBindings(0, "queues.testaddress", 6, 6, true);
+ waitForBindings(0, "queues.testaddress", 6, 6, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 11);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveAll(10, 1, 3, 5, 7, 9, 10, 11);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+ }
+
+ public void testRoundRobinMultipleQueuesWithFilters() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ String filter1 = "giraffe";
+ String filter2 = "aardvark";
+
+ createQueue(0, "queues.testaddress", "queue0", filter1, false);
+ createQueue(1, "queues.testaddress", "queue0", filter1, false);
+
+ createQueue(0, "queues.testaddress", "queue1", filter1, false);
+ createQueue(1, "queues.testaddress", "queue1", filter2, false);
+
+ createQueue(0, "queues.testaddress", "queue2", filter2, false);
+ createQueue(1, "queues.testaddress", "queue2", filter1, false);
+
+ createQueue(0, "queues.testaddress", "queue3", filter2, false);
+ createQueue(1, "queues.testaddress", "queue3", filter2, false);
+
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+ createQueue(1, "queues.testaddress", "queue4", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+
+ addConsumer(2, 0, "queue1", null);
+ addConsumer(3, 1, "queue1", null);
+
+ addConsumer(4, 0, "queue2", null);
+ addConsumer(5, 1, "queue2", null);
+
+ addConsumer(6, 0, "queue3", null);
+ addConsumer(7, 1, "queue3", null);
+
+ addConsumer(8, 0, "queue4", null);
+ addConsumer(9, 1, "queue4", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(0, "queues.testaddress", 5, 5, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyReceiveRoundRobin(10, 8, 9);
+
+ verifyReceiveAll(10, 2, 5);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveRoundRobin(10, 6, 7);
+ verifyReceiveRoundRobin(10, 8, 9);
+
+ verifyReceiveAll(10, 3, 4);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ }
+
+ public void testRouteWhenNoConsumersFalseNonBalancedQueues() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue3", null, false);
+ createQueue(1, "queues2.testaddress", "queue4", null, false);
+ createQueue(1, "queues2.testaddress", "queue5", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 3, 0, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ addConsumer(3, 1, "queue3", null);
+ addConsumer(4, 1, "queue4", null);
+ addConsumer(5, 1, "queue5", null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersTrueNonBalancedQueues() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue3", null, false);
+ createQueue(1, "queues2.testaddress", "queue4", null, false);
+ createQueue(1, "queues2.testaddress", "queue5", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 3, 0, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ addConsumer(3, 1, "queue3", null);
+ addConsumer(4, 1, "queue4", null);
+ addConsumer(5, 1, "queue5", null);
+
+ verifyReceiveAll(10, 0, 1, 2, 3, 4, 5);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersFalseLoadBalancedQueues() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues2.testaddress", "queue2", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 3, 0, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ addConsumer(3, 1, "queue0", null);
+ addConsumer(4, 1, "queue1", null);
+ addConsumer(5, 1, "queue2", null);
+
+ //If route when no consumers is false but there is no consumer on the local queue then messages should be round robin'd
+ //It's only in the case where there is a local consumer they shouldn't be round robin'd
+
+ verifyReceiveRoundRobin(10, 0, 3);
+ verifyReceiveRoundRobin(10, 1, 4);
+ verifyReceiveRoundRobin(10, 2, 5);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersFalseLoadBalancedQueuesLocalConsumer() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues2.testaddress", "queue2", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ waitForBindings(0, "queues2.testaddress", 3, 3, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(3, 1, "queue0", null);
+ addConsumer(4, 1, "queue1", null);
+ addConsumer(5, 1, "queue2", null);
+
+ //In this case, since the local queue has a consumer, it should receive all the messages
+
+ verifyReceiveAll(10, 0, 1, 2);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersFalseLoadBalancedQueuesNoLocalQueue() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 2, 0, true);
+ waitForBindings(0, "queues2.testaddress", 2, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+
+ addConsumer(2, 1, "queue0", null);
+ addConsumer(3, 1, "queue1", null);
+
+ verifyReceiveRoundRobin(10, 0, 2);
+ verifyReceiveRoundRobin(10, 1, 3);
+
+ verifyNotReceive(0, 1, 2, 3);
+ }
+
+ public void testRouteWhenNoConsumersTrueLoadBalancedQueues() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues2.testaddress", "queue2", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 3, 0, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ addConsumer(3, 1, "queue0", null);
+ addConsumer(4, 1, "queue1", null);
+ addConsumer(5, 1, "queue2", null);
+
+ verifyReceiveRoundRobin(10, 0, 3);
+ verifyReceiveRoundRobin(10, 1, 4);
+ verifyReceiveRoundRobin(10, 2, 5);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersTrueLoadBalancedQueuesLocalConsumer() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+ createQueue(1, "queues2.testaddress", "queue2", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+
+ waitForBindings(0, "queues2.testaddress", 3, 3, true);
+ waitForBindings(0, "queues2.testaddress", 3, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(3, 1, "queue0", null);
+ addConsumer(4, 1, "queue1", null);
+ addConsumer(5, 1, "queue2", null);
+
+ verifyReceiveRoundRobin(10, 0, 3);
+ verifyReceiveRoundRobin(10, 1, 4);
+ verifyReceiveRoundRobin(10, 2, 5);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5);
+ }
+
+ public void testRouteWhenNoConsumersTrueLoadBalancedQueuesNoLocalQueue() throws Exception
+ {
+ setupClusterConnection("cluster2", 0, 1, "queues2", true, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ createQueue(0, "queues2.testaddress", "queue0", null, false);
+ createQueue(0, "queues2.testaddress", "queue1", null, false);
+
+ createQueue(1, "queues2.testaddress", "queue0", null, false);
+ createQueue(1, "queues2.testaddress", "queue1", null, false);
+
+ waitForBindings(0, "queues2.testaddress", 2, 0, true);
+ waitForBindings(0, "queues2.testaddress", 2, 0, false);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+
+ addConsumer(2, 1, "queue0", null);
+ addConsumer(3, 1, "queue1", null);
+
+ verifyReceiveRoundRobin(10, 0, 2);
+ verifyReceiveRoundRobin(10, 1, 3);
+
+ verifyNotReceive(0, 1, 2, 3);
+ }
+
+ public void testNonLoadBalancedQueuesWithConsumersWithFilters() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ String filter1 = "giraffe";
+ String filter2 = "aardvark";
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(0, "queues.testaddress", "queue3", null, false);
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+
+
+ createQueue(1, "queues.testaddress", "queue5", null, false);
+ createQueue(1, "queues.testaddress", "queue6", null, false);
+ createQueue(1, "queues.testaddress", "queue7", null, false);
+ createQueue(1, "queues.testaddress", "queue8", null, false);
+ createQueue(1, "queues.testaddress", "queue9", null, false);
+
+ createQueue(0, "queues.testaddress", "queue10", null, false);
+ createQueue(1, "queues.testaddress", "queue11", null, false);
+
+
+ addConsumer(0, 0, "queue0", filter1);
+ addConsumer(1, 0, "queue1", filter2);
+ addConsumer(2, 0, "queue2", filter1);
+ addConsumer(3, 0, "queue3", filter2);
+ addConsumer(4, 0, "queue4", filter1);
+
+ addConsumer(5, 1, "queue5", filter2);
+ addConsumer(6, 1, "queue6", filter1);
+ addConsumer(7, 1, "queue7", filter2);
+ addConsumer(8, 1, "queue8", filter1);
+ addConsumer(9, 1, "queue9", filter2);
+
+ addConsumer(10, 0, "queue10", null);
+
+ addConsumer(11, 1, "queue11", null);
+
+ waitForBindings(0, "queues.testaddress", 6, 6, true);
+ waitForBindings(0, "queues.testaddress", 6, 6, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveAll(10, 0, 2, 4, 6, 8, 10, 11);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveAll(10, 1, 3, 5, 7, 9, 10, 11);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+ }
+
+ public void testRoundRobinMultipleQueuesWithConsumersWithFilters() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues", false, false);
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ String filter1 = "giraffe";
+ String filter2 = "aardvark";
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+
+ createQueue(0, "queues.testaddress", "queue1", null, false);
+ createQueue(1, "queues.testaddress", "queue1", null, false);
+
+ createQueue(0, "queues.testaddress", "queue2", null, false);
+ createQueue(1, "queues.testaddress", "queue2", null, false);
+
+ createQueue(0, "queues.testaddress", "queue3", null, false);
+ createQueue(1, "queues.testaddress", "queue3", null, false);
+
+ createQueue(0, "queues.testaddress", "queue4", null, false);
+ createQueue(1, "queues.testaddress", "queue4", null, false);
+
+ addConsumer(0, 0, "queue0", filter1);
+ addConsumer(1, 1, "queue0", filter1);
+
+ addConsumer(2, 0, "queue1", filter1);
+ addConsumer(3, 1, "queue1", filter2);
+
+ addConsumer(4, 0, "queue2", filter2);
+ addConsumer(5, 1, "queue2", filter1);
+
+ addConsumer(6, 0, "queue3", filter2);
+ addConsumer(7, 1, "queue3", filter2);
+
+ addConsumer(8, 0, "queue4", null);
+ addConsumer(9, 1, "queue4", null);
+
+ waitForBindings(0, "queues.testaddress", 5, 5, true);
+ waitForBindings(0, "queues.testaddress", 5, 5, false);
+
+ send(0, "queues.testaddress", 10, false, filter1);
+
+ verifyReceiveRoundRobin(10, 0, 1);
+ verifyReceiveRoundRobin(10, 8, 9);
+
+ verifyReceiveAll(10, 2, 5);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ send(0, "queues.testaddress", 10, false, filter2);
+
+ verifyReceiveRoundRobin(10, 6, 7);
+ verifyReceiveRoundRobin(10, 8, 9);
+
+ verifyReceiveAll(10, 3, 4);
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+
+ }
+
+ public void testMultipleClusterConnections() throws Exception
+ {
+ setupClusterConnection("cluster1", 0, 1, "queues1", false, false);
+ setupClusterConnection("cluster2", 0, 1, "queues2", false, false);
+ setupClusterConnection("cluster3", 0, 1, "queues3", false, false);
+
+ startServers(1, 0);
+
+ setupSessionFactory(0, false);
+ setupSessionFactory(1, false);
+
+ //Make sure the different connections don't conflict
+
+ createQueue(0, "queues1.testaddress", "queue0", null, false);
+ createQueue(0, "queues1.testaddress", "queue1", null, false);
+ createQueue(0, "queues2.testaddress", "queue2", null, false);
+ createQueue(0, "queues2.testaddress", "queue3", null, false);
+ createQueue(0, "queues3.testaddress", "queue4", null, false);
+ createQueue(0, "queues3.testaddress", "queue5", null, false);
+
+
+ createQueue(1, "queues1.testaddress", "queue6", null, false);
+ createQueue(1, "queues1.testaddress", "queue7", null, false);
+ createQueue(1, "queues2.testaddress", "queue8", null, false);
+ createQueue(1, "queues2.testaddress", "queue9", null, false);
+ createQueue(1, "queues3.testaddress", "queue10", null, false);
+ createQueue(1, "queues3.testaddress", "queue11", null, false);
+
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 0, "queue1", null);
+ addConsumer(2, 0, "queue2", null);
+ addConsumer(3, 0, "queue3", null);
+ addConsumer(4, 0, "queue4", null);
+ addConsumer(5, 0, "queue5", null);
+
+
+ addConsumer(6, 1, "queue6", null);
+ addConsumer(7, 1, "queue7", null);
+ addConsumer(8, 1, "queue8", null);
+ addConsumer(9, 1, "queue9", null);
+ addConsumer(10, 1, "queue10", null);
+ addConsumer(11, 1, "queue11", null);
+
+ waitForBindings(0, "queues1.testaddress", 2, 2, true);
+ waitForBindings(0, "queues1.testaddress", 2, 2, false);
+
+ waitForBindings(0, "queues2.testaddress", 2, 2, true);
+ waitForBindings(0, "queues2.testaddress", 2, 2, false);
+
+ waitForBindings(0, "queues3.testaddress", 2, 2, true);
+ waitForBindings(0, "queues3.testaddress", 2, 2, false);
+
+ send(0, "queues1.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 0, 1, 6, 7);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+ send(0, "queues2.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 2, 3, 8, 9);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+
+ send(0, "queues3.testaddress", 10, false, null);
+
+ verifyReceiveAll(10, 4, 5, 10, 11);
+
+ verifyNotReceive(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
+ }
+
+//
+// public void testDurableAndRestart()
+// {
+// }
+//
+// public void testWithNetty()
+// {
+// }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-02-02 18:57:51 UTC (rev 5779)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/FakeBinding.java 2009-02-02 20:12:41 UTC (rev 5780)
@@ -22,6 +22,7 @@
package org.jboss.messaging.tests.performance.persistence;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.postoffice.Binding;
import org.jboss.messaging.core.server.Bindable;
import org.jboss.messaging.core.server.ServerMessage;
@@ -35,6 +36,12 @@
public class FakeBinding implements Binding
{
+ public Filter getFilter()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
public int getID()
{
// TODO Auto-generated method stub
More information about the jboss-cvs-commits
mailing list